From 1b015879db981529532c472df6a78afd7dd69a6e Mon Sep 17 00:00:00 2001 From: Ronit Anilkumar Date: Mon, 15 Jun 2026 03:42:47 -0700 Subject: [PATCH 1/5] chore: add changelog fragment for LLMObs endpoint --- changelog.d/25441_llmobs_endpoint.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/25441_llmobs_endpoint.feature.md diff --git a/changelog.d/25441_llmobs_endpoint.feature.md b/changelog.d/25441_llmobs_endpoint.feature.md new file mode 100644 index 0000000000000..9fd5526649fe0 --- /dev/null +++ b/changelog.d/25441_llmobs_endpoint.feature.md @@ -0,0 +1,3 @@ +The `datadog_agent` source now accepts LLM Observability (LLMObs) telemetry at `/api/v2/llmobs`. When `multiple_outputs` is enabled, LLMObs span events are available as log events on the `llmobs` output port. + +authors: ronitanilkumar \ No newline at end of file From 2d0bd166d797f29c1658c6463fcc0ec9cf013cf5 Mon Sep 17 00:00:00 2001 From: Ronit Anilkumar Date: Mon, 15 Jun 2026 13:28:15 -0700 Subject: [PATCH 2/5] feat(datadog_agent source): accept LLMObs telemetry at /api/v2/llmobs --- src/sources/datadog_agent/llmobs.rs | 157 ++++++++++++++++++ src/sources/datadog_agent/mod.rs | 21 ++- src/sources/datadog_agent/tests.rs | 81 ++++++++- .../components/sources/datadog_agent.cue | 7 + 4 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 src/sources/datadog_agent/llmobs.rs diff --git a/src/sources/datadog_agent/llmobs.rs b/src/sources/datadog_agent/llmobs.rs new file mode 100644 index 0000000000000..b45fec55c039e --- /dev/null +++ b/src/sources/datadog_agent/llmobs.rs @@ -0,0 +1,157 @@ +use bytes::Bytes; +use http::StatusCode; +use serde::Deserialize; +use serde_json::Value; +use std::sync::Arc; +use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response}; + +use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler}; +use crate::{ + common::http::ErrorMessage, + event::{Event, LogEvent}, + internal_events::DatadogAgentJsonParseError, +}; + +pub(super) fn build_warp_filter( + handler: RequestHandler, + source: DatadogAgentSource, +) -> BoxedFilter<(Response,)> { + warp::post() + .and(path!("api" / "v2" / "llmobs" / ..)) + .and(warp::path::full()) + .and(warp::header::optional::("content-encoding")) + .and(warp::header::optional::("dd-api-key")) + .and(warp::query::()) + .and(warp::body::bytes()) + .and_then( + move |path: FullPath, + encoding_header: Option, + api_token: Option, + query_params: ApiKeyQueryParams, + body: Bytes| { + let events = source + .decode(&encoding_header, body, path.as_str()) + .and_then(|body| { + decode_llmobs_body( + body, + source.api_key_extractor.extract( + path.as_str(), + api_token, + query_params.dd_api_key, + ), + ) + }); + handler.clone().handle_request(events, super::LLMOBS) + }, + ) + .boxed() +} + +#[derive(Deserialize)] +struct LLMObsEnvelopeItem { + #[serde(rename = "event_type")] + _event_type: Option, + spans: Vec, + #[serde(rename = "_dd.tracer_version")] + dd_tracer_version: Option, + #[serde(rename = "_dd.scope")] + _dd_scope: Option, +} + +#[derive(Deserialize)] +struct LLMObsSpan { + span_id: String, + trace_id: String, + parent_id: Option, + name: Option, + session_id: Option, + service: Option, + start_ns: Option, + duration: Option, + status: Option, + status_message: Option, + meta: Option, + metrics: Option, + #[serde(default)] + tags: Vec, + #[serde(rename = "_dd")] + dd: Option, +} + +pub(crate) fn decode_llmobs_body( + body: Bytes, + api_key: Option>, +) -> Result, ErrorMessage> { + let envelope: Vec = serde_json::from_slice(&body).map_err(|error| { + emit!(DatadogAgentJsonParseError { error: &error }); + ErrorMessage::new( + StatusCode::BAD_REQUEST, + format!("Error parsing JSON: {error:?}"), + ) + })?; + + let events = envelope + .into_iter() + .flat_map(|item| { + let tracer_version = item.dd_tracer_version.clone(); + item.spans.into_iter().map(move |span| { + let mut log = LogEvent::default(); + log.insert("span_id", span.span_id); + log.insert("trace_id", span.trace_id); + if let Some(v) = span.parent_id { + log.insert("parent_id", v); + } + if let Some(v) = span.name { + log.insert("name", v); + } + if let Some(v) = span.session_id { + log.insert("session_id", v); + } + if let Some(v) = span.service { + log.insert("service", v); + } + if let Some(v) = span.start_ns { + log.insert("start_ns", v); + } + if let Some(v) = span.duration { + log.insert("duration", v); + } + if let Some(v) = span.status { + log.insert("status", v); + } + if let Some(v) = span.status_message { + log.insert("status_message", v); + } + if let Some(v) = span.meta { + log.insert("meta", v); + } + if let Some(v) = span.metrics { + log.insert("metrics", v); + } + if !span.tags.is_empty() { + log.insert("tags", span.tags); + } + if let Some(ml_app) = span + .dd + .as_ref() + .and_then(|dd| dd.get("ml_app")) + .and_then(|v| v.as_str()) + { + log.insert("ml_app", ml_app.to_owned()); + } + if let Some(v) = tracer_version.clone() { + log.insert("_dd.tracer_version", v); + } + Event::Log(log) + }) + }) + .map(|mut event| { + if let Some(k) = &api_key { + event.metadata_mut().set_datadog_api_key(Arc::clone(k)); + } + event + }) + .collect(); + + Ok(events) +} diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index ac84188df7c78..0a6e0121b7e5c 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -6,6 +6,7 @@ mod tests; pub mod logs; pub mod metrics; pub mod traces; +pub mod llmobs; #[allow(warnings, clippy::pedantic, clippy::nursery)] pub(crate) mod ddmetric_proto { @@ -69,6 +70,7 @@ use crate::{ pub const LOGS: &str = "logs"; pub const METRICS: &str = "metrics"; pub const TRACES: &str = "traces"; +pub const LLMOBS: &str = "llmobs"; /// Configuration for the `datadog_agent` source. #[configurable_component(source( @@ -106,6 +108,11 @@ pub struct DatadogAgentConfig { #[serde(default = "crate::serde::default_false")] disable_traces: bool, + /// If this is set to `true`, LLM Observability events are not accepted by the component. + #[configurable(metadata(docs::advanced))] + #[serde(default = "crate::serde::default_false")] + disable_llmobs: bool, + /// If this is set to `true`, logs, metrics (beta), and traces (alpha) are sent to different outputs. /// /// @@ -179,6 +186,7 @@ impl GenerateConfig for DatadogAgentConfig { disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, multiple_outputs: false, parse_ddtags: false, split_metric_namespace: true, @@ -322,6 +330,7 @@ impl SourceConfig for DatadogAgentConfig { .with_standard_vector_source_metadata(); let mut output = Vec::with_capacity(1); + let llmobs_definition = definition.clone(); if self.multiple_outputs { if !self.disable_logs { @@ -333,6 +342,9 @@ impl SourceConfig for DatadogAgentConfig { if !self.disable_traces { output.push(SourceOutput::new_traces().with_port(TRACES)) } + if !self.disable_llmobs { + output.push(SourceOutput::new_maybe_logs(DataType::Log, llmobs_definition).with_port(LLMOBS)) + } } else { output.push(SourceOutput::new_maybe_logs( DataType::all_bits(), @@ -459,12 +471,19 @@ impl DatadogAgentSource { } if !config.disable_metrics { - let metrics_filter = metrics::build_warp_filter(handler, self.clone()); + let metrics_filter = metrics::build_warp_filter(handler.clone(), self.clone()); filters = filters .map(|f| f.or(metrics_filter.clone()).unify().boxed()) .or(Some(metrics_filter)); } + if !config.disable_llmobs { + let llmobs_filter = llmobs::build_warp_filter(handler.clone(), self.clone()); + filters = filters + .map(|f| f.or(llmobs_filter.clone()).unify().boxed()) + .or(Some(llmobs_filter)); + } + filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into()) } diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index dc7c414722150..ad74ddc6377e5 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -3,6 +3,7 @@ use std::{ iter::FromIterator, net::SocketAddr, str, + sync::Arc, time::Duration, }; @@ -58,6 +59,8 @@ use crate::{ }, }; +use crate::sources::datadog_agent::llmobs::decode_llmobs_body; + const DD_API_KEY: &str = "12345678abcdefgh12345678abcdefgh"; const DD_API_LOGS_V1_PATH: &str = "/v1/input/"; const DD_API_LOGS_V2_PATH: &str = "/api/v2/logs"; @@ -1554,6 +1557,7 @@ fn test_config_outputs_with_disabled_data_types() { disable_logs: bool, disable_metrics: bool, disable_traces: bool, + disable_llmobs: bool, } for TestCase { @@ -1561,48 +1565,56 @@ fn test_config_outputs_with_disabled_data_types() { disable_logs, disable_metrics, disable_traces, + disable_llmobs, } in [ TestCase { multiple_outputs: true, disable_logs: true, disable_metrics: true, disable_traces: true, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: true, disable_metrics: false, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: false, disable_metrics: true, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: false, disable_metrics: false, disable_traces: true, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: true, disable_metrics: true, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: false, disable_logs: true, disable_metrics: true, disable_traces: true, + disable_llmobs: false, }, ] { let config = DatadogAgentConfig { @@ -1616,6 +1628,7 @@ fn test_config_outputs_with_disabled_data_types() { disable_logs, disable_metrics, disable_traces, + disable_llmobs, parse_ddtags: false, split_metric_namespace: true, log_namespace: Some(false), @@ -1629,7 +1642,10 @@ fn test_config_outputs_with_disabled_data_types() { .map(|output| output.ty) .collect(); if multiple_outputs { - assert_eq!(outputs.contains(&DataType::Log), !disable_logs); + assert_eq!( + outputs.contains(&DataType::Log), + !disable_logs || !disable_llmobs + ); assert_eq!(outputs.contains(&DataType::Trace), !disable_traces); assert_eq!(outputs.contains(&DataType::Metric), !disable_metrics); } else { @@ -2060,6 +2076,7 @@ fn test_config_outputs() { disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, parse_ddtags: false, split_metric_namespace: true, log_namespace: Some(false), @@ -2793,6 +2810,7 @@ impl ValidatableComponent for DatadogAgentConfig { disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, parse_ddtags: false, split_metric_namespace: true, log_namespace: Some(false), @@ -2832,3 +2850,64 @@ impl ValidatableComponent for DatadogAgentConfig { } register_validatable_component!(DatadogAgentConfig); + +#[test] +fn test_decode_llmobs_body() { + let body = Bytes::from( + r#"[ + { + "event_type": "span", + "_dd.tracer_version": "2.17.0", + "spans": [{ + "span_id": "abc123", + "trace_id": "xyz789", + "name": "my.workflow", + "start_ns": 1707763310981223236, + "duration": 12345678900, + "status": "ok", + "meta": { "span": { "kind": "llm" }, "model_name": "gpt-4" }, + "metrics": { "input_tokens": 64, "output_tokens": 128 }, + "tags": ["env:prod", "service:myapp"], + "_dd": { "ml_app": "my-llm-app" } + }] + } + ]"#, + ); + + let events = decode_llmobs_body(body, None).unwrap(); + assert_eq!(events.len(), 1); + + let log = events[0].as_log(); + assert_eq!(log["span_id"], "abc123".into()); + assert_eq!(log["trace_id"], "xyz789".into()); + assert_eq!(log["name"], "my.workflow".into()); + assert_eq!(log["status"], "ok".into()); + assert_eq!(log["ml_app"], "my-llm-app".into()); + assert_eq!(log["_dd.tracer_version"], "2.17.0".into()); +} + +#[test] +fn test_decode_llmobs_body_empty_spans() { + let body = Bytes::from(r#"[{"event_type": "span", "spans": []}]"#); + let events = decode_llmobs_body(body, None).unwrap(); + assert_eq!(events.len(), 0); +} + +#[test] +fn test_decode_llmobs_body_invalid_json() { + let body = Bytes::from("not json"); + assert!(decode_llmobs_body(body, None).is_err()); +} + +#[test] +fn test_decode_llmobs_body_api_key() { + let body = Bytes::from(r#"[{"event_type":"span","spans":[{"span_id":"a","trace_id":"b"}]}]"#); + let api_key: Option> = Some(Arc::from("test1234test1234test1234test1234")); + + let events = decode_llmobs_body(body, api_key).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!( + events[0].metadata().datadog_api_key().map(|k| k.as_ref().to_owned()), + Some("test1234test1234test1234test1234".to_owned()) + ); +} diff --git a/website/cue/reference/components/sources/datadog_agent.cue b/website/cue/reference/components/sources/datadog_agent.cue index 5bb26bb89823c..50efeb6ae4189 100644 --- a/website/cue/reference/components/sources/datadog_agent.cue +++ b/website/cue/reference/components/sources/datadog_agent.cue @@ -82,6 +82,13 @@ components: sources: datadog_agent: { If [multiple_outputs](#multiple_outputs) is enabled, received trace events will go to this output stream. Use `.traces` as an input to downstream transforms and sinks. """ }, + { + name: "llmobs" + description: """ + If [multiple_outputs](#multiple_outputs) is enabled, received LLM Observability events will go to this output stream. Use `.llmobs` as an input to downstream transforms and sinks. + """ + }, + ] output: { From ca3999197a7bb0e1b6611daf3acd4093bfa31e19 Mon Sep 17 00:00:00 2001 From: Ronit Anilkumar Date: Mon, 15 Jun 2026 13:46:52 -0700 Subject: [PATCH 3/5] chore: regenerate datadog_agent component docs --- .../reference/components/sources/generated/datadog_agent.cue | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/website/cue/reference/components/sources/generated/datadog_agent.cue b/website/cue/reference/components/sources/generated/datadog_agent.cue index fa6a519fe1e75..53474af618179 100644 --- a/website/cue/reference/components/sources/generated/datadog_agent.cue +++ b/website/cue/reference/components/sources/generated/datadog_agent.cue @@ -345,6 +345,11 @@ generated: components: sources: datadog_agent: configuration: { } } } + disable_llmobs: { + description: "If this is set to `true`, LLM Observability events are not accepted by the component." + required: false + type: bool: default: false + } disable_logs: { description: "If this is set to `true`, logs are not accepted by the component." required: false From a4f25b682c9ce473d1c0bb60b0f224930dd356fe Mon Sep 17 00:00:00 2001 From: Ronit Anilkumar Date: Tue, 16 Jun 2026 16:35:26 -0700 Subject: [PATCH 4/5] feat(datadog_agent source): add EVP proxy route, source metadata, events_received, start_ns timestamp, ml_app tag fallback, and span optional fields for LLMObs --- src/sources/datadog_agent/llmobs.rs | 115 ++++++++++++++++++++++++---- src/sources/datadog_agent/mod.rs | 44 ++++++++++- src/sources/datadog_agent/tests.rs | 38 +++++++-- 3 files changed, 173 insertions(+), 24 deletions(-) diff --git a/src/sources/datadog_agent/llmobs.rs b/src/sources/datadog_agent/llmobs.rs index b45fec55c039e..fe83d51eef69a 100644 --- a/src/sources/datadog_agent/llmobs.rs +++ b/src/sources/datadog_agent/llmobs.rs @@ -1,13 +1,22 @@ +use std::sync::Arc; + use bytes::Bytes; +use chrono::{TimeZone, Utc}; use http::StatusCode; use serde::Deserialize; use serde_json::Value; -use std::sync::Arc; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + internal_event::{CountByteSize, InternalEventHandle as _}, + json_size::JsonSize, + lookup::event_path, +}; use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response}; -use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler}; +use super::{ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, RequestHandler}; use crate::{ common::http::ErrorMessage, + config::log_schema, event::{Event, LogEvent}, internal_events::DatadogAgentJsonParseError, }; @@ -16,13 +25,45 @@ pub(super) fn build_warp_filter( handler: RequestHandler, source: DatadogAgentSource, ) -> BoxedFilter<(Response,)> { - warp::post() + let direct = warp::post() .and(path!("api" / "v2" / "llmobs" / ..)) .and(warp::path::full()) .and(warp::header::optional::("content-encoding")) .and(warp::header::optional::("dd-api-key")) .and(warp::query::()) .and(warp::body::bytes()) + .and_then({ + let handler = handler.clone(); + let source = source.clone(); + move |path: FullPath, + encoding_header: Option, + api_token: Option, + query_params: ApiKeyQueryParams, + body: Bytes| { + let events = source + .decode(&encoding_header, body, path.as_str()) + .and_then(|body| { + decode_llmobs_body( + body, + source.api_key_extractor.extract( + path.as_str(), + api_token, + query_params.dd_api_key, + ), + &source, + ) + }); + handler.clone().handle_request(events, super::LLMOBS) + } + }); + + let evp_proxy = warp::post() + .and(path!("evp_proxy" / "v2" / "api" / "v2" / "llmobs" / ..)) + .and(warp::path::full()) + .and(warp::header::optional::("content-encoding")) + .and(warp::header::optional::("dd-api-key")) + .and(warp::query::()) + .and(warp::body::bytes()) .and_then( move |path: FullPath, encoding_header: Option, @@ -39,12 +80,14 @@ pub(super) fn build_warp_filter( api_token, query_params.dd_api_key, ), + &source, ) }); handler.clone().handle_request(events, super::LLMOBS) }, - ) - .boxed() + ); + + direct.or(evp_proxy).unify().boxed() } #[derive(Deserialize)] @@ -76,11 +119,16 @@ struct LLMObsSpan { tags: Vec, #[serde(rename = "_dd")] dd: Option, + span_links: Option, + #[serde(rename = "config")] + config: Option, + collection_errors: Option, } pub(crate) fn decode_llmobs_body( body: Bytes, api_key: Option>, + source: &DatadogAgentSource, ) -> Result, ErrorMessage> { let envelope: Vec = serde_json::from_slice(&body).map_err(|error| { emit!(DatadogAgentJsonParseError { error: &error }); @@ -90,7 +138,10 @@ pub(crate) fn decode_llmobs_body( ) })?; - let events = envelope + let now = Utc::now(); + let mut event_bytes_received = JsonSize::zero(); + + let events: Vec = envelope .into_iter() .flat_map(|item| { let tracer_version = item.dd_tracer_version.clone(); @@ -110,8 +161,11 @@ pub(crate) fn decode_llmobs_body( if let Some(v) = span.service { log.insert("service", v); } - if let Some(v) = span.start_ns { - log.insert("start_ns", v); + if let Some(ns) = span.start_ns { + log.insert("start_ns", ns); + if let Some(ts_path) = log_schema().timestamp_key_target_path() { + log.insert(ts_path, Utc.timestamp_nanos(ns)); + } } if let Some(v) = span.duration { log.insert("duration", v); @@ -129,29 +183,60 @@ pub(crate) fn decode_llmobs_body( log.insert("metrics", v); } if !span.tags.is_empty() { - log.insert("tags", span.tags); + log.insert("tags", span.tags.clone()); + } + if let Some(v) = span.span_links { + log.insert("span_links", v); + } + if let Some(v) = span.config { + log.insert("config", v); } - if let Some(ml_app) = span + if let Some(v) = span.collection_errors { + log.insert("collection_errors", v); + } + + // Extract ml_app: first check span._dd.ml_app, then fall back to tags array. + let ml_app = span .dd .as_ref() .and_then(|dd| dd.get("ml_app")) .and_then(|v| v.as_str()) - { - log.insert("ml_app", ml_app.to_owned()); + .map(str::to_owned) + .or_else(|| { + span.tags + .iter() + .find_map(|tag| tag.strip_prefix("ml_app:").map(str::to_owned)) + }); + if let Some(app) = ml_app { + log.insert("ml_app", app); } + if let Some(v) = tracer_version.clone() { - log.insert("_dd.tracer_version", v); + log.insert(event_path!("_dd", "tracer_version"), v); } + Event::Log(log) }) }) .map(|mut event| { - if let Some(k) = &api_key { - event.metadata_mut().set_datadog_api_key(Arc::clone(k)); + if let Event::Log(ref mut log) = event { + event_bytes_received += log.estimated_json_encoded_size_of(); + source.log_namespace.insert_standard_vector_source_metadata( + log, + DatadogAgentConfig::NAME, + now, + ); + if let Some(k) = &api_key { + log.metadata_mut().set_datadog_api_key(Arc::clone(k)); + } } event }) .collect(); + source + .events_received + .emit(CountByteSize(events.len(), event_bytes_received)); + Ok(events) } diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 0a6e0121b7e5c..efec3349521d6 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -3,10 +3,10 @@ mod integration_tests; #[cfg(test)] mod tests; +pub mod llmobs; pub mod logs; pub mod metrics; pub mod traces; -pub mod llmobs; #[allow(warnings, clippy::pedantic, clippy::nursery)] pub(crate) mod ddmetric_proto { @@ -329,8 +329,43 @@ impl SourceConfig for DatadogAgentConfig { ) .with_standard_vector_source_metadata(); + let log_namespace = global_log_namespace.merge(self.log_namespace); + let llmobs_definition = schema::Definition::new_with_default_metadata( + Kind::object( + Collection::empty() + .with_known("span_id", Kind::bytes()) + .with_known("trace_id", Kind::bytes()) + .with_known("parent_id", Kind::bytes().or_undefined()) + .with_known("name", Kind::bytes().or_undefined()) + .with_known("session_id", Kind::bytes().or_undefined()) + .with_known("service", Kind::bytes().or_undefined()) + .with_known("start_ns", Kind::integer().or_undefined()) + .with_known("duration", Kind::integer().or_undefined()) + .with_known("status", Kind::bytes().or_undefined()) + .with_known("status_message", Kind::bytes().or_undefined()) + .with_known("ml_app", Kind::bytes().or_undefined()) + .with_known("meta", Kind::object(Collection::any()).or_undefined()) + .with_known("metrics", Kind::object(Collection::any()).or_undefined()) + .with_known( + "tags", + Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), + ) + .with_known("span_links", Kind::any().or_undefined()) + .with_known("config", Kind::any().or_undefined()) + .with_known("collection_errors", Kind::any().or_undefined()), + ), + [log_namespace], + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))), + &owned_value_path!("timestamp"), + Kind::timestamp(), + Some(meaning::TIMESTAMP), + ) + .with_standard_vector_source_metadata(); + let mut output = Vec::with_capacity(1); - let llmobs_definition = definition.clone(); if self.multiple_outputs { if !self.disable_logs { @@ -343,7 +378,10 @@ impl SourceConfig for DatadogAgentConfig { output.push(SourceOutput::new_traces().with_port(TRACES)) } if !self.disable_llmobs { - output.push(SourceOutput::new_maybe_logs(DataType::Log, llmobs_definition).with_port(LLMOBS)) + output.push( + SourceOutput::new_maybe_logs(DataType::Log, llmobs_definition) + .with_port(LLMOBS), + ) } } else { output.push(SourceOutput::new_maybe_logs( diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index ad74ddc6377e5..474618b10154c 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -70,6 +70,22 @@ const DD_API_SKETCHES_PATH: &str = "/api/beta/sketches"; const DD_API_TRACES_PATH: &str = "/api/v0.2/traces"; const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +fn make_llmobs_source() -> DatadogAgentSource { + let decoder = vector_lib::codecs::Decoder::new( + Framer::Bytes(BytesDecoder::new()), + Deserializer::Bytes(BytesDeserializer), + ); + DatadogAgentSource::new( + true, + decoder, + "http", + None, + LogNamespace::Legacy, + false, + true, + ) +} + fn test_logs_schema_definition() -> schema::Definition { schema::Definition::empty_legacy_namespace().with_event_field( &owned_value_path!("a log field"), @@ -2874,7 +2890,8 @@ fn test_decode_llmobs_body() { ]"#, ); - let events = decode_llmobs_body(body, None).unwrap(); + let source = make_llmobs_source(); + let events = decode_llmobs_body(body, None, &source).unwrap(); assert_eq!(events.len(), 1); let log = events[0].as_log(); @@ -2883,31 +2900,40 @@ fn test_decode_llmobs_body() { assert_eq!(log["name"], "my.workflow".into()); assert_eq!(log["status"], "ok".into()); assert_eq!(log["ml_app"], "my-llm-app".into()); - assert_eq!(log["_dd.tracer_version"], "2.17.0".into()); + assert_eq!( + log["_dd"].as_object().unwrap()["tracer_version"], + "2.17.0".into() + ); } #[test] fn test_decode_llmobs_body_empty_spans() { let body = Bytes::from(r#"[{"event_type": "span", "spans": []}]"#); - let events = decode_llmobs_body(body, None).unwrap(); + let source = make_llmobs_source(); + let events = decode_llmobs_body(body, None, &source).unwrap(); assert_eq!(events.len(), 0); } #[test] fn test_decode_llmobs_body_invalid_json() { let body = Bytes::from("not json"); - assert!(decode_llmobs_body(body, None).is_err()); + let source = make_llmobs_source(); + assert!(decode_llmobs_body(body, None, &source).is_err()); } #[test] fn test_decode_llmobs_body_api_key() { let body = Bytes::from(r#"[{"event_type":"span","spans":[{"span_id":"a","trace_id":"b"}]}]"#); let api_key: Option> = Some(Arc::from("test1234test1234test1234test1234")); + let source = make_llmobs_source(); - let events = decode_llmobs_body(body, api_key).unwrap(); + let events = decode_llmobs_body(body, api_key, &source).unwrap(); assert_eq!(events.len(), 1); assert_eq!( - events[0].metadata().datadog_api_key().map(|k| k.as_ref().to_owned()), + events[0] + .metadata() + .datadog_api_key() + .map(|k| k.as_ref().to_owned()), Some("test1234test1234test1234test1234".to_owned()) ); } From 072fb3ccdedfacb5076dd518368a746f8611589c Mon Sep 17 00:00:00 2001 From: Ronit Anilkumar Date: Wed, 17 Jun 2026 15:41:08 -0700 Subject: [PATCH 5/5] chore: fix trailing newline in changelog fragment --- changelog.d/25441_llmobs_endpoint.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/25441_llmobs_endpoint.feature.md b/changelog.d/25441_llmobs_endpoint.feature.md index 9fd5526649fe0..0ebe076200180 100644 --- a/changelog.d/25441_llmobs_endpoint.feature.md +++ b/changelog.d/25441_llmobs_endpoint.feature.md @@ -1,3 +1,3 @@ The `datadog_agent` source now accepts LLM Observability (LLMObs) telemetry at `/api/v2/llmobs`. When `multiple_outputs` is enabled, LLMObs span events are available as log events on the `llmobs` output port. -authors: ronitanilkumar \ No newline at end of file +authors: ronitanilkumar