Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/24937_nats_source_headers.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `nats` source can now expose NATS message headers on each event. Set the new `headers_key` option to the field where headers should be written (for example `headers_key: headers`). Each header name maps to an array of its string values, since NATS headers can be multi-valued. By default `headers_key` is unset and no headers are exposed, preserving backwards compatibility.

authors: Simon Dugas
101 changes: 99 additions & 2 deletions src/sources/nats/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vector_lib::{
configurable::configurable_component,
lookup::{lookup_v2::OptionalValuePath, owned_value_path},
};
use vrl::value::Kind;
use vrl::value::{Kind, kind::Collection};

use crate::{
codecs::DecodingConfig,
Expand Down Expand Up @@ -156,6 +156,15 @@ pub struct NatsSourceConfig {
#[serde(default = "default_subject_key_field")]
pub subject_key_field: OptionalValuePath,

/// Enables exposing NATS message headers on each event under the named key.
///
/// By default this is unset and no headers are exposed, preserving backwards
/// compatibility. When set, the message headers are inserted as an object
/// mapping each header name to an array of its string values.
#[serde(default)]
#[configurable(metadata(docs::examples = "headers"))]
pub headers_key: OptionalValuePath,

/// The buffer capacity of the underlying NATS subscriber.
///
/// This value determines how many messages the NATS subscriber buffers
Expand Down Expand Up @@ -258,7 +267,7 @@ impl SourceConfig for NatsSourceConfig {
.clone()
.path
.map(LegacyKey::InsertIfEmpty);
let schema_definition = self
let mut schema_definition = self
.decoding
.schema_definition(log_namespace)
.with_standard_vector_source_metadata()
Expand All @@ -270,6 +279,19 @@ impl SourceConfig for NatsSourceConfig {
None,
);

if let Some(headers_key) = self.headers_key.path.clone() {
schema_definition = schema_definition.with_source_metadata(
NatsSourceConfig::NAME,
Some(LegacyKey::Overwrite(headers_key)),
&owned_value_path!("headers"),
Kind::object(
Collection::empty()
.with_unknown(Kind::array(Collection::empty().with_unknown(Kind::bytes()))),
),
None,
);
}

vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
Expand Down Expand Up @@ -373,6 +395,81 @@ mod tests {
assert_eq!(definitions, Some(expected_definition));
}

#[test]
fn output_schema_definition_vector_namespace_with_headers() {
let config = NatsSourceConfig {
log_namespace: Some(true),
subject_key_field: default_subject_key_field(),
headers_key: OptionalValuePath::new("headers"),
..Default::default()
};

let definitions = config
.outputs(LogNamespace::Vector)
.remove(0)
.schema_definition(true);

let expected_definition =
Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), "message")
.with_metadata_field(
&owned_value_path!("vector", "source_type"),
Kind::bytes(),
None,
)
.with_metadata_field(
&owned_value_path!("vector", "ingest_timestamp"),
Kind::timestamp(),
None,
)
.with_metadata_field(&owned_value_path!("nats", "subject"), Kind::bytes(), None)
.with_metadata_field(
&owned_value_path!("nats", "headers"),
Kind::object(Collection::empty().with_unknown(Kind::array(
Collection::empty().with_unknown(Kind::bytes()),
))),
None,
);

assert_eq!(definitions, Some(expected_definition));
}

#[test]
fn output_schema_definition_legacy_namespace_with_headers() {
let config = NatsSourceConfig {
subject_key_field: default_subject_key_field(),
headers_key: OptionalValuePath::new("headers"),
..Default::default()
};
let definitions = config
.outputs(LogNamespace::Legacy)
.remove(0)
.schema_definition(true);

let expected_definition = Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[LogNamespace::Legacy],
)
.with_event_field(
&owned_value_path!("message"),
Kind::bytes(),
Some("message"),
)
.with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
.with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
.with_event_field(&owned_value_path!("subject"), Kind::bytes(), None)
.with_event_field(
&owned_value_path!("headers"),
Kind::object(
Collection::empty()
.with_unknown(Kind::array(Collection::empty().with_unknown(Kind::bytes()))),
),
None,
);

assert_eq!(definitions, Some(expected_definition));
}

#[test]
fn output_schema_definition_legacy_namespace() {
let config = NatsSourceConfig {
Expand Down
92 changes: 92 additions & 0 deletions src/sources/nats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,98 @@ async fn publish_and_check(conf: NatsSourceConfig) -> Result<(), BuildError> {
Ok(())
}

/// Publishes a message with NATS headers and returns the resulting event's log.
async fn publish_with_headers_and_collect(
conf: NatsSourceConfig,
headers: async_nats::HeaderMap,
) -> Result<vector_lib::event::LogEvent, BuildError> {
let subject = conf.subject.clone();
let (nc, sub) = create_subscription(&conf).await?;
let nc_pub = nc.clone();
let msg = "my message with headers";

let events = assert_source_compliance(&SOURCE_TAGS, async move {
let (tx, rx) = SourceSender::new_test();
let decoder = DecodingConfig::new(
conf.framing.clone(),
conf.decoding.clone(),
LogNamespace::Legacy,
)
.build()
.unwrap();
tokio::spawn(run_nats_core(
conf.clone(),
nc,
sub,
decoder,
LogNamespace::Legacy,
ShutdownSignal::noop(),
tx,
));
nc_pub
.publish_with_headers(subject, headers, Bytes::from_static(msg.as_bytes()))
.await
.unwrap();

collect_n(rx, 1).await
})
.await;

Ok(events.into_iter().next().unwrap().into_log())
}

#[tokio::test]
async fn nats_headers_enabled() {
let subject = format!("test-{}", random_string(10));
let url =
std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222"));

let mut conf = generate_source_config(&url, &subject);
conf.headers_key = vector_lib::lookup::lookup_v2::OptionalValuePath::new("headers");

let mut headers = async_nats::HeaderMap::new();
headers.insert("X-My-Custom-Header", "A");
headers.insert("X-My-Custom-Timestamp", "1771517040123456000");

let log = publish_with_headers_and_collect(conf, headers)
.await
.expect("publish_with_headers_and_collect failed");

let mut expected = vrl::value::ObjectMap::new();
expected.insert(
"X-My-Custom-Header".into(),
vec![vector_lib::event::Value::from("A")].into(),
);
expected.insert(
"X-My-Custom-Timestamp".into(),
vec![vector_lib::event::Value::from("1771517040123456000")].into(),
);

assert_eq!(log["headers"], vector_lib::event::Value::Object(expected));
}

#[tokio::test]
async fn nats_headers_disabled_by_default() {
let subject = format!("test-{}", random_string(10));
let url =
std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222"));

// No headers_key set: headers must not be exposed.
let conf = generate_source_config(&url, &subject);

let mut headers = async_nats::HeaderMap::new();
headers.insert("X-My-Custom-Header", "A");

let log = publish_with_headers_and_collect(conf, headers)
.await
.expect("publish_with_headers_and_collect failed");

assert!(
log.get("headers").is_none(),
"headers should not be present when headers_key is unset, got: {log:?}"
);
}

#[tokio::test]
async fn nats_no_auth() {
let subject = format!("test-{}", random_string(10));
Expand Down
82 changes: 82 additions & 0 deletions src/sources/nats/source.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_nats::jetstream::consumer::pull::Stream as PullConsumerStream;
use bytes::Bytes;
use chrono::Utc;
use futures::StreamExt;
use snafu::ResultExt;
Expand All @@ -12,6 +13,7 @@ use vector_lib::{
},
lookup::owned_value_path,
};
use vrl::value::{ObjectMap, Value};

use crate::{
SourceSender,
Expand All @@ -22,6 +24,22 @@ use crate::{
sources::nats::config::{BuildError, NatsSourceConfig, SubscribeSnafu},
};

/// Converts a NATS [`HeaderMap`] into a [`Value`] suitable for insertion into an event.
///
/// Each header name maps to an array of its string values, since NATS headers
/// can be multi-valued.
fn headers_to_value(headers: &async_nats::HeaderMap) -> Value {
let mut map = ObjectMap::new();
for (name, values) in headers.iter() {
let values = values
.iter()
.map(|value| Value::from(Bytes::copy_from_slice(value.as_str().as_bytes())))
.collect::<Vec<_>>();
map.insert(name.to_string().into(), Value::Array(values));
}
Value::Object(map)
}

/// The outcome of processing a single NATS message.
pub enum ProcessingStatus {
/// The message payload was fully decoded and sent downstream.
Expand Down Expand Up @@ -76,6 +94,18 @@ pub async fn process_message(
&owned_value_path!("subject"),
msg.subject.as_str(),
);

if let Some(headers_key) = config.headers_key.path.as_ref()
&& let Some(headers) = msg.headers.as_ref()
Comment on lines +98 to +99

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep headers field present when enabled

When headers_key is configured but an individual NATS message has no headers, this combined condition skips insertion entirely. outputs() still declares the configured headers field as an object, and the changelog/docs say headers are written on each event, so mixed traffic can produce events where downstream VRL is type-checked as if .headers exists but receives no field at runtime; insert an empty object in this case or mark the schema optional.

Useful? React with 👍 / 👎.

{
log_namespace.insert_source_metadata(
NatsSourceConfig::NAME,
log,
Some(LegacyKey::Overwrite(headers_key)),
&owned_value_path!("headers"),
headers_to_value(headers),
);
}
}
event
});
Expand Down Expand Up @@ -220,3 +250,55 @@ pub async fn create_subscription(

Ok((nc, subscription))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn headers_to_value_maps_names_to_arrays_of_values() {
let mut headers = async_nats::HeaderMap::new();
headers.insert("X-My-Custom-Header", "A");
headers.insert("X-My-Custom-Timestamp", "1771517040123456000");

let value = headers_to_value(&headers);

let mut expected = ObjectMap::new();
expected.insert(
"X-My-Custom-Header".into(),
Value::Array(vec![Value::from("A")]),
);
expected.insert(
"X-My-Custom-Timestamp".into(),
Value::Array(vec![Value::from("1771517040123456000")]),
);

assert_eq!(value, Value::Object(expected));
}

#[test]
fn headers_to_value_collects_multiple_values_per_name() {
let mut headers = async_nats::HeaderMap::new();
headers.append("X-Multi", "one");
headers.append("X-Multi", "two");

let value = headers_to_value(&headers);

let object = value.as_object().expect("headers should be an object");
let values = object
.get("X-Multi")
.and_then(|v| v.as_array())
.expect("X-Multi should be an array");

assert_eq!(values.len(), 2);
assert!(values.contains(&Value::from("one")));
assert!(values.contains(&Value::from("two")));
}

#[test]
fn headers_to_value_empty_map_is_empty_object() {
let headers = async_nats::HeaderMap::new();
let value = headers_to_value(&headers);
assert_eq!(value, Value::Object(ObjectMap::new()));
}
}
14 changes: 14 additions & 0 deletions website/cue/reference/components/sources/generated/nats.cue
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,20 @@ generated: components: sources: nats: configuration: {
}
}
}
headers_key: {
description: """
Enables exposing NATS message headers on each event under the named key.

By default this is unset and no headers are exposed, preserving backwards
compatibility. When set, the message headers are inserted as an object
mapping each header name to an array of its string values.
"""
required: false
type: string: {
default: ""
examples: ["headers"]
}
}
jetstream: {
description: "Configuration for NATS JetStream."
required: false
Expand Down
Loading