diff --git a/changelog.d/25441_llmobs_endpoint.feature.md b/changelog.d/25441_llmobs_endpoint.feature.md new file mode 100644 index 0000000000000..0ebe076200180 --- /dev/null +++ b/changelog.d/25441_llmobs_endpoint.feature.md @@ -0,0 +1,3 @@ +The `datadog_agent` source now accepts LLM Observability (LLMObs) telemetry at `/api/v2/llmobs`. When `multiple_outputs` is enabled, LLMObs span events are available as log events on the `llmobs` output port. + +authors: ronitanilkumar diff --git a/src/sources/datadog_agent/llmobs.rs b/src/sources/datadog_agent/llmobs.rs new file mode 100644 index 0000000000000..fe83d51eef69a --- /dev/null +++ b/src/sources/datadog_agent/llmobs.rs @@ -0,0 +1,242 @@ +use std::sync::Arc; + +use bytes::Bytes; +use chrono::{TimeZone, Utc}; +use http::StatusCode; +use serde::Deserialize; +use serde_json::Value; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + internal_event::{CountByteSize, InternalEventHandle as _}, + json_size::JsonSize, + lookup::event_path, +}; +use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response}; + +use super::{ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, RequestHandler}; +use crate::{ + common::http::ErrorMessage, + config::log_schema, + event::{Event, LogEvent}, + internal_events::DatadogAgentJsonParseError, +}; + +pub(super) fn build_warp_filter( + handler: RequestHandler, + source: DatadogAgentSource, +) -> BoxedFilter<(Response,)> { + let direct = warp::post() + .and(path!("api" / "v2" / "llmobs" / ..)) + .and(warp::path::full()) + .and(warp::header::optional::("content-encoding")) + .and(warp::header::optional::("dd-api-key")) + .and(warp::query::()) + .and(warp::body::bytes()) + .and_then({ + let handler = handler.clone(); + let source = source.clone(); + move |path: FullPath, + encoding_header: Option, + api_token: Option, + query_params: ApiKeyQueryParams, + body: Bytes| { + let events = source + .decode(&encoding_header, body, path.as_str()) + .and_then(|body| { + decode_llmobs_body( + body, + source.api_key_extractor.extract( + path.as_str(), + api_token, + query_params.dd_api_key, + ), + &source, + ) + }); + handler.clone().handle_request(events, super::LLMOBS) + } + }); + + let evp_proxy = warp::post() + .and(path!("evp_proxy" / "v2" / "api" / "v2" / "llmobs" / ..)) + .and(warp::path::full()) + .and(warp::header::optional::("content-encoding")) + .and(warp::header::optional::("dd-api-key")) + .and(warp::query::()) + .and(warp::body::bytes()) + .and_then( + move |path: FullPath, + encoding_header: Option, + api_token: Option, + query_params: ApiKeyQueryParams, + body: Bytes| { + let events = source + .decode(&encoding_header, body, path.as_str()) + .and_then(|body| { + decode_llmobs_body( + body, + source.api_key_extractor.extract( + path.as_str(), + api_token, + query_params.dd_api_key, + ), + &source, + ) + }); + handler.clone().handle_request(events, super::LLMOBS) + }, + ); + + direct.or(evp_proxy).unify().boxed() +} + +#[derive(Deserialize)] +struct LLMObsEnvelopeItem { + #[serde(rename = "event_type")] + _event_type: Option, + spans: Vec, + #[serde(rename = "_dd.tracer_version")] + dd_tracer_version: Option, + #[serde(rename = "_dd.scope")] + _dd_scope: Option, +} + +#[derive(Deserialize)] +struct LLMObsSpan { + span_id: String, + trace_id: String, + parent_id: Option, + name: Option, + session_id: Option, + service: Option, + start_ns: Option, + duration: Option, + status: Option, + status_message: Option, + meta: Option, + metrics: Option, + #[serde(default)] + tags: Vec, + #[serde(rename = "_dd")] + dd: Option, + span_links: Option, + #[serde(rename = "config")] + config: Option, + collection_errors: Option, +} + +pub(crate) fn decode_llmobs_body( + body: Bytes, + api_key: Option>, + source: &DatadogAgentSource, +) -> Result, ErrorMessage> { + let envelope: Vec = serde_json::from_slice(&body).map_err(|error| { + emit!(DatadogAgentJsonParseError { error: &error }); + ErrorMessage::new( + StatusCode::BAD_REQUEST, + format!("Error parsing JSON: {error:?}"), + ) + })?; + + let now = Utc::now(); + let mut event_bytes_received = JsonSize::zero(); + + let events: Vec = envelope + .into_iter() + .flat_map(|item| { + let tracer_version = item.dd_tracer_version.clone(); + item.spans.into_iter().map(move |span| { + let mut log = LogEvent::default(); + log.insert("span_id", span.span_id); + log.insert("trace_id", span.trace_id); + if let Some(v) = span.parent_id { + log.insert("parent_id", v); + } + if let Some(v) = span.name { + log.insert("name", v); + } + if let Some(v) = span.session_id { + log.insert("session_id", v); + } + if let Some(v) = span.service { + log.insert("service", v); + } + if let Some(ns) = span.start_ns { + log.insert("start_ns", ns); + if let Some(ts_path) = log_schema().timestamp_key_target_path() { + log.insert(ts_path, Utc.timestamp_nanos(ns)); + } + } + if let Some(v) = span.duration { + log.insert("duration", v); + } + if let Some(v) = span.status { + log.insert("status", v); + } + if let Some(v) = span.status_message { + log.insert("status_message", v); + } + if let Some(v) = span.meta { + log.insert("meta", v); + } + if let Some(v) = span.metrics { + log.insert("metrics", v); + } + if !span.tags.is_empty() { + log.insert("tags", span.tags.clone()); + } + if let Some(v) = span.span_links { + log.insert("span_links", v); + } + if let Some(v) = span.config { + log.insert("config", v); + } + if let Some(v) = span.collection_errors { + log.insert("collection_errors", v); + } + + // Extract ml_app: first check span._dd.ml_app, then fall back to tags array. + let ml_app = span + .dd + .as_ref() + .and_then(|dd| dd.get("ml_app")) + .and_then(|v| v.as_str()) + .map(str::to_owned) + .or_else(|| { + span.tags + .iter() + .find_map(|tag| tag.strip_prefix("ml_app:").map(str::to_owned)) + }); + if let Some(app) = ml_app { + log.insert("ml_app", app); + } + + if let Some(v) = tracer_version.clone() { + log.insert(event_path!("_dd", "tracer_version"), v); + } + + Event::Log(log) + }) + }) + .map(|mut event| { + if let Event::Log(ref mut log) = event { + event_bytes_received += log.estimated_json_encoded_size_of(); + source.log_namespace.insert_standard_vector_source_metadata( + log, + DatadogAgentConfig::NAME, + now, + ); + if let Some(k) = &api_key { + log.metadata_mut().set_datadog_api_key(Arc::clone(k)); + } + } + event + }) + .collect(); + + source + .events_received + .emit(CountByteSize(events.len(), event_bytes_received)); + + Ok(events) +} diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index ac84188df7c78..efec3349521d6 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -3,6 +3,7 @@ mod integration_tests; #[cfg(test)] mod tests; +pub mod llmobs; pub mod logs; pub mod metrics; pub mod traces; @@ -69,6 +70,7 @@ use crate::{ pub const LOGS: &str = "logs"; pub const METRICS: &str = "metrics"; pub const TRACES: &str = "traces"; +pub const LLMOBS: &str = "llmobs"; /// Configuration for the `datadog_agent` source. #[configurable_component(source( @@ -106,6 +108,11 @@ pub struct DatadogAgentConfig { #[serde(default = "crate::serde::default_false")] disable_traces: bool, + /// If this is set to `true`, LLM Observability events are not accepted by the component. + #[configurable(metadata(docs::advanced))] + #[serde(default = "crate::serde::default_false")] + disable_llmobs: bool, + /// If this is set to `true`, logs, metrics (beta), and traces (alpha) are sent to different outputs. /// /// @@ -179,6 +186,7 @@ impl GenerateConfig for DatadogAgentConfig { disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, multiple_outputs: false, parse_ddtags: false, split_metric_namespace: true, @@ -321,6 +329,42 @@ impl SourceConfig for DatadogAgentConfig { ) .with_standard_vector_source_metadata(); + let log_namespace = global_log_namespace.merge(self.log_namespace); + let llmobs_definition = schema::Definition::new_with_default_metadata( + Kind::object( + Collection::empty() + .with_known("span_id", Kind::bytes()) + .with_known("trace_id", Kind::bytes()) + .with_known("parent_id", Kind::bytes().or_undefined()) + .with_known("name", Kind::bytes().or_undefined()) + .with_known("session_id", Kind::bytes().or_undefined()) + .with_known("service", Kind::bytes().or_undefined()) + .with_known("start_ns", Kind::integer().or_undefined()) + .with_known("duration", Kind::integer().or_undefined()) + .with_known("status", Kind::bytes().or_undefined()) + .with_known("status_message", Kind::bytes().or_undefined()) + .with_known("ml_app", Kind::bytes().or_undefined()) + .with_known("meta", Kind::object(Collection::any()).or_undefined()) + .with_known("metrics", Kind::object(Collection::any()).or_undefined()) + .with_known( + "tags", + Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), + ) + .with_known("span_links", Kind::any().or_undefined()) + .with_known("config", Kind::any().or_undefined()) + .with_known("collection_errors", Kind::any().or_undefined()), + ), + [log_namespace], + ) + .with_source_metadata( + Self::NAME, + Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))), + &owned_value_path!("timestamp"), + Kind::timestamp(), + Some(meaning::TIMESTAMP), + ) + .with_standard_vector_source_metadata(); + let mut output = Vec::with_capacity(1); if self.multiple_outputs { @@ -333,6 +377,12 @@ impl SourceConfig for DatadogAgentConfig { if !self.disable_traces { output.push(SourceOutput::new_traces().with_port(TRACES)) } + if !self.disable_llmobs { + output.push( + SourceOutput::new_maybe_logs(DataType::Log, llmobs_definition) + .with_port(LLMOBS), + ) + } } else { output.push(SourceOutput::new_maybe_logs( DataType::all_bits(), @@ -459,12 +509,19 @@ impl DatadogAgentSource { } if !config.disable_metrics { - let metrics_filter = metrics::build_warp_filter(handler, self.clone()); + let metrics_filter = metrics::build_warp_filter(handler.clone(), self.clone()); filters = filters .map(|f| f.or(metrics_filter.clone()).unify().boxed()) .or(Some(metrics_filter)); } + if !config.disable_llmobs { + let llmobs_filter = llmobs::build_warp_filter(handler.clone(), self.clone()); + filters = filters + .map(|f| f.or(llmobs_filter.clone()).unify().boxed()) + .or(Some(llmobs_filter)); + } + filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into()) } diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index dc7c414722150..474618b10154c 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -3,6 +3,7 @@ use std::{ iter::FromIterator, net::SocketAddr, str, + sync::Arc, time::Duration, }; @@ -58,6 +59,8 @@ use crate::{ }, }; +use crate::sources::datadog_agent::llmobs::decode_llmobs_body; + const DD_API_KEY: &str = "12345678abcdefgh12345678abcdefgh"; const DD_API_LOGS_V1_PATH: &str = "/v1/input/"; const DD_API_LOGS_V2_PATH: &str = "/api/v2/logs"; @@ -67,6 +70,22 @@ const DD_API_SKETCHES_PATH: &str = "/api/beta/sketches"; const DD_API_TRACES_PATH: &str = "/api/v0.2/traces"; const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +fn make_llmobs_source() -> DatadogAgentSource { + let decoder = vector_lib::codecs::Decoder::new( + Framer::Bytes(BytesDecoder::new()), + Deserializer::Bytes(BytesDeserializer), + ); + DatadogAgentSource::new( + true, + decoder, + "http", + None, + LogNamespace::Legacy, + false, + true, + ) +} + fn test_logs_schema_definition() -> schema::Definition { schema::Definition::empty_legacy_namespace().with_event_field( &owned_value_path!("a log field"), @@ -1554,6 +1573,7 @@ fn test_config_outputs_with_disabled_data_types() { disable_logs: bool, disable_metrics: bool, disable_traces: bool, + disable_llmobs: bool, } for TestCase { @@ -1561,48 +1581,56 @@ fn test_config_outputs_with_disabled_data_types() { disable_logs, disable_metrics, disable_traces, + disable_llmobs, } in [ TestCase { multiple_outputs: true, disable_logs: true, disable_metrics: true, disable_traces: true, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: true, disable_metrics: false, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: false, disable_metrics: true, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: false, disable_metrics: false, disable_traces: true, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: true, disable_metrics: true, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: true, disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, }, TestCase { multiple_outputs: false, disable_logs: true, disable_metrics: true, disable_traces: true, + disable_llmobs: false, }, ] { let config = DatadogAgentConfig { @@ -1616,6 +1644,7 @@ fn test_config_outputs_with_disabled_data_types() { disable_logs, disable_metrics, disable_traces, + disable_llmobs, parse_ddtags: false, split_metric_namespace: true, log_namespace: Some(false), @@ -1629,7 +1658,10 @@ fn test_config_outputs_with_disabled_data_types() { .map(|output| output.ty) .collect(); if multiple_outputs { - assert_eq!(outputs.contains(&DataType::Log), !disable_logs); + assert_eq!( + outputs.contains(&DataType::Log), + !disable_logs || !disable_llmobs + ); assert_eq!(outputs.contains(&DataType::Trace), !disable_traces); assert_eq!(outputs.contains(&DataType::Metric), !disable_metrics); } else { @@ -2060,6 +2092,7 @@ fn test_config_outputs() { disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, parse_ddtags: false, split_metric_namespace: true, log_namespace: Some(false), @@ -2793,6 +2826,7 @@ impl ValidatableComponent for DatadogAgentConfig { disable_logs: false, disable_metrics: false, disable_traces: false, + disable_llmobs: false, parse_ddtags: false, split_metric_namespace: true, log_namespace: Some(false), @@ -2832,3 +2866,74 @@ impl ValidatableComponent for DatadogAgentConfig { } register_validatable_component!(DatadogAgentConfig); + +#[test] +fn test_decode_llmobs_body() { + let body = Bytes::from( + r#"[ + { + "event_type": "span", + "_dd.tracer_version": "2.17.0", + "spans": [{ + "span_id": "abc123", + "trace_id": "xyz789", + "name": "my.workflow", + "start_ns": 1707763310981223236, + "duration": 12345678900, + "status": "ok", + "meta": { "span": { "kind": "llm" }, "model_name": "gpt-4" }, + "metrics": { "input_tokens": 64, "output_tokens": 128 }, + "tags": ["env:prod", "service:myapp"], + "_dd": { "ml_app": "my-llm-app" } + }] + } + ]"#, + ); + + let source = make_llmobs_source(); + let events = decode_llmobs_body(body, None, &source).unwrap(); + assert_eq!(events.len(), 1); + + let log = events[0].as_log(); + assert_eq!(log["span_id"], "abc123".into()); + assert_eq!(log["trace_id"], "xyz789".into()); + assert_eq!(log["name"], "my.workflow".into()); + assert_eq!(log["status"], "ok".into()); + assert_eq!(log["ml_app"], "my-llm-app".into()); + assert_eq!( + log["_dd"].as_object().unwrap()["tracer_version"], + "2.17.0".into() + ); +} + +#[test] +fn test_decode_llmobs_body_empty_spans() { + let body = Bytes::from(r#"[{"event_type": "span", "spans": []}]"#); + let source = make_llmobs_source(); + let events = decode_llmobs_body(body, None, &source).unwrap(); + assert_eq!(events.len(), 0); +} + +#[test] +fn test_decode_llmobs_body_invalid_json() { + let body = Bytes::from("not json"); + let source = make_llmobs_source(); + assert!(decode_llmobs_body(body, None, &source).is_err()); +} + +#[test] +fn test_decode_llmobs_body_api_key() { + let body = Bytes::from(r#"[{"event_type":"span","spans":[{"span_id":"a","trace_id":"b"}]}]"#); + let api_key: Option> = Some(Arc::from("test1234test1234test1234test1234")); + let source = make_llmobs_source(); + + let events = decode_llmobs_body(body, api_key, &source).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!( + events[0] + .metadata() + .datadog_api_key() + .map(|k| k.as_ref().to_owned()), + Some("test1234test1234test1234test1234".to_owned()) + ); +} diff --git a/website/cue/reference/components/sources/datadog_agent.cue b/website/cue/reference/components/sources/datadog_agent.cue index 5bb26bb89823c..50efeb6ae4189 100644 --- a/website/cue/reference/components/sources/datadog_agent.cue +++ b/website/cue/reference/components/sources/datadog_agent.cue @@ -82,6 +82,13 @@ components: sources: datadog_agent: { If [multiple_outputs](#multiple_outputs) is enabled, received trace events will go to this output stream. Use `.traces` as an input to downstream transforms and sinks. """ }, + { + name: "llmobs" + description: """ + If [multiple_outputs](#multiple_outputs) is enabled, received LLM Observability events will go to this output stream. Use `.llmobs` as an input to downstream transforms and sinks. + """ + }, + ] output: { diff --git a/website/cue/reference/components/sources/generated/datadog_agent.cue b/website/cue/reference/components/sources/generated/datadog_agent.cue index fa6a519fe1e75..53474af618179 100644 --- a/website/cue/reference/components/sources/generated/datadog_agent.cue +++ b/website/cue/reference/components/sources/generated/datadog_agent.cue @@ -345,6 +345,11 @@ generated: components: sources: datadog_agent: configuration: { } } } + disable_llmobs: { + description: "If this is set to `true`, LLM Observability events are not accepted by the component." + required: false + type: bool: default: false + } disable_logs: { description: "If this is set to `true`, logs are not accepted by the component." required: false