diff --git a/changelog.d/24937_nats_source_headers.feature.md b/changelog.d/24937_nats_source_headers.feature.md new file mode 100644 index 0000000000000..c77e112a02954 --- /dev/null +++ b/changelog.d/24937_nats_source_headers.feature.md @@ -0,0 +1,3 @@ +The `nats` source can now expose NATS message headers on each event. Set the new `headers_key` option to the field where headers should be written (for example `headers_key: headers`). Each header name maps to an array of its string values, since NATS headers can be multi-valued. By default `headers_key` is unset and no headers are exposed, preserving backwards compatibility. + +authors: Simon Dugas diff --git a/src/sources/nats/config.rs b/src/sources/nats/config.rs index b4fce37c38a7e..84cdb4542f138 100644 --- a/src/sources/nats/config.rs +++ b/src/sources/nats/config.rs @@ -9,7 +9,7 @@ use vector_lib::{ configurable::configurable_component, lookup::{lookup_v2::OptionalValuePath, owned_value_path}, }; -use vrl::value::Kind; +use vrl::value::{Kind, kind::Collection}; use crate::{ codecs::DecodingConfig, @@ -156,6 +156,15 @@ pub struct NatsSourceConfig { #[serde(default = "default_subject_key_field")] pub subject_key_field: OptionalValuePath, + /// Enables exposing NATS message headers on each event under the named key. + /// + /// By default this is unset and no headers are exposed, preserving backwards + /// compatibility. When set, the message headers are inserted as an object + /// mapping each header name to an array of its string values. + #[serde(default)] + #[configurable(metadata(docs::examples = "headers"))] + pub headers_key: OptionalValuePath, + /// The buffer capacity of the underlying NATS subscriber. /// /// This value determines how many messages the NATS subscriber buffers @@ -258,7 +267,7 @@ impl SourceConfig for NatsSourceConfig { .clone() .path .map(LegacyKey::InsertIfEmpty); - let schema_definition = self + let mut schema_definition = self .decoding .schema_definition(log_namespace) .with_standard_vector_source_metadata() @@ -270,6 +279,19 @@ impl SourceConfig for NatsSourceConfig { None, ); + if let Some(headers_key) = self.headers_key.path.clone() { + schema_definition = schema_definition.with_source_metadata( + NatsSourceConfig::NAME, + Some(LegacyKey::Overwrite(headers_key)), + &owned_value_path!("headers"), + Kind::object( + Collection::empty() + .with_unknown(Kind::array(Collection::empty().with_unknown(Kind::bytes()))), + ), + None, + ); + } + vec![SourceOutput::new_maybe_logs( self.decoding.output_type(), schema_definition, @@ -373,6 +395,81 @@ mod tests { assert_eq!(definitions, Some(expected_definition)); } + #[test] + fn output_schema_definition_vector_namespace_with_headers() { + let config = NatsSourceConfig { + log_namespace: Some(true), + subject_key_field: default_subject_key_field(), + headers_key: OptionalValuePath::new("headers"), + ..Default::default() + }; + + let definitions = config + .outputs(LogNamespace::Vector) + .remove(0) + .schema_definition(true); + + let expected_definition = + Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]) + .with_meaning(OwnedTargetPath::event_root(), "message") + .with_metadata_field( + &owned_value_path!("vector", "source_type"), + Kind::bytes(), + None, + ) + .with_metadata_field( + &owned_value_path!("vector", "ingest_timestamp"), + Kind::timestamp(), + None, + ) + .with_metadata_field(&owned_value_path!("nats", "subject"), Kind::bytes(), None) + .with_metadata_field( + &owned_value_path!("nats", "headers"), + Kind::object(Collection::empty().with_unknown(Kind::array( + Collection::empty().with_unknown(Kind::bytes()), + ))), + None, + ); + + assert_eq!(definitions, Some(expected_definition)); + } + + #[test] + fn output_schema_definition_legacy_namespace_with_headers() { + let config = NatsSourceConfig { + subject_key_field: default_subject_key_field(), + headers_key: OptionalValuePath::new("headers"), + ..Default::default() + }; + let definitions = config + .outputs(LogNamespace::Legacy) + .remove(0) + .schema_definition(true); + + let expected_definition = Definition::new_with_default_metadata( + Kind::object(Collection::empty()), + [LogNamespace::Legacy], + ) + .with_event_field( + &owned_value_path!("message"), + Kind::bytes(), + Some("message"), + ) + .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None) + .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None) + .with_event_field(&owned_value_path!("subject"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("headers"), + Kind::object( + Collection::empty() + .with_unknown(Kind::array(Collection::empty().with_unknown(Kind::bytes()))), + ), + None, + ); + + assert_eq!(definitions, Some(expected_definition)); + } + #[test] fn output_schema_definition_legacy_namespace() { let config = NatsSourceConfig { diff --git a/src/sources/nats/integration_tests.rs b/src/sources/nats/integration_tests.rs index a3caa7e766c07..7e0fd834c887c 100644 --- a/src/sources/nats/integration_tests.rs +++ b/src/sources/nats/integration_tests.rs @@ -131,6 +131,98 @@ async fn publish_and_check(conf: NatsSourceConfig) -> Result<(), BuildError> { Ok(()) } +/// Publishes a message with NATS headers and returns the resulting event's log. +async fn publish_with_headers_and_collect( + conf: NatsSourceConfig, + headers: async_nats::HeaderMap, +) -> Result { + let subject = conf.subject.clone(); + let (nc, sub) = create_subscription(&conf).await?; + let nc_pub = nc.clone(); + let msg = "my message with headers"; + + let events = assert_source_compliance(&SOURCE_TAGS, async move { + let (tx, rx) = SourceSender::new_test(); + let decoder = DecodingConfig::new( + conf.framing.clone(), + conf.decoding.clone(), + LogNamespace::Legacy, + ) + .build() + .unwrap(); + tokio::spawn(run_nats_core( + conf.clone(), + nc, + sub, + decoder, + LogNamespace::Legacy, + ShutdownSignal::noop(), + tx, + )); + nc_pub + .publish_with_headers(subject, headers, Bytes::from_static(msg.as_bytes())) + .await + .unwrap(); + + collect_n(rx, 1).await + }) + .await; + + Ok(events.into_iter().next().unwrap().into_log()) +} + +#[tokio::test] +async fn nats_headers_enabled() { + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let mut conf = generate_source_config(&url, &subject); + conf.headers_key = vector_lib::lookup::lookup_v2::OptionalValuePath::new("headers"); + + let mut headers = async_nats::HeaderMap::new(); + headers.insert("X-My-Custom-Header", "A"); + headers.insert("X-My-Custom-Timestamp", "1771517040123456000"); + + let log = publish_with_headers_and_collect(conf, headers) + .await + .expect("publish_with_headers_and_collect failed"); + + let mut expected = vrl::value::ObjectMap::new(); + expected.insert( + "X-My-Custom-Header".into(), + vec![vector_lib::event::Value::from("A")].into(), + ); + expected.insert( + "X-My-Custom-Timestamp".into(), + vec![vector_lib::event::Value::from("1771517040123456000")].into(), + ); + + assert_eq!(log["headers"], vector_lib::event::Value::Object(expected)); +} + +#[tokio::test] +async fn nats_headers_disabled_by_default() { + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + // No headers_key set: headers must not be exposed. + let conf = generate_source_config(&url, &subject); + + let mut headers = async_nats::HeaderMap::new(); + headers.insert("X-My-Custom-Header", "A"); + + let log = publish_with_headers_and_collect(conf, headers) + .await + .expect("publish_with_headers_and_collect failed"); + + assert!( + log.get("headers").is_none(), + "headers should not be present when headers_key is unset, got: {log:?}" + ); +} + #[tokio::test] async fn nats_no_auth() { let subject = format!("test-{}", random_string(10)); diff --git a/src/sources/nats/source.rs b/src/sources/nats/source.rs index 19352e7beec21..0ee2de3161a47 100644 --- a/src/sources/nats/source.rs +++ b/src/sources/nats/source.rs @@ -1,4 +1,5 @@ use async_nats::jetstream::consumer::pull::Stream as PullConsumerStream; +use bytes::Bytes; use chrono::Utc; use futures::StreamExt; use snafu::ResultExt; @@ -12,6 +13,7 @@ use vector_lib::{ }, lookup::owned_value_path, }; +use vrl::value::{ObjectMap, Value}; use crate::{ SourceSender, @@ -22,6 +24,22 @@ use crate::{ sources::nats::config::{BuildError, NatsSourceConfig, SubscribeSnafu}, }; +/// Converts a NATS [`HeaderMap`] into a [`Value`] suitable for insertion into an event. +/// +/// Each header name maps to an array of its string values, since NATS headers +/// can be multi-valued. +fn headers_to_value(headers: &async_nats::HeaderMap) -> Value { + let mut map = ObjectMap::new(); + for (name, values) in headers.iter() { + let values = values + .iter() + .map(|value| Value::from(Bytes::copy_from_slice(value.as_str().as_bytes()))) + .collect::>(); + map.insert(name.to_string().into(), Value::Array(values)); + } + Value::Object(map) +} + /// The outcome of processing a single NATS message. pub enum ProcessingStatus { /// The message payload was fully decoded and sent downstream. @@ -76,6 +94,18 @@ pub async fn process_message( &owned_value_path!("subject"), msg.subject.as_str(), ); + + if let Some(headers_key) = config.headers_key.path.as_ref() + && let Some(headers) = msg.headers.as_ref() + { + log_namespace.insert_source_metadata( + NatsSourceConfig::NAME, + log, + Some(LegacyKey::Overwrite(headers_key)), + &owned_value_path!("headers"), + headers_to_value(headers), + ); + } } event }); @@ -220,3 +250,55 @@ pub async fn create_subscription( Ok((nc, subscription)) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn headers_to_value_maps_names_to_arrays_of_values() { + let mut headers = async_nats::HeaderMap::new(); + headers.insert("X-My-Custom-Header", "A"); + headers.insert("X-My-Custom-Timestamp", "1771517040123456000"); + + let value = headers_to_value(&headers); + + let mut expected = ObjectMap::new(); + expected.insert( + "X-My-Custom-Header".into(), + Value::Array(vec![Value::from("A")]), + ); + expected.insert( + "X-My-Custom-Timestamp".into(), + Value::Array(vec![Value::from("1771517040123456000")]), + ); + + assert_eq!(value, Value::Object(expected)); + } + + #[test] + fn headers_to_value_collects_multiple_values_per_name() { + let mut headers = async_nats::HeaderMap::new(); + headers.append("X-Multi", "one"); + headers.append("X-Multi", "two"); + + let value = headers_to_value(&headers); + + let object = value.as_object().expect("headers should be an object"); + let values = object + .get("X-Multi") + .and_then(|v| v.as_array()) + .expect("X-Multi should be an array"); + + assert_eq!(values.len(), 2); + assert!(values.contains(&Value::from("one"))); + assert!(values.contains(&Value::from("two"))); + } + + #[test] + fn headers_to_value_empty_map_is_empty_object() { + let headers = async_nats::HeaderMap::new(); + let value = headers_to_value(&headers); + assert_eq!(value, Value::Object(ObjectMap::new())); + } +} diff --git a/website/cue/reference/components/sources/generated/nats.cue b/website/cue/reference/components/sources/generated/nats.cue index 16a024742ac3a..85361ba19c2e1 100644 --- a/website/cue/reference/components/sources/generated/nats.cue +++ b/website/cue/reference/components/sources/generated/nats.cue @@ -598,6 +598,20 @@ generated: components: sources: nats: configuration: { } } } + headers_key: { + description: """ + Enables exposing NATS message headers on each event under the named key. + + By default this is unset and no headers are exposed, preserving backwards + compatibility. When set, the message headers are inserted as an object + mapping each header name to an array of its string values. + """ + required: false + type: string: { + default: "" + examples: ["headers"] + } + } jetstream: { description: "Configuration for NATS JetStream." required: false