diff --git a/Cargo.toml b/Cargo.toml index 68d722d9..14c760e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,7 @@ pin-project = "1.1.10" futures-util = "0.3.31" rotel_python_processor_sdk = { path = "rotel_python_processor_sdk", optional = true } pyo3 = { version = "0.24.1", optional = true } -chrono = { version = "0.4.40", features = [ "serde" ] } +chrono = { version = "0.4.40", features = ["serde"] } serde = { version = "1.0.217", features = ["derive"] } thiserror = "2.0.12" lz4_flex = { version = "0.11.3", default-features = false, features = ["std"] } @@ -90,10 +90,11 @@ aws-config = { version = "1.8.6", optional = true } aws-credential-types = { version = "1.2.6", optional = true } # Use a custom branch that backdates to otel sdk 0.30.0 (we are not compatible with 0.31.0 yet). Also sort data points + attributes for visual stability opentelemetry-prometheus-text-exporter = { git = "https://github.com/mheffner/opentelemetry-prometheus-text-exporter", branch = "sorted-otel-0.30.0", optional = true } +# Making utilities available to the generate-otlp binary +utilities = { path = "utilities" } [dev-dependencies] tokio-test = "0.4.4" -utilities = { path = "utilities" } httpmock = "0.7.0" criterion = { version = "0.5.1", features = ["async_tokio"] } tracing-test = "0.2.5" @@ -141,5 +142,9 @@ harness = false [[bin]] name = "rotel" +[[bin]] +name = "generate-otlp" +path = "src/bin/generate-otlp/main.rs" + [profile.release] lto = "fat" diff --git a/Dockerfile.context-processor b/Dockerfile.context-processor new file mode 100644 index 00000000..60e7be30 --- /dev/null +++ b/Dockerfile.context-processor @@ -0,0 +1,84 @@ +# Python stage - get Python 3.13 with dev headers from Ubuntu +FROM ubuntu:22.04 AS python-builder +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update && apt-get install -y \ + software-properties-common \ + && add-apt-repository -y ppa:deadsnakes/ppa \ + && apt-get update \ + && apt-get install -y \ + python3.13 \ + python3.13-dev \ + && rm -rf 
/var/lib/apt/lists/* + +# Build stage +FROM rust:1.91 AS builder + +# Copy Python 3.13 from python-builder stage +COPY --from=python-builder /usr/bin/python3.13 /usr/bin/python3.13 +COPY --from=python-builder /usr/lib/python3.13 /usr/lib/python3.13 +COPY --from=python-builder /usr/include/python3.13 /usr/include/python3.13 +# Copy Python shared libraries (needed for linking) +RUN mkdir -p /usr/lib/x86_64-linux-gnu +COPY --from=python-builder /usr/lib/x86_64-linux-gnu/libpython3.13* /usr/lib/x86_64-linux-gnu/ + +# Install build dependencies (matching DEVELOPING.md requirements) +RUN apt-get update && apt-get install -y \ + protobuf-compiler \ + cmake \ + clang \ + libclang-dev \ + libzstd-dev \ + build-essential \ + pkg-config \ + libssl-dev \ + && rm -rf /var/lib/apt/lists/* + +# Set environment variables for PyO3 to find Python +ENV PYO3_PYTHON=/usr/bin/python3.13 + +WORKDIR /app + +# Copy manifest files +COPY Cargo.toml Cargo.lock ./ +COPY rotel_python_processor_sdk/Cargo.toml ./rotel_python_processor_sdk/ + +# Copy source code +COPY . . 
+ +# Build with pyo3 feature for Python processor support +# Disable default features (rdkafka, aws_iam) to speed up compilation +RUN cargo build --release --no-default-features --features pyo3 --target x86_64-unknown-linux-gnu + +# Runtime stage +# Use Debian to match the builder's GLIBC version (rust:1.91 is Debian-based) +FROM debian:trixie-slim + +# Set environment variables for non-interactive apt operations +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update && apt-get install -y ca-certificates && apt-get clean + +# Set environment variables to prevent Python from writing .pyc files and to unbuffer stdout/stderr +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +# Install Python 3.13 from Debian repos (trixie has Python 3.13) +RUN apt-get update && \ + apt-get install -y python3.13 python3.13-venv python3.13-dev && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Set Python 3.13 as the default python3 +RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.13 1 + +# Copy the built binary from builder stage +COPY --from=builder /app/target/x86_64-unknown-linux-gnu/release/rotel /rotel +RUN chmod 0755 /rotel + +# Copy the context processor +RUN mkdir -p /processors +COPY rotel_python_processor_sdk/processors/context_processor.py /processors/context_processor.py + +EXPOSE 5418 + +ENTRYPOINT ["/rotel", "start", "--otlp-http-endpoint", "0.0.0.0:5418", "--otlp-http-include-metadata", "--otlp-http-headers-to-include", "my-custom-header", "--otlp-with-trace-processor", "/processors/context_processor.py", "--exporter", "otlp", "--otlp-exporter-endpoint", "signoz-otel-collector:4317"] diff --git a/README.md b/README.md index 827c0829..5c7ff25d 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,21 @@ follow these steps: telemetrygen traces --otlp-insecure --duration 5s ``` + Alternatively, use the built-in `generate-otlp` tool: + + ```bash + # Generate and send traces directly to Rotel + cargo run --bin generate-otlp -- 
traces --http-endpoint localhost:4318 + + # Or generate a trace file for testing + cargo run --bin generate-otlp -- traces --file trace.pb + + # Then send it with curl + curl -X POST http://localhost:4318/v1/traces \ + -H "Content-Type: application/x-protobuf" \ + --data-binary @trace.pb + ``` + - Check the output from Rotel and you should see several "Received traces" log lines. ## Configuration diff --git a/rotel_python_processor_sdk/processors/context_processor.py b/rotel_python_processor_sdk/processors/context_processor.py new file mode 100644 index 00000000..ad754a09 --- /dev/null +++ b/rotel_python_processor_sdk/processors/context_processor.py @@ -0,0 +1,141 @@ +""" +ContextProcessor demonstrates how to pull HTTP/gRPC headers from message metadata +(context) and add them as span attributes. This follows the OTel Collector +pattern where headers/metadata are stored in context by the receiver and processors pull +from context to add attributes. + +This processor extracts custom headers from context and adds them as span +attributes following OpenTelemetry semantic conventions (http.request.header.*). + +Usage: + For HTTP, configure the receiver to include metadata: + ROTEL_OTLP_HTTP_INCLUDE_METADATA=true + ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header + + For gRPC, configure the receiver to include metadata: + ROTEL_OTLP_GRPC_INCLUDE_METADATA=true + ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header + + Then use this processor to add those headers as span attributes. + +The request context is now exposed to Python processors. Processors can +access headers via: + resource_spans.request_context # Returns RequestContext or None + +This processor works with both HTTP headers and gRPC metadata - they are +both exposed through the same RequestContext interface to Python processors. + +This processor demonstrates how to extract headers from context and add +them as span attributes following OpenTelemetry semantic conventions. 
+""" + +from typing import Optional + +from rotel_sdk.open_telemetry.common.v1 import AnyValue, KeyValue +from rotel_sdk.open_telemetry.logs.v1 import ResourceLogs +from rotel_sdk.open_telemetry.metrics.v1 import ResourceMetrics +from rotel_sdk.open_telemetry.request import RequestContext +from rotel_sdk.open_telemetry.trace.v1 import ResourceSpans + + +def _get_header_from_context( + request_context: Optional[RequestContext], header_name: str +) -> Optional[str]: + """ + Get a header value from message metadata (context). + + Accesses headers via: + resource_spans.request_context.get(header_name) + + The pattern: + - Headers/metadata keys are stored with lowercase keys + - Works for both HTTP headers and gRPC metadata + """ + if request_context is not None: + if isinstance(request_context, RequestContext.HttpContext): + return request_context.http_context.headers.get(header_name.lower()) + elif isinstance(request_context, RequestContext.GrpcContext): + return request_context.grpc_context.metadata.get(header_name.lower()) + return None + + +class ContextProcessor: + def process_spans(self, resource_spans: ResourceSpans): + """ + Process ResourceSpans by extracting a custom header from context + and adding it as a span attribute. + + This function extracts "my-custom-header" from context and adds it as + a span attribute following OTel semantic convention: http.request.header.* + + Example: If the receiver is configured with: + ROTEL_OTLP_HTTP_INCLUDE_METADATA=true + ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header + + Or for gRPC: + ROTEL_OTLP_GRPC_INCLUDE_METADATA=true + ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header + + And the request includes "my-custom-header: test-value-123", then + this processor will add the attribute + "http.request.header.my-custom-header" = "test-value-123" to all spans. 
+ """ + # Header to extract from context + header_name = "my-custom-header" + + # Get header value from context + header_value = _get_header_from_context( + resource_spans.request_context, header_name + ) + + if header_value: + # Create attribute following OTel semantic convention + attr = KeyValue( + key=f"http.request.header.{header_name}", + value=AnyValue(header_value), + ) + + # Add attribute to all spans + for scope_spans in resource_spans.scope_spans: + for span in scope_spans.spans: + span.attributes.append(attr) + + # Example: You can also add to resource attributes instead: + # if header_value and resource_spans.resource: + # resource_spans.resource.attributes.append(attr) + + def process_metrics(self, resource_metrics: ResourceMetrics): + """ + Process metrics - add custom header to resource attributes. + Metrics typically use resource attributes rather than per-metric + attributes. + """ + header_name = "my-custom-header" + header_value = _get_header_from_context( + resource_metrics.request_context, header_name + ) + + if header_value and resource_metrics.resource: + attr = KeyValue( + key=f"http.request.header.{header_name}", + value=AnyValue(header_value), + ) + resource_metrics.resource.attributes.append(attr) + + def process_logs(self, resource_logs: ResourceLogs): + """ + Process logs - add custom header to log record attributes. 
+ """ + header_name = "my-custom-header" + header_value = _get_header_from_context( + resource_logs.request_context, header_name + ) + if header_value: + attr = KeyValue( + key=f"http.request.header.{header_name}", + value=AnyValue(header_value), + ) + + for scope_logs in resource_logs.scope_logs: + for log_record in scope_logs.log_records: + log_record.attributes.append(attr) diff --git a/rotel_python_processor_sdk/python_tests/context_processor_test.py b/rotel_python_processor_sdk/python_tests/context_processor_test.py new file mode 100644 index 00000000..c43d7b4a --- /dev/null +++ b/rotel_python_processor_sdk/python_tests/context_processor_test.py @@ -0,0 +1,23 @@ +import sys + +from rotel_sdk.open_telemetry.logs.v1 import ResourceLogs +from rotel_sdk.open_telemetry.metrics.v1 import ResourceMetrics +from rotel_sdk.open_telemetry.trace.v1 import ResourceSpans + +sys.path.insert(0, './processors') + +from context_processor import ContextProcessor + +processor = ContextProcessor() + + +def process_logs(resource_logs: ResourceLogs): + processor.process_logs(resource_logs) + + +def process_spans(resource_spans: ResourceSpans): + processor.process_spans(resource_spans) + + +def process_metrics(resource_metrics: ResourceMetrics): + processor.process_metrics(resource_metrics) diff --git a/rotel_python_processor_sdk/rotel_sdk/open_telemetry/logs/v1/__init__.pyi b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/logs/v1/__init__.pyi index 18504bd8..65793180 100644 --- a/rotel_python_processor_sdk/rotel_sdk/open_telemetry/logs/v1/__init__.pyi +++ b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/logs/v1/__init__.pyi @@ -1,6 +1,8 @@ +from typing import Optional + from rotel_sdk.open_telemetry.common.v1 import InstrumentationScope, KeyValue, AnyValue +from rotel_sdk.open_telemetry.request import RequestContext from rotel_sdk.open_telemetry.resource.v1 import Resource -from typing import Optional class ResourceLogs: @@ -24,6 +26,7 @@ class ResourceLogs: This 
schema_url applies to the data in the "resource" field. It does not apply to the data in the "scope_logs" field which have their own schema_url field. """ + request_context: Optional[RequestContext] class ScopeLogs: diff --git a/rotel_python_processor_sdk/rotel_sdk/open_telemetry/metrics/v1/__init__.pyi b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/metrics/v1/__init__.pyi index 587447bd..6c4107d4 100644 --- a/rotel_python_processor_sdk/rotel_sdk/open_telemetry/metrics/v1/__init__.pyi +++ b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/metrics/v1/__init__.pyi @@ -1,6 +1,7 @@ from typing import Optional from rotel_sdk.open_telemetry.common.v1 import InstrumentationScope, KeyValue +from rotel_sdk.open_telemetry.request import RequestContext from rotel_sdk.open_telemetry.resource.v1 import Resource @@ -26,6 +27,7 @@ class ResourceMetrics: This schema_url applies to the data in the "resource" field. It does not apply to the data in the "scope_metrics" field which have their own schema_url field. """ + request_context: Optional[RequestContext] class ScopeMetrics: diff --git a/rotel_python_processor_sdk/rotel_sdk/open_telemetry/request/__init__.py b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/request/__init__.py new file mode 100644 index 00000000..c9631392 --- /dev/null +++ b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/request/__init__.py @@ -0,0 +1,20 @@ +class HttpContext: + headers: dict[str, str] + + +class GrpcContext: + metadata: dict[str, str] + + +class RequestContext: + """ + Union type representing the request context: HTTP headers or gRPC metadata. + """ + HttpContext = HttpContext + GrpcContext = GrpcContext + + @property + def http_context(self) -> HttpContext: ... + + @property + def grpc_context(self) -> GrpcContext: ...
 diff --git a/rotel_python_processor_sdk/rotel_sdk/open_telemetry/trace/v1/__init__.pyi b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/trace/v1/__init__.pyi index 29cd2d56..66aee1ac 100644 --- a/rotel_python_processor_sdk/rotel_sdk/open_telemetry/trace/v1/__init__.pyi +++ b/rotel_python_processor_sdk/rotel_sdk/open_telemetry/trace/v1/__init__.pyi @@ -1,6 +1,8 @@ +from typing import Optional + from rotel_sdk.open_telemetry.common.v1 import InstrumentationScope, KeyValue +from rotel_sdk.open_telemetry.request import RequestContext from rotel_sdk.open_telemetry.resource.v1 import Resource -from typing import Optional class ResourceSpans: @@ -25,6 +27,7 @@ class ResourceSpans: This schema_url applies to the data in the "resource" field. It does not apply to the data in the "scope_spans" field which have their own schema_url field. """ + request_context: Optional[RequestContext] class ScopeSpans: diff --git a/rotel_python_processor_sdk/src/model/mod.rs b/rotel_python_processor_sdk/src/model/mod.rs index 9ab24eef..d6096da2 100644 --- a/rotel_python_processor_sdk/src/model/mod.rs +++ b/rotel_python_processor_sdk/src/model/mod.rs @@ -8,6 +8,7 @@ pub mod trace; use crate::py::logs::*; use crate::py::metrics::*; +use crate::py::request_context::*; use crate::py::rotel_sdk; use crate::py::trace::*; use pyo3::prelude::*; @@ -40,23 +41,25 @@ pub fn register_processor(code: String, script: String, module: String) -> Resul } pub trait PythonProcessable { - fn process(self, processor: &str) -> Self; + fn process(self, processor: &str, headers: Option) -> Self; } impl PythonProcessable for opentelemetry_proto::tonic::trace::v1::ResourceSpans { - fn process(self, processor: &str) -> Self { + fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_spans(self); // Build the PyObject - let spans = ResourceSpans { - 
resource: inner.resource.clone(), - scope_spans: inner.scope_spans.clone(), - schema_url: inner.schema_url.clone(), - }; let res = Python::with_gil(|py| -> PyResult<()> { + let spans = ResourceSpans { + resource: inner.resource.clone(), + scope_spans: inner.scope_spans.clone(), + schema_url: inner.schema_url.clone(), + request_context, + }; let py_mod = PyModule::import(py, processor)?; let result_py_object = py_mod.getattr("process_spans")?.call1((spans,)); if result_py_object.is_err() { let err = result_py_object.unwrap_err(); + error!("Python processor error: {}", err.to_string()); return Err(err); } Ok(()) @@ -74,25 +77,37 @@ impl PythonProcessable for opentelemetry_proto::tonic::trace::v1::ResourceSpans if resource.is_some() { resource_span.resource = py_transform::transform_resource(resource.unwrap()); } - let scope_spans = Arc::into_inner(inner.scope_spans) - .unwrap() - .into_inner() - .unwrap(); + // Try to extract scope_spans, fall back to cloning if Arc has multiple references + let scope_spans = match Arc::try_unwrap(inner.scope_spans) { + Ok(mutex) => match mutex.into_inner() { + Ok(vec) => vec, + Err(_) => { + // Mutex is poisoned, return empty spans + return resource_span; + } + }, + Err(arc) => { + // Arc still has references (Python may be holding them), clone instead + let locked = arc.lock().unwrap(); + locked.clone() + } + }; resource_span.scope_spans = py_transform::transform_spans(scope_spans); resource_span } } impl PythonProcessable for opentelemetry_proto::tonic::metrics::v1::ResourceMetrics { - fn process(self, processor: &str) -> Self { + fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_metrics(self); // Build the PyObject - let spans = ResourceMetrics { - resource: inner.resource.clone(), - scope_metrics: inner.scope_metrics.clone(), - schema_url: inner.schema_url.clone(), - }; let res = Python::with_gil(|py| -> PyResult<()> { + let spans = ResourceMetrics { + resource: 
inner.resource.clone(), + scope_metrics: inner.scope_metrics.clone(), + schema_url: inner.schema_url.clone(), + request_context, + }; let py_mod = PyModule::import(py, processor)?; let result_py_object = py_mod.getattr("process_metrics")?.call1((spans,)); if result_py_object.is_err() { @@ -124,15 +139,16 @@ impl PythonProcessable for opentelemetry_proto::tonic::metrics::v1::ResourceMetr } impl PythonProcessable for opentelemetry_proto::tonic::logs::v1::ResourceLogs { - fn process(self, processor: &str) -> Self { + fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_logs(self); // Build the PyObject - let spans = ResourceLogs { - resource: inner.resource.clone(), - scope_logs: inner.scope_logs.clone(), - schema_url: inner.schema_url.clone(), - }; let res = Python::with_gil(|py| -> PyResult<()> { + let spans = ResourceLogs { + resource: inner.resource.clone(), + scope_logs: inner.scope_logs.clone(), + schema_url: inner.schema_url.clone(), + request_context, + }; let py_mod = PyModule::import(py, processor)?; let result_py_object = py_mod.getattr("process_logs")?.call1((spans,)); if result_py_object.is_err() { diff --git a/rotel_python_processor_sdk/src/py/logs.rs b/rotel_python_processor_sdk/src/py/logs.rs index 882ddc96..b1a40c8b 100644 --- a/rotel_python_processor_sdk/src/py/logs.rs +++ b/rotel_python_processor_sdk/src/py/logs.rs @@ -4,6 +4,7 @@ use crate::model::logs::{RLogRecord, RScopeLogs}; use crate::model::otel_transform::convert_attributes; use crate::model::resource::RResource; use crate::py::common::{AnyValue, KeyValue}; +use crate::py::request_context::RequestContext; use crate::py::{handle_poison_error, AttributesList, InstrumentationScope, Resource}; use pyo3::{pyclass, pymethods, Py, PyErr, PyRef, PyRefMut, PyResult, Python}; use std::sync::{Arc, Mutex}; @@ -14,6 +15,7 @@ pub struct ResourceLogs { pub resource: Arc>>, pub scope_logs: Arc>>>>, pub schema_url: String, + pub request_context: 
Option, } #[pymethods] @@ -67,6 +69,10 @@ impl ResourceLogs { self.schema_url = schema_url; Ok(()) } + #[getter] + fn request_context(&self) -> PyResult> { + Ok(self.request_context.clone()) + } } #[pyclass] diff --git a/rotel_python_processor_sdk/src/py/metrics.rs b/rotel_python_processor_sdk/src/py/metrics.rs index dad87934..61423c45 100644 --- a/rotel_python_processor_sdk/src/py/metrics.rs +++ b/rotel_python_processor_sdk/src/py/metrics.rs @@ -14,6 +14,7 @@ use crate::model::metrics::{ RNumberDataPoint, RNumberDataPointValue, RScopeMetrics, RSum, RSummary, RSummaryDataPoint, RValueAtQuantile, }; +use crate::py::request_context::RequestContext; // --- PyO3 Bindings for RResourceMetrics --- #[pyclass] @@ -22,6 +23,7 @@ pub struct ResourceMetrics { pub resource: Arc>>, pub scope_metrics: Arc>>>>, pub schema_url: String, + pub request_context: Option, } #[pymethods] @@ -80,6 +82,10 @@ impl ResourceMetrics { self.schema_url = schema_url; Ok(()) } + #[getter] + fn request_context(&self) -> PyResult> { + Ok(self.request_context.clone()) + } } // --- PyO3 Bindings for ScopeMetricsList (for iterating and accessing) --- diff --git a/rotel_python_processor_sdk/src/py/mod.rs b/rotel_python_processor_sdk/src/py/mod.rs index 338dd83c..df4dcd68 100644 --- a/rotel_python_processor_sdk/src/py/mod.rs +++ b/rotel_python_processor_sdk/src/py/mod.rs @@ -1,6 +1,7 @@ pub mod common; pub mod logs; pub mod metrics; +pub mod request_context; pub mod resource; pub mod trace; @@ -12,6 +13,7 @@ use crate::py::metrics::{ HistogramDataPoint, Metric, MetricData, NumberDataPoint, NumberDataPointValue, ResourceMetrics, ScopeMetrics, Sum, Summary, SummaryDataPoint, ValueAtQuantile, }; +use crate::py::request_context::{GrpcContext, HttpContext, RequestContext}; use py::common::*; use py::logs::*; use py::resource::*; @@ -42,8 +44,9 @@ pub fn rotel_sdk(m: &Bound<'_, PyModule>) -> PyResult<()> { let trace_module = PyModule::new(open_telemetry_module.py(), "trace")?; let resource_module = 
PyModule::new(open_telemetry_module.py(), "resource")?; let common_module = PyModule::new(open_telemetry_module.py(), "common")?; - let logs_module = PyModule::new(open_telemetry_module.py(), "logs")?; // Added logs module - let metrics_module = PyModule::new(open_telemetry_module.py(), "metrics")?; // Added logs module + let logs_module = PyModule::new(open_telemetry_module.py(), "logs")?; + let metrics_module = PyModule::new(open_telemetry_module.py(), "metrics")?; + let request_module = PyModule::new(open_telemetry_module.py(), "request")?; let trace_v1_module = PyModule::new(trace_module.py(), "v1")?; let common_v1_module = PyModule::new(common_module.py(), "v1")?; let resource_v1_module = PyModule::new(resource_module.py(), "v1")?; @@ -56,6 +59,7 @@ pub fn rotel_sdk(m: &Bound<'_, PyModule>) -> PyResult<()> { open_telemetry_module.add_submodule(&trace_module)?; open_telemetry_module.add_submodule(&resource_module)?; open_telemetry_module.add_submodule(&common_module)?; + open_telemetry_module.add_submodule(&request_module)?; m.add_submodule(&open_telemetry_module)?; m.py() @@ -108,6 +112,11 @@ pub fn rotel_sdk(m: &Bound<'_, PyModule>) -> PyResult<()> { .getattr("modules")? .set_item("rotel_sdk.open_telemetry.metrics.v1", &metrics_v1_module)?; + m.py() + .import("sys")? + .getattr("modules")? 
+ .set_item("rotel_sdk.open_telemetry.request", &request_module)?; + common_v1_module.add_class::()?; common_v1_module.add_class::()?; common_v1_module.add_class::()?; @@ -153,6 +162,10 @@ pub fn rotel_sdk(m: &Bound<'_, PyModule>) -> PyResult<()> { metrics_v1_module.add_class::()?; metrics_v1_module.add_class::()?; + request_module.add_class::()?; + request_module.add_class::()?; + request_module.add_class::()?; + Ok(()) } @@ -164,6 +177,7 @@ mod tests { use crate::model::common::{REntityRef, RValue::*}; use crate::model::{otel_transform, py_transform}; use crate::py::metrics::ResourceMetrics; + use crate::py::request_context::RequestContext; use chrono::Utc; use opentelemetry_proto::tonic::common::v1::any_value::Value; use opentelemetry_proto::tonic::metrics::v1::metric::Data; @@ -853,6 +867,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: Arc::new(Mutex::new(vec![])), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("resource_spans_append_attribute.py", py, py_resource_spans) @@ -872,6 +887,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("resource_spans_iterate_spans.py", py, py_resource_spans) @@ -891,6 +907,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script( @@ -948,6 +965,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("set_instrumentation_scope_test.py", py, py_resource_spans) @@ -994,6 +1012,7 @@ mod tests { resource: 
resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("read_and_write_spans_test.py", py, py_resource_spans) @@ -1085,6 +1104,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("set_scope_spans_span_test.py", py, py_resource_spans) @@ -1169,6 +1189,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script( @@ -1209,6 +1230,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("write_span_events_test.py", py, py_resource_spans) @@ -1258,6 +1280,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("write_scope_spans_test.py", py, py_resource_spans) @@ -1312,6 +1335,7 @@ mod tests { resource: resource_spans.resource.clone(), scope_spans: resource_spans.scope_spans.clone(), schema_url: resource_spans.schema_url, + request_context: None, }; Python::with_gil(|py| -> PyResult<()> { run_script("write_spans_test.py", py, py_resource_spans) @@ -1408,6 +1432,7 @@ mod tests { resource: r_resource_logs.resource.clone(), scope_logs: r_resource_logs.scope_logs.clone(), schema_url: r_resource_logs.schema_url.clone(), + request_context: None, }; // Execute the Python script @@ -1501,6 +1526,7 @@ mod tests { resource: r_resource_logs.resource.clone(), 
scope_logs: r_resource_logs.scope_logs.clone(), schema_url: r_resource_logs.schema_url.clone(), + request_context: None, }; // Execute the Python script that adds a new log record @@ -1627,6 +1653,7 @@ mod tests { resource: r_resource_logs.resource.clone(), scope_logs: r_resource_logs.scope_logs.clone(), schema_url: r_resource_logs.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -1764,6 +1791,7 @@ mod tests { resource: r_resource_spans.resource.clone(), scope_spans: r_resource_spans.scope_spans.clone(), schema_url: r_resource_spans.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -1940,6 +1968,7 @@ mod tests { resource: r_resource_logs.resource.clone(), scope_logs: r_resource_logs.scope_logs.clone(), schema_url: r_resource_logs.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -2070,6 +2099,7 @@ mod tests { resource: r_resource_spans.resource.clone(), scope_spans: r_resource_spans.scope_spans.clone(), schema_url: r_resource_spans.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -2129,6 +2159,7 @@ mod tests { resource: r_resource_metrics.resource.clone(), scope_metrics: r_resource_metrics.scope_metrics.clone(), schema_url: r_resource_metrics.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record Python::with_gil(|py| -> PyResult<()> { @@ -2287,6 +2318,7 @@ mod tests { resource: r_resource_spans.resource.clone(), scope_spans: r_resource_spans.scope_spans.clone(), schema_url: r_resource_spans.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -2419,6 +2451,7 @@ mod tests { resource: r_resource_metrics.resource.clone(), scope_metrics: r_resource_metrics.scope_metrics.clone(), schema_url: r_resource_metrics.schema_url.clone(), + request_context: 
None, }; // Execute the Python script that removes a log record Python::with_gil(|py| -> PyResult<()> { @@ -2554,6 +2587,7 @@ mod tests { resource: r_resource_spans.resource.clone(), scope_spans: r_resource_spans.scope_spans.clone(), schema_url: r_resource_spans.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -2641,6 +2675,7 @@ mod tests { resource: r_resource_logs.resource.clone(), scope_logs: r_resource_logs.scope_logs.clone(), schema_url: r_resource_logs.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -2695,6 +2730,7 @@ mod tests { resource: r_resource_logs.resource.clone(), scope_logs: r_resource_logs.scope_logs.clone(), schema_url: r_resource_logs.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -2775,6 +2811,7 @@ mod tests { resource: r_resource_logs.resource.clone(), scope_logs: r_resource_logs.scope_logs.clone(), schema_url: r_resource_logs.schema_url.clone(), + request_context: None, }; // Execute the Python script that removes a log record @@ -2826,6 +2863,336 @@ mod tests { } } + #[test] + fn context_processor_logs_test() { + initialize(); + let mut logs_request = FakeOTLP::logs_service_request(); + let log_body = opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue( + "Login successful: password=1234567890".to_string(), + )), + }; + logs_request.resource_logs[0].scope_logs[0].log_records[0].body = Some(log_body); + + // Transform the protobuf ResourceLogs into our internal RResourceLogs + let r_resource_logs = crate::model::otel_transform::transform_resource_logs( + logs_request.resource_logs[0].clone(), + ); + + let mut req_context = HashMap::new(); + req_context.insert("my-custom-header".to_string(), "my-value".to_string()); + + // Create the Python-exposed ResourceLogs object + let py_resource_logs = ResourceLogs { + resource: 
r_resource_logs.resource.clone(), + scope_logs: r_resource_logs.scope_logs.clone(), + schema_url: r_resource_logs.schema_url.clone(), + request_context: Some(RequestContext::HttpContext(HttpContext { + headers: req_context, + })), + }; + + // Execute the Python script that removes a log record + Python::with_gil(|py| -> PyResult<()> { + _run_script( + "context_processor_test.py", + py, + py_resource_logs, + Some("process_logs".to_string()), + ) + }) + .unwrap(); + + let scope_logs_vec = Arc::into_inner(r_resource_logs.scope_logs) + .unwrap() + .into_inner() + .unwrap(); + let mut scope_logs = py_transform::transform_logs(scope_logs_vec); + let log = scope_logs.pop().unwrap().log_records.pop().unwrap(); + let kv_map: HashMap> = log + .attributes + .into_iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + assert_eq!(kv_map.len(), 1); + assert_eq!( + kv_map.get("http.request.header.my-custom-header"), + Some(&Some(opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue("my-value".to_string())), + })) + ); + + let mut logs_request = FakeOTLP::logs_service_request(); + let log_body = opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue( + "Login successful: password=1234567890".to_string(), + )), + }; + logs_request.resource_logs[0].scope_logs[0].log_records[0].body = Some(log_body); + + // Transform the protobuf ResourceLogs into our internal RResourceLogs + let r_resource_logs = crate::model::otel_transform::transform_resource_logs( + logs_request.resource_logs[0].clone(), + ); + + let mut req_context = HashMap::new(); + req_context.insert("my-custom-header".to_string(), "my-value".to_string()); + + // Create the Python-exposed ResourceLogs object + let py_resource_logs = ResourceLogs { + resource: r_resource_logs.resource.clone(), + scope_logs: r_resource_logs.scope_logs.clone(), + schema_url: r_resource_logs.schema_url.clone(), + request_context: 
Some(RequestContext::GrpcContext(GrpcContext { + metadata: req_context, + })), + }; + + // Execute the Python script that removes a log record + Python::with_gil(|py| -> PyResult<()> { + _run_script( + "context_processor_test.py", + py, + py_resource_logs, + Some("process_logs".to_string()), + ) + }) + .unwrap(); + + let scope_logs_vec = Arc::into_inner(r_resource_logs.scope_logs) + .unwrap() + .into_inner() + .unwrap(); + let mut scope_logs = py_transform::transform_logs(scope_logs_vec); + let log = scope_logs.pop().unwrap().log_records.pop().unwrap(); + let kv_map: HashMap> = log + .attributes + .into_iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + assert_eq!(kv_map.len(), 1); + assert_eq!( + kv_map.get("http.request.header.my-custom-header"), + Some(&Some(opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue("my-value".to_string())), + })) + ); + } + + #[test] + fn context_processor_trace_test() { + initialize(); + let trace_request = FakeOTLP::trace_service_request(); + + // Transform the protobuf ResourceLogs into our internal RResourceLogs + let r_resource_spans = + otel_transform::transform_resource_spans(trace_request.resource_spans[0].clone()); + + let mut req_context = HashMap::new(); + req_context.insert("my-custom-header".to_string(), "my-value".to_string()); + + // Create the Python-exposed ResourceLogs object + let py_resource_spans = ResourceSpans { + resource: r_resource_spans.resource.clone(), + scope_spans: r_resource_spans.scope_spans.clone(), + schema_url: r_resource_spans.schema_url.clone(), + request_context: Some(RequestContext::HttpContext(HttpContext { + headers: req_context, + })), + }; + + // Execute the Python script that removes a log record + Python::with_gil(|py| -> PyResult<()> { + _run_script( + "context_processor_test.py", + py, + py_resource_spans, + Some("process_spans".to_string()), + ) + }) + .unwrap(); + + let scope_spans_vec = Arc::into_inner(r_resource_spans.scope_spans) + 
.unwrap() + .into_inner() + .unwrap(); + let mut scope_spans = py_transform::transform_spans(scope_spans_vec); + + let span = scope_spans.pop().unwrap().spans.pop().unwrap(); + + let kv_map: HashMap> = + span.attributes + .into_iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + + assert_eq!(kv_map.len(), 3); + assert_eq!( + kv_map.get("http.request.header.my-custom-header"), + Some(&Some(opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue("my-value".to_string())), + })) + ); + + let trace_request = FakeOTLP::trace_service_request(); + + // Transform the protobuf ResourceLogs into our internal RResourceLogs + let r_resource_spans = + otel_transform::transform_resource_spans(trace_request.resource_spans[0].clone()); + + let mut req_context = HashMap::new(); + req_context.insert("my-custom-header".to_string(), "my-grpc-value".to_string()); + + // Create the Python-exposed ResourceLogs object + let py_resource_spans = ResourceSpans { + resource: r_resource_spans.resource.clone(), + scope_spans: r_resource_spans.scope_spans.clone(), + schema_url: r_resource_spans.schema_url.clone(), + request_context: Some(RequestContext::GrpcContext(GrpcContext { + metadata: req_context, + })), + }; + + // Execute the Python script that removes a log record + Python::with_gil(|py| -> PyResult<()> { + _run_script( + "context_processor_test.py", + py, + py_resource_spans, + Some("process_spans".to_string()), + ) + }) + .unwrap(); + + let scope_spans_vec = Arc::into_inner(r_resource_spans.scope_spans) + .unwrap() + .into_inner() + .unwrap(); + let mut scope_spans = py_transform::transform_spans(scope_spans_vec); + + let span = scope_spans.pop().unwrap().spans.pop().unwrap(); + + let kv_map: HashMap> = + span.attributes + .into_iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + + assert_eq!(kv_map.len(), 3); + assert_eq!( + kv_map.get("http.request.header.my-custom-header"), + 
Some(&Some(opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue("my-grpc-value".to_string())), + })) + ); + } + + #[test] + fn context_processor_metrics_test() { + initialize(); + let metrics_request = FakeOTLP::metrics_service_request(); + + // Transform the protobuf ResourceLogs into our internal RResourceLogs + let r_resource_metrics = crate::model::otel_transform::transform_resource_metrics( + metrics_request.resource_metrics[0].clone(), + ); + + let mut req_context = HashMap::new(); + req_context.insert("my-custom-header".to_string(), "my-value".to_string()); + + // Create the Python-exposed ResourceMetrics object + let py_resource_metrics = ResourceMetrics { + resource: r_resource_metrics.resource.clone(), + scope_metrics: r_resource_metrics.scope_metrics.clone(), + schema_url: r_resource_metrics.schema_url.clone(), + request_context: Some(RequestContext::HttpContext(HttpContext { + headers: req_context, + })), + }; + // Execute the Python script that removes a log record + Python::with_gil(|py| -> PyResult<()> { + _run_script( + "context_processor_test.py", + py, + py_resource_metrics, + Some("process_metrics".to_string()), + ) + }) + .unwrap(); + + let resource = Arc::into_inner(r_resource_metrics.resource) + .unwrap() + .into_inner() + .unwrap() + .unwrap(); + let resource = py_transform::transform_resource(resource).unwrap(); + + let kv_map: HashMap> = + resource + .attributes + .into_iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + assert_eq!(kv_map.len(), 7); + assert_eq!( + kv_map.get("http.request.header.my-custom-header"), + Some(&Some(opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue("my-value".to_string())), + })) + ); + + let metrics_request = FakeOTLP::metrics_service_request(); + + // Transform the protobuf ResourceLogs into our internal RResourceLogs + let r_resource_metrics = crate::model::otel_transform::transform_resource_metrics( + 
metrics_request.resource_metrics[0].clone(), + ); + + let mut req_context = HashMap::new(); + req_context.insert("my-custom-header".to_string(), "my-grpc-value".to_string()); + + // Create the Python-exposed ResourceMetrics object + let py_resource_metrics = ResourceMetrics { + resource: r_resource_metrics.resource.clone(), + scope_metrics: r_resource_metrics.scope_metrics.clone(), + schema_url: r_resource_metrics.schema_url.clone(), + request_context: Some(RequestContext::GrpcContext(GrpcContext { + metadata: req_context, + })), + }; + // Execute the Python script that removes a log record + Python::with_gil(|py| -> PyResult<()> { + _run_script( + "context_processor_test.py", + py, + py_resource_metrics, + Some("process_metrics".to_string()), + ) + }) + .unwrap(); + + let resource = Arc::into_inner(r_resource_metrics.resource) + .unwrap() + .into_inner() + .unwrap() + .unwrap(); + let resource = py_transform::transform_resource(resource).unwrap(); + + let kv_map: HashMap> = + resource + .attributes + .into_iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + assert_eq!(kv_map.len(), 7); + assert_eq!( + kv_map.get("http.request.header.my-custom-header"), + Some(&Some(opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some(Value::StringValue("my-grpc-value".to_string())), + })) + ); + } + #[test] fn read_and_write_metrics_test() { initialize(); @@ -3123,6 +3490,7 @@ mod tests { resource: r_resource_metrics.resource.clone(), scope_metrics: r_resource_metrics.scope_metrics.clone(), schema_url: r_resource_metrics.schema_url.clone(), + request_context: None, }; // Execute the Python script that will verify initial values and then mutate them diff --git a/rotel_python_processor_sdk/src/py/request_context.rs b/rotel_python_processor_sdk/src/py/request_context.rs new file mode 100644 index 00000000..51cb161a --- /dev/null +++ b/rotel_python_processor_sdk/src/py/request_context.rs @@ -0,0 +1,70 @@ +use pyo3::{pyclass, pymethods, PyResult}; +use 
std::collections::HashMap; + +#[pyclass] +#[derive(Clone)] +pub enum RequestContext { + HttpContext(HttpContext), + GrpcContext(GrpcContext), +} + +#[pyclass] +#[derive(Clone)] +pub struct HttpContext { + pub headers: HashMap, +} + +#[pyclass] +#[derive(Clone)] +pub struct GrpcContext { + pub metadata: HashMap, +} + +#[pymethods] +impl RequestContext { + #[getter] + fn http_context(&self) -> PyResult { + match self { + RequestContext::HttpContext(ctx) => Ok(ctx.clone()), + _ => Err(pyo3::exceptions::PyAttributeError::new_err( + "not an HttpContext variant", + )), + } + } + + #[getter] + fn grpc_context(&self) -> PyResult { + match self { + RequestContext::GrpcContext(ctx) => Ok(ctx.clone()), + _ => Err(pyo3::exceptions::PyAttributeError::new_err( + "not an GrpcContext variant", + )), + } + } +} + +#[pymethods] +impl HttpContext { + #[new] + fn new(headers: HashMap) -> PyResult { + Ok(HttpContext { headers }) + } + + #[getter] + fn headers(&self) -> PyResult> { + Ok(self.headers.clone()) + } +} + +#[pymethods] +impl GrpcContext { + #[new] + fn new(metadata: HashMap) -> PyResult { + Ok(GrpcContext { metadata }) + } + + #[getter] + fn metadata(&self) -> PyResult> { + Ok(self.metadata.clone()) + } +} diff --git a/rotel_python_processor_sdk/src/py/trace.rs b/rotel_python_processor_sdk/src/py/trace.rs index bc97a4f0..8b0f1810 100644 --- a/rotel_python_processor_sdk/src/py/trace.rs +++ b/rotel_python_processor_sdk/src/py/trace.rs @@ -3,6 +3,7 @@ use crate::model::otel_transform::convert_attributes; use crate::model::resource::RResource; use crate::model::trace::{REvent, RLink, RScopeSpans, RSpan, RStatus}; use crate::py::common::KeyValue; +use crate::py::request_context::RequestContext; use crate::py::{handle_poison_error, AttributesList, InstrumentationScope, Resource}; use pyo3::exceptions::PyRuntimeError; use pyo3::{pyclass, pymethods, Py, PyErr, PyRef, PyRefMut, PyResult, Python}; @@ -15,6 +16,7 @@ pub struct ResourceSpans { pub resource: Arc>>, pub scope_spans: 
Arc>>>>, pub schema_url: String, + pub request_context: Option, } #[pymethods] @@ -68,6 +70,11 @@ impl ResourceSpans { self.schema_url = schema_url; Ok(()) } + + #[getter] + fn request_context(&self) -> PyResult> { + Ok(self.request_context.clone()) + } } #[pyclass] diff --git a/src/bin/generate-otlp/main.rs b/src/bin/generate-otlp/main.rs new file mode 100644 index 00000000..09a6beb1 --- /dev/null +++ b/src/bin/generate-otlp/main.rs @@ -0,0 +1,473 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Utility to generate OTLP telemetry data files for testing and debugging + +use bytes::Bytes; +use clap::{Args, Parser, Subcommand, ValueEnum}; +use http::header::{CONTENT_TYPE, HeaderValue}; +use http::{Method, Request}; +use http_body_util::Full; +use hyper_util::client::legacy::Client; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::rt::TokioExecutor; +use prost::Message; +use std::fs; +use std::path::PathBuf; +use std::process::ExitCode; +use std::str::FromStr; +use tokio; +use utilities::otlp::FakeOTLP; + +#[derive(Debug, Parser)] +#[command(name = "generate-otlp")] +#[command(bin_name = "generate-otlp")] +#[command(version, about = "Generate OTLP telemetry data files for testing")] +struct Arguments { + #[command(subcommand)] + command: Commands, +} + +#[derive(Debug, Subcommand)] +enum Commands { + /// Generate trace data + Traces(GenerateArgs), + /// Generate metrics data + Metrics(GenerateArgs), + /// Generate logs data + Logs(GenerateArgs), + /// Return version + Version, +} + +#[derive(Debug, Clone, ValueEnum)] +enum Protocol { + Http, + Grpc, +} + +#[derive(Debug, Args)] +struct GenerateArgs { + /// Write to file instead of sending to endpoint + #[arg(long)] + file: Option, + + /// Protocol to use when sending (ignored if --file is set) + #[arg(short, long, value_enum, default_value = "http")] + protocol: Protocol, + + /// Endpoint address (host:port) for gRPC (default: 0.0.0.0:4317) + #[arg(long, default_value = "0.0.0.0:4317")] + 
grpc_endpoint: String, + + /// Endpoint address (host:port) for HTTP (default: 0.0.0.0:4318) + #[arg(long, default_value = "0.0.0.0:4318")] + http_endpoint: String, + + /// Number of resource spans/metrics/logs to generate + #[arg(short, long, default_value = "1")] + resources: usize, + + /// Number of spans/metrics/logs per resource + #[arg(short, long, default_value = "1")] + items: usize, + + /// Include dummy test headers (my-custom-header, another-header) for testing header context + #[arg(long)] + include_headers: bool, +} + +fn main() -> ExitCode { + let args = Arguments::parse(); + + match args.command { + Commands::Version => { + println!("generate-otlp {}", env!("CARGO_PKG_VERSION")); + ExitCode::SUCCESS + } + Commands::Traces(gen_args) => { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(generate_traces(gen_args)) + } + Commands::Metrics(gen_args) => { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(generate_metrics(gen_args)) + } + Commands::Logs(gen_args) => { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(generate_logs(gen_args)) + } + } +} + +async fn generate_traces(args: GenerateArgs) -> ExitCode { + let trace_req = FakeOTLP::trace_service_request_with_spans(args.resources, args.items); + + if let Some(file_path) = args.file { + // Write to file + let mut buf = Vec::with_capacity(trace_req.encoded_len()); + if let Err(e) = trace_req.encode(&mut buf) { + eprintln!("Failed to encode trace request: {}", e); + return ExitCode::FAILURE; + } + + if let Err(e) = fs::write(&file_path, &buf) { + eprintln!( + "Failed to write trace file to {}: {}", + file_path.display(), + e + ); + return ExitCode::FAILURE; + } + + println!( + "✓ Trace file created at {} ({} bytes, {} resource spans, {} spans total)", + file_path.display(), + buf.len(), + args.resources, + args.resources * args.items + ); + ExitCode::SUCCESS + } else { + // Send to endpoint + send_traces(trace_req, &args).await + } +} + +async fn 
generate_metrics(args: GenerateArgs) -> ExitCode { + let metrics_req = FakeOTLP::metrics_service_request_with_metrics(args.resources, args.items); + + if let Some(file_path) = args.file { + // Write to file + let mut buf = Vec::with_capacity(metrics_req.encoded_len()); + if let Err(e) = metrics_req.encode(&mut buf) { + eprintln!("Failed to encode metrics request: {}", e); + return ExitCode::FAILURE; + } + + if let Err(e) = fs::write(&file_path, &buf) { + eprintln!( + "Failed to write metrics file to {}: {}", + file_path.display(), + e + ); + return ExitCode::FAILURE; + } + + println!( + "✓ Metrics file created at {} ({} bytes, {} resource metrics, {} metrics total)", + file_path.display(), + buf.len(), + args.resources, + args.resources * args.items + ); + ExitCode::SUCCESS + } else { + // Send to endpoint + send_metrics(metrics_req, &args).await + } +} + +async fn generate_logs(args: GenerateArgs) -> ExitCode { + let logs_req = FakeOTLP::logs_service_request_with_logs(args.resources, args.items); + + if let Some(file_path) = args.file { + // Write to file + let mut buf = Vec::with_capacity(logs_req.encoded_len()); + if let Err(e) = logs_req.encode(&mut buf) { + eprintln!("Failed to encode logs request: {}", e); + return ExitCode::FAILURE; + } + + if let Err(e) = fs::write(&file_path, &buf) { + eprintln!( + "Failed to write logs file to {}: {}", + file_path.display(), + e + ); + return ExitCode::FAILURE; + } + + println!( + "✓ Logs file created at {} ({} bytes, {} resource logs, {} log records total)", + file_path.display(), + buf.len(), + args.resources, + args.resources * args.items + ); + ExitCode::SUCCESS + } else { + // Send to endpoint + send_logs(logs_req, &args).await + } +} + +// Helper functions to send telemetry data + +async fn send_traces( + trace_req: opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest, + args: &GenerateArgs, +) -> ExitCode { + match args.protocol { + Protocol::Http => { + send_http( + trace_req, + 
&format!("http://{}/v1/traces", args.http_endpoint), + "traces", + args.include_headers, + ) + .await + } + Protocol::Grpc => { + send_grpc_traces(trace_req, &args.grpc_endpoint, args.include_headers).await + } + } +} + +async fn send_metrics( + metrics_req: opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest, + args: &GenerateArgs, +) -> ExitCode { + match args.protocol { + Protocol::Http => { + send_http( + metrics_req, + &format!("http://{}/v1/metrics", args.http_endpoint), + "metrics", + args.include_headers, + ) + .await + } + Protocol::Grpc => { + send_grpc_metrics(metrics_req, &args.grpc_endpoint, args.include_headers).await + } + } +} + +async fn send_logs( + logs_req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest, + args: &GenerateArgs, +) -> ExitCode { + match args.protocol { + Protocol::Http => { + send_http( + logs_req, + &format!("http://{}/v1/logs", args.http_endpoint), + "logs", + args.include_headers, + ) + .await + } + Protocol::Grpc => send_grpc_logs(logs_req, &args.grpc_endpoint, args.include_headers).await, + } +} + +async fn send_http( + msg: T, + url: &str, + telemetry_type: &str, + include_headers: bool, +) -> ExitCode { + let client: Client> = + Client::builder(TokioExecutor::new()).build_http(); + + let mut buf = Vec::with_capacity(msg.encoded_len()); + if let Err(e) = msg.encode(&mut buf) { + eprintln!("Failed to encode {} request: {}", telemetry_type, e); + return ExitCode::FAILURE; + } + + let uri = match http::Uri::from_str(url) { + Ok(uri) => uri, + Err(e) => { + eprintln!("Invalid URL {}: {}", url, e); + return ExitCode::FAILURE; + } + }; + + let mut req_builder = Request::builder().method(Method::POST).uri(uri).header( + CONTENT_TYPE, + HeaderValue::from_static("application/x-protobuf"), + ); + + // Add dummy test headers if requested + if include_headers { + let headers = FakeOTLP::example_headers(); + for (key, value) in headers { + if let Ok(header_value) = 
HeaderValue::from_str(&value) { + req_builder = req_builder.header(key, header_value); + } + } + } + + let req = match req_builder.body(Full::new(Bytes::from(buf))) { + Ok(req) => req, + Err(e) => { + eprintln!("Failed to build HTTP request: {}", e); + return ExitCode::FAILURE; + } + }; + + match client.request(req).await { + Ok(resp) => { + let status = resp.status(); + if status.is_success() { + println!( + "✓ Successfully sent {} to {} (status: {})", + telemetry_type, url, status + ); + ExitCode::SUCCESS + } else { + eprintln!( + "✗ Failed to send {} to {} (status: {})", + telemetry_type, url, status + ); + ExitCode::FAILURE + } + } + Err(e) => { + eprintln!("✗ Failed to send {} to {}: {}", telemetry_type, url, e); + ExitCode::FAILURE + } + } +} + +async fn send_grpc_traces( + trace_req: opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest, + endpoint: &str, + include_headers: bool, +) -> ExitCode { + use opentelemetry_proto::tonic::collector::trace::v1::trace_service_client::TraceServiceClient; + use tonic::metadata::{MetadataKey, MetadataValue}; + + let url = format!("http://{}", endpoint); + let mut client = match TraceServiceClient::connect(url.clone()).await { + Ok(client) => client, + Err(e) => { + eprintln!("✗ Failed to connect to {}: {}", url, e); + return ExitCode::FAILURE; + } + }; + + let mut request = tonic::Request::new(trace_req); + + // Add dummy test headers (gRPC metadata) if requested + if include_headers { + let headers = FakeOTLP::example_headers(); + for (key, value) in headers { + if let Ok(metadata_key) = + MetadataKey::::from_bytes(key.as_bytes()) + { + if let Ok(metadata_value) = + MetadataValue::::try_from(value.as_str()) + { + request.metadata_mut().insert(metadata_key, metadata_value); + } + } + } + } + + match client.export(request).await { + Ok(_) => { + println!("✓ Successfully sent traces to {}", url); + ExitCode::SUCCESS + } + Err(e) => { + eprintln!("✗ Failed to send traces to {}: {}", url, e); + 
ExitCode::FAILURE + } + } +} + +async fn send_grpc_metrics( + metrics_req: opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest, + endpoint: &str, + include_headers: bool, +) -> ExitCode { + use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_client::MetricsServiceClient; + use tonic::metadata::{MetadataKey, MetadataValue}; + + let url = format!("http://{}", endpoint); + let mut client = match MetricsServiceClient::connect(url.clone()).await { + Ok(client) => client, + Err(e) => { + eprintln!("✗ Failed to connect to {}: {}", url, e); + return ExitCode::FAILURE; + } + }; + + let mut request = tonic::Request::new(metrics_req); + + // Add dummy test headers (gRPC metadata) if requested + if include_headers { + let headers = FakeOTLP::example_headers(); + for (key, value) in headers { + if let Ok(metadata_key) = + MetadataKey::::from_bytes(key.as_bytes()) + { + if let Ok(metadata_value) = + MetadataValue::::try_from(value.as_str()) + { + request.metadata_mut().insert(metadata_key, metadata_value); + } + } + } + } + + match client.export(request).await { + Ok(_) => { + println!("✓ Successfully sent metrics to {}", url); + ExitCode::SUCCESS + } + Err(e) => { + eprintln!("✗ Failed to send metrics to {}: {}", url, e); + ExitCode::FAILURE + } + } +} + +async fn send_grpc_logs( + logs_req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest, + endpoint: &str, + include_headers: bool, +) -> ExitCode { + use opentelemetry_proto::tonic::collector::logs::v1::logs_service_client::LogsServiceClient; + use tonic::metadata::{MetadataKey, MetadataValue}; + + let url = format!("http://{}", endpoint); + let mut client = match LogsServiceClient::connect(url.clone()).await { + Ok(client) => client, + Err(e) => { + eprintln!("✗ Failed to connect to {}: {}", url, e); + return ExitCode::FAILURE; + } + }; + + let mut request = tonic::Request::new(logs_req); + + // Add dummy test headers (gRPC metadata) if requested + if 
include_headers { + let headers = FakeOTLP::example_headers(); + for (key, value) in headers { + if let Ok(metadata_key) = + MetadataKey::::from_bytes(key.as_bytes()) + { + if let Ok(metadata_value) = + MetadataValue::::try_from(value.as_str()) + { + request.metadata_mut().insert(metadata_key, metadata_value); + } + } + } + } + + match client.export(request).await { + Ok(_) => { + println!("✓ Successfully sent logs to {}", url); + ExitCode::SUCCESS + } + Err(e) => { + eprintln!("✗ Failed to send logs to {}: {}", url, e); + ExitCode::FAILURE + } + } +} diff --git a/src/exporters/awsemf/mod.rs b/src/exporters/awsemf/mod.rs index 15b8d118..424bae47 100644 --- a/src/exporters/awsemf/mod.rs +++ b/src/exporters/awsemf/mod.rs @@ -293,6 +293,7 @@ mod tests { let metrics = FakeOTLP::metrics_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: metrics.resource_metrics, }]) .await @@ -325,6 +326,7 @@ mod tests { let metrics = FakeOTLP::metrics_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: metrics.resource_metrics, }]) .await @@ -395,6 +397,7 @@ mod tests { let metrics = FakeOTLP::metrics_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: metrics.resource_metrics, }]) .await @@ -457,6 +460,7 @@ mod tests { let metrics = FakeOTLP::metrics_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: metrics.resource_metrics, }]) .await @@ -528,6 +532,7 @@ mod tests { let metrics = FakeOTLP::metrics_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: metrics.resource_metrics, }]) .await @@ -584,6 +589,7 @@ mod tests { let metrics = FakeOTLP::metrics_service_request(); btx.send(vec![Message { metadata: Some(metadata), + request_context: None, payload: metrics.resource_metrics, }]) .await @@ -675,6 +681,7 @@ mod tests { btx.send(vec![Message { metadata: Some(metadata), + 
request_context: None, payload: all_resource_metrics, }]) .await diff --git a/src/exporters/blackhole.rs b/src/exporters/blackhole.rs index 35113cad..a3573626 100644 --- a/src/exporters/blackhole.rs +++ b/src/exporters/blackhole.rs @@ -66,6 +66,7 @@ mod tests { .into_iter() .map(|span| Message { metadata: None, + request_context: None, payload: vec![span], }) .collect(); @@ -106,6 +107,7 @@ mod tests { // Send a message with metadata let message = Message { metadata: Some(metadata), + request_context: None, payload: vec![FakeOTLP::trace_service_request().resource_spans[0].clone()], }; tr_tx.send(vec![message]).await.unwrap(); diff --git a/src/exporters/clickhouse/mod.rs b/src/exporters/clickhouse/mod.rs index 868460cd..56bb99a0 100644 --- a/src/exporters/clickhouse/mod.rs +++ b/src/exporters/clickhouse/mod.rs @@ -360,6 +360,7 @@ mod tests { let traces = FakeOTLP::trace_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: traces.resource_spans, }]) .await @@ -393,6 +394,7 @@ mod tests { let logs = FakeOTLP::logs_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: logs.resource_logs, }]) .await @@ -427,6 +429,7 @@ mod tests { btx.send(vec![Message { payload: metrics.resource_metrics, metadata: None, + request_context: None, }]) .await .unwrap(); @@ -477,6 +480,7 @@ mod tests { let traces = FakeOTLP::trace_service_request(); btx.send(vec![Message { metadata: Some(metadata), + request_context: None, payload: traces.resource_spans, }]) .await @@ -555,6 +559,7 @@ mod tests { // Send traces with metadata btx.send(vec![Message { metadata: Some(metadata), + request_context: None, payload: all_resource_spans, }]) .await diff --git a/src/exporters/clickhouse/transform_traces.rs b/src/exporters/clickhouse/transform_traces.rs index 88d8822f..ae1f32f2 100644 --- a/src/exporters/clickhouse/transform_traces.rs +++ b/src/exporters/clickhouse/transform_traces.rs @@ -237,6 +237,7 @@ mod tests { let 
(result, _metadata) = transformer.transform(vec![Message { payload: vec![resource_spans], metadata: None, + request_context: None, }]); // Verify the transformation succeeded @@ -328,6 +329,7 @@ mod tests { let (result, _metadata) = transformer.transform(vec![Message { payload: vec![resource_spans], metadata: None, + request_context: None, }]); // Verify the transformation succeeded @@ -418,6 +420,7 @@ mod tests { let (result, _metadata) = transformer.transform(vec![Message { payload: vec![resource_spans], metadata: None, + request_context: None, }]); assert!( @@ -527,6 +530,7 @@ mod tests { let (result, _metadata) = transformer.transform(vec![Message { payload: vec![resource_spans], metadata: None, + request_context: None, }]); assert!( diff --git a/src/exporters/datadog/mod.rs b/src/exporters/datadog/mod.rs index ce4a6236..0d789184 100644 --- a/src/exporters/datadog/mod.rs +++ b/src/exporters/datadog/mod.rs @@ -34,6 +34,7 @@ mod types; /// Type alias for Datadog payloads using the generic MessagePayload use crate::exporters::http::metadata_extractor::MessagePayload; use http_body_util::Full; + pub type DatadogPayload = MessagePayload>; type SvcType = TowerRetry< @@ -258,6 +259,7 @@ mod tests { metadata: Some(crate::topology::payload::MessageMetadata::kafka( kafka_metadata.clone(), )), + request_context: None, payload: traces.resource_spans, }; @@ -319,6 +321,7 @@ mod tests { let traces = FakeOTLP::trace_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: traces.resource_spans, }]) .await @@ -350,6 +353,7 @@ mod tests { let traces = FakeOTLP::trace_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: traces.resource_spans, }]) .await diff --git a/src/exporters/kafka/exporter.rs b/src/exporters/kafka/exporter.rs index 9d96cdbd..fec186c2 100644 --- a/src/exporters/kafka/exporter.rs +++ b/src/exporters/kafka/exporter.rs @@ -385,6 +385,7 @@ impl KafkaExportable for ResourceMetrics { 
result.push(vec![Message { metadata, + request_context: None, payload: vec![resource_metric], }]); } @@ -470,6 +471,7 @@ impl KafkaExportable for ResourceLogs { result.push(vec![Message { metadata, + request_context: None, payload: vec![resource_log], }]); } diff --git a/src/exporters/kafka/tests.rs b/src/exporters/kafka/tests.rs index 87951d2d..37ee7843 100644 --- a/src/exporters/kafka/tests.rs +++ b/src/exporters/kafka/tests.rs @@ -483,6 +483,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_spans, }], ); @@ -513,6 +514,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_spans.clone(), }], ); @@ -551,6 +553,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_logs, }], ); @@ -588,6 +591,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_logs, }], ); @@ -620,6 +624,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_logs, }], ); @@ -672,6 +677,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_logs, }], ); @@ -708,6 +714,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_logs.clone(), }], ); @@ -746,6 +753,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_metrics, }], ); @@ -783,6 +791,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_metrics, }], ); @@ -815,6 +824,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_metrics, }], ); @@ -867,6 +877,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: resource_metrics, }], ); @@ -903,6 +914,7 @@ mod tests { &config, vec![Message { metadata: None, + request_context: None, payload: 
resource_metrics.clone(), }], ); @@ -1022,6 +1034,7 @@ mod tests { // Create a message with metadata let message = Message { metadata: Some(metadata), + request_context: None, payload: resource_metrics, }; diff --git a/src/exporters/otlp/mod.rs b/src/exporters/otlp/mod.rs index 7a44d781..95a9aa29 100644 --- a/src/exporters/otlp/mod.rs +++ b/src/exporters/otlp/mod.rs @@ -499,6 +499,7 @@ mod tests { let res = trace_btx .send(vec![topology::payload::Message { metadata: None, + request_context: None, payload: FakeOTLP::trace_service_request().resource_spans, }]) .await; @@ -578,6 +579,7 @@ mod tests { let res = metrics_btx .send(vec![topology::payload::Message { metadata: None, + request_context: None, payload: FakeOTLP::metrics_service_request().resource_metrics, }]) .await; @@ -658,6 +660,7 @@ mod tests { .send(vec![topology::payload::Message { payload: FakeOTLP::logs_service_request().resource_logs, metadata: None, + request_context: None, }]) .await; assert!(&res.is_ok()); @@ -729,6 +732,7 @@ mod tests { .send(vec![topology::payload::Message { payload: FakeOTLP::trace_service_request().resource_spans, metadata: None, + request_context: None, }]) .await; assert!(&res.is_ok()); @@ -1094,6 +1098,7 @@ mod tests { .send(vec![topology::payload::Message { payload: FakeOTLP::trace_service_request().resource_spans, metadata: None, + request_context: None, }]) .await; if let Err(e) = res { @@ -1173,6 +1178,7 @@ mod tests { .send(vec![topology::payload::Message { payload, metadata: None, + request_context: None, }]) .await; if let Err(e) = res { @@ -1206,6 +1212,7 @@ mod tests { .send(vec![topology::payload::Message { payload, metadata: None, + request_context: None, }]) .await; if let Err(e) = res { @@ -1235,6 +1242,7 @@ mod tests { .send(vec![topology::payload::Message { payload, metadata: None, + request_context: None, }]) .await; if let Err(e) = res { @@ -1313,6 +1321,7 @@ mod tests { trace_btx .send(vec![topology::payload::Message { metadata: Some(metadata), + 
request_context: None, payload: traces.resource_spans, }]) .await diff --git a/src/exporters/xray/mod.rs b/src/exporters/xray/mod.rs index ce3dff7e..2d2f50ed 100644 --- a/src/exporters/xray/mod.rs +++ b/src/exporters/xray/mod.rs @@ -209,6 +209,7 @@ mod tests { let traces = FakeOTLP::trace_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: traces.resource_spans, }]) .await @@ -240,6 +241,7 @@ mod tests { let traces = FakeOTLP::trace_service_request(); btx.send(vec![Message { metadata: None, + request_context: None, payload: traces.resource_spans, }]) .await @@ -292,6 +294,7 @@ mod tests { let traces = FakeOTLP::trace_service_request(); btx.send(vec![Message { metadata: Some(metadata), + request_context: None, payload: traces.resource_spans, }]) .await @@ -344,6 +347,7 @@ mod tests { let traces = FakeOTLP::trace_service_request_with_spans(1, 51); btx.send(vec![Message { metadata: None, + request_context: None, payload: traces.resource_spans, }]) .await @@ -396,6 +400,7 @@ mod tests { btx.send(vec![Message { metadata: Some(metadata), + request_context: None, payload: traces.resource_spans, }]) .await @@ -486,6 +491,7 @@ mod tests { btx.send(vec![Message { metadata: Some(metadata), + request_context: None, payload: traces.resource_spans, }]) .await diff --git a/src/init/agent.rs b/src/init/agent.rs index dc920952..6dbcda0e 100644 --- a/src/init/agent.rs +++ b/src/init/agent.rs @@ -882,6 +882,8 @@ impl Agent { .with_traces_output(traces_output.clone()) .with_metrics_output(metrics_output.clone()) .with_logs_output(logs_output.clone()) + .with_include_metadata(config.otlp_grpc_include_metadata) + .with_headers_to_include(config.otlp_grpc_headers_to_include.clone()) .build(); let grpc_listener = self.port_map.remove(&config.otlp_grpc_endpoint).unwrap(); @@ -902,6 +904,8 @@ impl Agent { .with_traces_path(config.otlp_receiver_traces_http_path.clone()) .with_metrics_path(config.otlp_receiver_metrics_http_path.clone()) 
.with_logs_path(config.otlp_receiver_logs_http_path.clone()) + .with_include_metadata(config.otlp_http_include_metadata) + .with_headers_to_include(config.otlp_http_headers_to_include.clone()) .build(); let http_listener = self.port_map.remove(&config.otlp_http_endpoint).unwrap(); diff --git a/src/init/otlp_receiver.rs b/src/init/otlp_receiver.rs index 75fec018..af8f0a36 100644 --- a/src/init/otlp_receiver.rs +++ b/src/init/otlp_receiver.rs @@ -66,6 +66,40 @@ pub struct OTLPReceiverArgs { default_value = "/v1/logs" )] pub otlp_receiver_logs_http_path: String, + + /// Enable including HTTP request headers in message metadata (context). + /// When enabled, specified headers are stored in context and can be accessed by processors. + /// This follows the OTel Collector pattern where processors pull from context to add attributes. + /// Example: set to true to enable metadata extraction + #[arg( + long, + env = "ROTEL_OTLP_HTTP_INCLUDE_METADATA", + default_value = "false" + )] + pub otlp_http_include_metadata: bool, + + /// Comma-separated list of HTTP headers to include in metadata when include_metadata is enabled. + /// Headers are stored in context and can be accessed by processors using from_context. + /// Example: "my-custom-header,another-header" + #[arg(long, env = "ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE", default_value = "")] + pub otlp_http_headers_to_include: String, + + /// Enable including gRPC request metadata in message metadata (context). + /// When enabled, specified metadata keys are stored in context and can be accessed by processors. + /// This follows the OTel Collector pattern where processors pull from context to add attributes. + /// Example: set to true to enable metadata extraction + #[arg( + long, + env = "ROTEL_OTLP_GRPC_INCLUDE_METADATA", + default_value = "false" + )] + pub otlp_grpc_include_metadata: bool, + + /// Comma-separated list of gRPC headers to include in metadata when include_metadata is enabled. 
+ /// Headers are stored in context and can be accessed by processors using from_context. + /// Example: "my-custom-header,another-header" + #[arg(long, env = "ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE", default_value = "")] + pub otlp_grpc_headers_to_include: String, } impl Default for OTLPReceiverArgs { @@ -80,6 +114,10 @@ impl Default for OTLPReceiverArgs { otlp_receiver_traces_http_path: "/v1/traces".to_string(), otlp_receiver_metrics_http_path: "/v1/metrics".to_string(), otlp_receiver_logs_http_path: "/v1/logs".to_string(), + otlp_http_include_metadata: false, + otlp_http_headers_to_include: String::new(), + otlp_grpc_include_metadata: false, + otlp_grpc_headers_to_include: String::new(), } } } @@ -96,6 +134,20 @@ impl From<&OTLPReceiverArgs> for OTLPReceiverConfig { otlp_receiver_traces_http_path: value.otlp_receiver_traces_http_path.to_owned(), otlp_receiver_metrics_http_path: value.otlp_receiver_metrics_http_path.to_owned(), otlp_receiver_logs_http_path: value.otlp_receiver_logs_http_path.to_owned(), + otlp_http_include_metadata: value.otlp_http_include_metadata, + otlp_http_headers_to_include: value + .otlp_http_headers_to_include + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(), + otlp_grpc_include_metadata: value.otlp_grpc_include_metadata, + otlp_grpc_headers_to_include: value + .otlp_grpc_headers_to_include + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(), } } } diff --git a/src/receivers/kafka/receiver.rs b/src/receivers/kafka/receiver.rs index 669d6966..618fab4e 100644 --- a/src/receivers/kafka/receiver.rs +++ b/src/receivers/kafka/receiver.rs @@ -525,7 +525,7 @@ impl KafkaReceiver { true => None, false => Some(MessageMetadata::kafka(metadata)), }; - let message = payload::Message::new(md, resources); + let message = payload::Message::new(md, resources, None); if let Err(KafkaReceiverError::SendCancelled) = Self::send_with_cancellation(output, message, &receivers_cancel, 
"traces").await { break; } @@ -537,7 +537,7 @@ impl KafkaReceiver { true => None, false => Some(MessageMetadata::kafka(metadata)), }; - let message = payload::Message::new(md, resources); + let message = payload::Message::new(md, resources, None); if let Err(KafkaReceiverError::SendCancelled) = Self::send_with_cancellation(output, message, &receivers_cancel, "metrics").await { break; } @@ -549,7 +549,7 @@ impl KafkaReceiver { true => None, false => Some(MessageMetadata::kafka(metadata)), }; - let message = payload::Message::new(md, resources); + let message = payload::Message::new(md, resources, None); if let Err(KafkaReceiverError::SendCancelled) = Self::send_with_cancellation(output, message, &receivers_cancel, "logs").await { break; } @@ -1003,6 +1003,7 @@ mod tests { let message = payload::Message { metadata: None, payload: resources, + request_context: None, }; output.send(message).await.expect("Failed to send"); } @@ -1073,6 +1074,7 @@ mod tests { let message = payload::Message { metadata: None, payload: resources, + request_context: None, }; output.send(message).await.expect("Failed to send"); } @@ -1558,7 +1560,7 @@ mod tests { let output = OTLPOutput::new(tx); // Fill the channel to make it block by sending a message but not receiving it - let blocking_message = payload::Message::new(None, vec![ResourceSpans::default()]); + let blocking_message = payload::Message::new(None, vec![ResourceSpans::default()], None); output .send(blocking_message) .await @@ -1567,7 +1569,7 @@ mod tests { // Now the channel is full (size 1) - any new send will block // Create a message to send - let message = payload::Message::new(None, vec![ResourceSpans::default()]); + let message = payload::Message::new(None, vec![ResourceSpans::default()], None); // Create cancellation token let cancel_token = CancellationToken::new(); diff --git a/src/receivers/otlp/mod.rs b/src/receivers/otlp/mod.rs index cb7cb51c..2244faf1 100644 --- a/src/receivers/otlp/mod.rs +++ 
b/src/receivers/otlp/mod.rs @@ -14,4 +14,8 @@ pub struct OTLPReceiverConfig { pub otlp_receiver_traces_http_path: String, pub otlp_receiver_metrics_http_path: String, pub otlp_receiver_logs_http_path: String, + pub otlp_http_include_metadata: bool, + pub otlp_http_headers_to_include: Vec, + pub otlp_grpc_include_metadata: bool, + pub otlp_grpc_headers_to_include: Vec, } diff --git a/src/receivers/otlp/otlp_grpc.rs b/src/receivers/otlp/otlp_grpc.rs index 95a0fb00..22692204 100644 --- a/src/receivers/otlp/otlp_grpc.rs +++ b/src/receivers/otlp/otlp_grpc.rs @@ -4,7 +4,7 @@ use crate::listener::Listener; use crate::receivers::get_meter; use crate::receivers::otlp_output::OTLPOutput; use crate::topology::batch::BatchSizer; -use crate::topology::payload::Message; +use crate::topology::payload::{Message, RequestContext}; use opentelemetry::KeyValue; use opentelemetry::metrics::Counter; use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::{ @@ -26,6 +26,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ use opentelemetry_proto::tonic::logs::v1::ResourceLogs; use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; use opentelemetry_proto::tonic::trace::v1::ResourceSpans; +use std::collections::HashMap; use std::default::Default; use std::error::Error; use tokio_util::sync::CancellationToken; @@ -39,6 +40,8 @@ pub struct OTLPGrpcServerBuilder { traces_output: Option>>, metrics_output: Option>>, logs_output: Option>>, + include_metadata: bool, + headers_to_include: Vec, } impl OTLPGrpcServerBuilder {} @@ -70,12 +73,24 @@ impl OTLPGrpcServerBuilder { self } + pub fn with_include_metadata(mut self, include: bool) -> Self { + self.include_metadata = include; + self + } + + pub fn with_headers_to_include(mut self, headers: Vec) -> Self { + self.headers_to_include = headers; + self + } + pub fn build(self) -> OTLPGrpcServer { OTLPGrpcServer { traces_output: self.traces_output, metrics_output: self.metrics_output, logs_output: self.logs_output, 
max_recv_msg_size_mib: self.max_recv_msg_size_mib, + include_metadata: self.include_metadata, + headers_to_include: self.headers_to_include, } } } @@ -85,6 +100,8 @@ pub struct OTLPGrpcServer { metrics_output: Option>>, logs_output: Option>>, max_recv_msg_size_mib: Option, + include_metadata: bool, + headers_to_include: Vec, } impl OTLPGrpcServer { @@ -101,6 +118,8 @@ impl OTLPGrpcServer { self.traces_output.clone(), self.metrics_output.clone(), self.logs_output.clone(), + self.include_metadata, + self.headers_to_include.clone(), ); let stream = listener.into_stream()?; @@ -174,6 +193,8 @@ struct CollectorService { refused_metric_points_counter: Counter, refused_log_records_counter: Counter, tags: [KeyValue; 1], + include_metadata: bool, + headers_to_include: Vec, } impl CollectorService { @@ -181,6 +202,8 @@ impl CollectorService { traces_tx: Option>>, metrics_tx: Option>>, logs_tx: Option>>, + include_metadata: bool, + headers_to_include: Vec, ) -> Self { Self { traces_tx, @@ -225,6 +248,29 @@ impl CollectorService { .with_unit("log_records") .build(), tags: [KeyValue::new("protocol", "grpc")], + include_metadata, + headers_to_include, + } + } + + fn extract_context_from_request(&self, request: &Request) -> Option { + if !self.include_metadata || self.headers_to_include.is_empty() { + return None; + } + let mut metadata_map = HashMap::new(); + let request_metadata = request.metadata(); + for key in &self.headers_to_include { + let normalized = key.to_lowercase(); + if let Some(value) = request_metadata.get(&normalized) { + if let Ok(value_str) = value.to_str() { + metadata_map.insert(normalized, value_str.to_string()); + } + } + } + if !metadata_map.is_empty() { + Some(RequestContext::Grpc(metadata_map)) + } else { + None } } } @@ -235,6 +281,7 @@ impl TraceService for CollectorService { &self, request: Request, ) -> Result, Status> { + let req_context = self.extract_context_from_request(&request); let trace_request = request.into_inner(); match &self.traces_tx { 
None => Err(Status::unavailable("OTLP trace receiver is disabled")), @@ -244,6 +291,7 @@ impl TraceService for CollectorService { .send(Message { metadata: None, payload: trace_request.resource_spans, + request_context: req_context, }) .await { @@ -269,6 +317,7 @@ impl MetricsService for CollectorService { &self, request: Request, ) -> Result, Status> { + let req_context = self.extract_context_from_request(&request); let metrics_request = request.into_inner(); match &self.metrics_tx { None => Err(Status::unavailable("OTLP metrics receiver is disabled")), @@ -277,6 +326,7 @@ impl MetricsService for CollectorService { match metrics_tx .send(Message { metadata: None, + request_context: req_context, payload: metrics_request.resource_metrics, }) .await @@ -303,6 +353,7 @@ impl LogsService for CollectorService { &self, request: Request, ) -> Result, Status> { + let req_context = self.extract_context_from_request(&request); let logs_request = request.into_inner(); match &self.logs_tx { None => Err(Status::unavailable("OTLP logs receiver is disabled")), @@ -312,6 +363,7 @@ impl LogsService for CollectorService { .send(Message { metadata: None, payload: logs_request.resource_logs, + request_context: req_context, }) .await { @@ -337,7 +389,7 @@ mod tests { use crate::listener::Listener; use crate::receivers::otlp::otlp_grpc::OTLPGrpcServer; use crate::receivers::otlp_output::OTLPOutput; - use crate::topology::payload::Message; + use crate::topology::payload::{Message, RequestContext}; use opentelemetry_proto::tonic::collector::logs::v1::logs_service_client::LogsServiceClient; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, @@ -356,6 +408,7 @@ mod tests { use std::net::SocketAddr; use tokio_test::{assert_err, assert_ok}; use tokio_util::sync::CancellationToken; + use tonic::metadata::{MetadataKey, MetadataValue}; use tonic::{Response, Status}; use utilities::otlp::FakeOTLP; @@ -568,4 +621,393 @@ mod tests { 
client.export(msg).await } + + async fn send_trace_msg_with_metadata( + addr: SocketAddr, + msg: ExportTraceServiceRequest, + metadata: Vec<(&str, &str)>, + ) -> Result, Status> { + let addr = format!("http://{}", addr); + let mut client = TraceServiceClient::connect(addr).await.unwrap(); + + let mut request = tonic::Request::new(msg); + for (key, value) in metadata { + // Use from_bytes for non-static strings + if let Ok(metadata_key) = + MetadataKey::::from_bytes(key.as_bytes()) + { + if let Ok(metadata_value) = MetadataValue::::try_from(value) + { + request.metadata_mut().insert(metadata_key, metadata_value); + } + } + } + + client.export(request).await + } + + async fn send_metrics_msg_with_metadata( + addr: SocketAddr, + msg: ExportMetricsServiceRequest, + metadata: Vec<(&str, &str)>, + ) -> Result, Status> { + let addr = format!("http://{}", addr); + let mut client = MetricsServiceClient::connect(addr).await.unwrap(); + + let mut request = tonic::Request::new(msg); + for (key, value) in metadata { + // Use from_bytes for non-static strings + if let Ok(metadata_key) = + MetadataKey::::from_bytes(key.as_bytes()) + { + if let Ok(metadata_value) = MetadataValue::::try_from(value) + { + request.metadata_mut().insert(metadata_key, metadata_value); + } + } + } + + client.export(request).await + } + + async fn send_logs_msg_with_metadata( + addr: SocketAddr, + msg: ExportLogsServiceRequest, + metadata: Vec<(&str, &str)>, + ) -> Result, Status> { + let addr = format!("http://{}", addr); + let mut client = LogsServiceClient::connect(addr).await.unwrap(); + + let mut request = tonic::Request::new(msg); + for (key, value) in metadata { + // Use from_bytes for non-static strings + if let Ok(metadata_key) = + MetadataKey::::from_bytes(key.as_bytes()) + { + if let Ok(metadata_value) = MetadataValue::::try_from(value) + { + request.metadata_mut().insert(metadata_key, metadata_value); + } + } + } + + client.export(request).await + } + + #[tokio::test] + async fn 
grpc_metadata_extracted_for_traces() { + let example_headers = FakeOTLP::example_headers(); + let header_names: Vec = example_headers.keys().cloned().collect(); + + let (trace_in_tx, mut trace_rx) = bounded::>(10); + let trace_output = OTLPOutput::new(trace_in_tx); + let cancel = CancellationToken::new(); + + let listener = Listener::listen_async("[::1]:0".parse().unwrap()) + .await + .unwrap(); + + let srv = OTLPGrpcServer::builder() + .with_traces_output(Some(trace_output)) + .with_include_metadata(true) + .with_headers_to_include(header_names.clone()) + .build(); + let addr = listener.bound_address().unwrap(); + let cancel_token = cancel.clone(); + let srv_fut = async move { srv.serve(listener, cancel_token).await }; + tokio::pin!(srv_fut); + + let req = FakeOTLP::trace_service_request_with_spans(1, 1); + // Convert example headers to Vec of (&str, &str) for the helper function + let metadata_vec: Vec<(&str, &str)> = example_headers + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + let send_fut = send_trace_msg_with_metadata(addr, req, metadata_vec); + + tokio::select! 
{ + _ = &mut srv_fut => {}, + msg = send_fut => { + assert_ok!(msg); + cancel.cancel(); + } + } + srv_fut.await.unwrap(); + + let msg = trace_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify metadata is present + assert!(msg.request_context.is_some()); + let request_context = msg.request_context.as_ref().unwrap(); + + // Verify all example headers are present + for (key, expected_value) in &example_headers { + match request_context { + RequestContext::Http(h) => { + panic!("expected grpc request headers, got http {:?}", h) + } + RequestContext::Grpc(h) => { + assert_eq!( + h.get(key), + Some(expected_value), + "Metadata key {} should be present with value {}", + key, + expected_value + ); + } + } + } + } + + #[tokio::test] + async fn grpc_metadata_not_extracted_when_disabled() { + let example_headers = FakeOTLP::example_headers(); + let header_names: Vec = example_headers.keys().cloned().collect(); + let first_header = header_names.first().unwrap(); + + let (trace_in_tx, mut trace_rx) = bounded::>(10); + let trace_output = OTLPOutput::new(trace_in_tx); + let cancel = CancellationToken::new(); + + let listener = Listener::listen_async("[::1]:0".parse().unwrap()) + .await + .unwrap(); + + let srv = OTLPGrpcServer::builder() + .with_traces_output(Some(trace_output)) + .with_include_metadata(false) + .with_headers_to_include(vec![first_header.clone()]) + .build(); + let addr = listener.bound_address().unwrap(); + let cancel_token = cancel.clone(); + let srv_fut = async move { srv.serve(listener, cancel_token).await }; + tokio::pin!(srv_fut); + + let req = FakeOTLP::trace_service_request_with_spans(1, 1); + let send_fut = send_trace_msg_with_metadata( + addr, + req, + vec![( + first_header.as_str(), + example_headers.get(first_header).unwrap().as_str(), + )], + ); + + tokio::select! 
{ + _ = &mut srv_fut => {}, + msg = send_fut => { + assert_ok!(msg); + cancel.cancel(); + } + } + srv_fut.await.unwrap(); + + let msg = trace_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify metadata is NOT present when disabled + assert!(msg.metadata.is_none()); + } + + #[tokio::test] + async fn grpc_metadata_not_extracted_when_no_keys_specified() { + let example_headers = FakeOTLP::example_headers(); + let first_header = example_headers.keys().next().unwrap(); + + let (trace_in_tx, mut trace_rx) = bounded::>(10); + let trace_output = OTLPOutput::new(trace_in_tx); + let cancel = CancellationToken::new(); + + let listener = Listener::listen_async("[::1]:0".parse().unwrap()) + .await + .unwrap(); + + let srv = OTLPGrpcServer::builder() + .with_traces_output(Some(trace_output)) + .with_include_metadata(true) + .with_headers_to_include(vec![]) + .build(); + let addr = listener.bound_address().unwrap(); + let cancel_token = cancel.clone(); + let srv_fut = async move { srv.serve(listener, cancel_token).await }; + tokio::pin!(srv_fut); + + let req = FakeOTLP::trace_service_request_with_spans(1, 1); + let send_fut = send_trace_msg_with_metadata( + addr, + req, + vec![( + first_header.as_str(), + example_headers.get(first_header).unwrap().as_str(), + )], + ); + + tokio::select! 
{ + _ = &mut srv_fut => {}, + msg = send_fut => { + assert_ok!(msg); + cancel.cancel(); + } + } + srv_fut.await.unwrap(); + + let msg = trace_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify metadata is NOT present when no keys specified + assert!(msg.metadata.is_none()); + } + + #[tokio::test] + async fn grpc_metadata_extracted_for_metrics() { + let (metrics_in_tx, mut metrics_rx) = bounded::>(10); + let metrics_output = OTLPOutput::new(metrics_in_tx); + let cancel = CancellationToken::new(); + + let listener = Listener::listen_async("[::1]:0".parse().unwrap()) + .await + .unwrap(); + + let srv = OTLPGrpcServer::builder() + .with_metrics_output(Some(metrics_output)) + .with_include_metadata(true) + .with_headers_to_include(vec!["my-custom-header".to_string()]) + .build(); + let addr = listener.bound_address().unwrap(); + let cancel_token = cancel.clone(); + let srv_fut = async move { srv.serve(listener, cancel_token).await }; + tokio::pin!(srv_fut); + + let req = FakeOTLP::metrics_service_request_with_metrics(1, 1); + let send_fut = + send_metrics_msg_with_metadata(addr, req, vec![("my-custom-header", "metrics-value")]); + + tokio::select! 
{ + _ = &mut srv_fut => {}, + msg = send_fut => { + assert_ok!(msg); + cancel.cancel(); + } + } + srv_fut.await.unwrap(); + + let msg = metrics_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify metadata is present + assert!(msg.request_context.is_some()); + let req_context = msg.request_context.as_ref().unwrap(); + match req_context { + RequestContext::Http(h) => { + panic!("expected grpc request headers, got http {:?}", h) + } + RequestContext::Grpc(h) => { + assert_eq!( + h.get("my-custom-header"), + Some(&"metrics-value".to_string()) + ); + } + } + } + + #[tokio::test] + async fn grpc_metadata_extracted_for_logs() { + let (logs_in_tx, mut logs_rx) = bounded::>(10); + let logs_output = OTLPOutput::new(logs_in_tx); + let cancel = CancellationToken::new(); + + let listener = Listener::listen_async("[::1]:0".parse().unwrap()) + .await + .unwrap(); + + let srv = OTLPGrpcServer::builder() + .with_logs_output(Some(logs_output)) + .with_include_metadata(true) + .with_headers_to_include(vec!["my-custom-header".to_string()]) + .build(); + let addr = listener.bound_address().unwrap(); + let cancel_token = cancel.clone(); + let srv_fut = async move { srv.serve(listener, cancel_token).await }; + tokio::pin!(srv_fut); + + let req = FakeOTLP::logs_service_request_with_logs(1, 1); + let send_fut = + send_logs_msg_with_metadata(addr, req, vec![("my-custom-header", "logs-value")]); + + tokio::select! 
{ + _ = &mut srv_fut => {}, + msg = send_fut => { + assert_ok!(msg); + cancel.cancel(); + } + } + srv_fut.await.unwrap(); + + let msg = logs_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify metadata is present + assert!(msg.request_context.is_some()); + let req_context = msg.request_context.as_ref().unwrap(); + match req_context { + RequestContext::Http(h) => { + panic!("expected grpc request headers, got http {:?}", h) + } + RequestContext::Grpc(h) => { + assert_eq!(h.get("my-custom-header"), Some(&"logs-value".to_string())); + } + } + } + + #[tokio::test] + async fn grpc_metadata_key_case_insensitive() { + let (trace_in_tx, mut trace_rx) = bounded::>(10); + let trace_output = OTLPOutput::new(trace_in_tx); + let cancel = CancellationToken::new(); + + let listener = Listener::listen_async("[::1]:0".parse().unwrap()) + .await + .unwrap(); + + let srv = OTLPGrpcServer::builder() + .with_traces_output(Some(trace_output)) + .with_include_metadata(true) + .with_headers_to_include(vec!["My-Custom-Header".to_string()]) + .build(); + let addr = listener.bound_address().unwrap(); + let cancel_token = cancel.clone(); + let srv_fut = async move { srv.serve(listener, cancel_token).await }; + tokio::pin!(srv_fut); + + let req = FakeOTLP::trace_service_request_with_spans(1, 1); + // Send metadata with lowercase key + let send_fut = + send_trace_msg_with_metadata(addr, req, vec![("my-custom-header", "test-value")]); + + tokio::select! 
{ + _ = &mut srv_fut => {}, + msg = send_fut => { + assert_ok!(msg); + cancel.cancel(); + } + } + srv_fut.await.unwrap(); + + let msg = trace_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify metadata is present and key is normalized to lowercase + assert!(msg.request_context.is_some()); + let req_context = msg.request_context.as_ref().unwrap(); + match req_context { + RequestContext::Http(h) => { + panic!("expected grpc request headers, got http {:?}", h) + } + RequestContext::Grpc(h) => { + // Should be stored as lowercase + assert_eq!(h.get("my-custom-header"), Some(&"test-value".to_string())); + } + } + } } diff --git a/src/receivers/otlp/otlp_http.rs b/src/receivers/otlp/otlp_http.rs index 8ddeb584..78110627 100644 --- a/src/receivers/otlp/otlp_http.rs +++ b/src/receivers/otlp/otlp_http.rs @@ -11,6 +11,12 @@ use hyper::{Request, Response, StatusCode}; use hyper_util::rt::{TokioExecutor, TokioIo}; use hyper_util::server::conn::auto::Builder; use hyper_util::service::TowerToHyperService; +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsServiceRequest, ExportLogsServiceResponse, +}; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, ExportMetricsServiceResponse, +}; use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; @@ -26,20 +32,16 @@ use tracing::{debug, error}; use crate::listener::Listener; use crate::receivers::get_meter; use crate::topology::batch::BatchSizer; -use crate::topology::payload::{Message, OTLPInto}; +use crate::topology::payload::{Message, OTLPInto, RequestContext}; use opentelemetry::KeyValue; use opentelemetry::metrics::Counter; -use opentelemetry_proto::tonic::collector::logs::v1::{ - ExportLogsServiceRequest, ExportLogsServiceResponse, -}; -use opentelemetry_proto::tonic::collector::metrics::v1::{ - ExportMetricsServiceRequest, ExportMetricsServiceResponse, -}; use 
opentelemetry_proto::tonic::logs::v1::ResourceLogs; use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; use opentelemetry_proto::tonic::trace::v1::ResourceSpans; use serde::Serialize; use serde::de::DeserializeOwned; +use serde_json; +use std::collections::HashMap; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; @@ -69,6 +71,8 @@ pub struct OTLPHttpServerBuilder { metrics_path: String, logs_path: String, header_timeout: Option, + include_metadata: bool, + headers_to_include: Vec, } impl OTLPHttpServerBuilder { @@ -112,6 +116,16 @@ impl OTLPHttpServerBuilder { } } + pub fn with_include_metadata(mut self, include: bool) -> Self { + self.include_metadata = include; + self + } + + pub fn with_headers_to_include(mut self, headers: Vec) -> Self { + self.headers_to_include = headers; + self + } + pub fn build(self) -> OTLPHttpServer { OTLPHttpServer { trace_output: self.trace_output, @@ -121,6 +135,8 @@ impl OTLPHttpServerBuilder { traces_path: self.traces_path, metrics_path: self.metrics_path, logs_path: self.logs_path, + include_metadata: self.include_metadata, + headers_to_include: self.headers_to_include, } } } @@ -133,6 +149,8 @@ pub struct OTLPHttpServer { pub metrics_path: String, pub logs_path: String, header_timeout: Duration, + include_metadata: bool, + headers_to_include: Vec, } impl OTLPHttpServer { @@ -152,6 +170,8 @@ impl OTLPHttpServer { self.traces_path.clone(), self.metrics_path.clone(), self.logs_path.clone(), + self.include_metadata, + self.headers_to_include.clone(), ); // To bridge Tower->Hyper we must wrap the tower service @@ -251,6 +271,8 @@ fn build_service( traces_path: String, metrics_path: String, logs_path: String, + include_metadata: bool, + headers_to_include: Vec, ) -> Trace< RequestBodyLimit>>, HttpMakeClassifier, @@ -276,6 +298,8 @@ fn build_service( traces_path, metrics_path, logs_path, + include_metadata, + headers_to_include, )) } @@ -287,6 +311,8 @@ struct OTLPService { traces_path: String, 
metrics_path: String, logs_path: String, + include_metadata: bool, + headers_to_include: Vec, accepted_spans_records_counter: Counter, accepted_metric_points_counter: Counter, accepted_log_records_counter: Counter, @@ -304,6 +330,8 @@ impl OTLPService { traces_path: String, metrics_path: String, logs_path: String, + include_metadata: bool, + headers_to_include: Vec, ) -> Self { // Compute this once @@ -314,6 +342,8 @@ impl OTLPService { traces_path, metrics_path, logs_path, + include_metadata, + headers_to_include, accepted_spans_records_counter: get_meter() .u64_counter("rotel_receiver_accepted_spans") .with_description( @@ -375,6 +405,10 @@ where match req.method() { &Method::POST => { let path = req.uri().path(); + let include_metadata = self.include_metadata; + let headers_to_include = self.headers_to_include.clone(); + let tags = self.tags.clone(); + if path == self.traces_path { if self.trace_output.is_none() { return Box::pin(futures::future::ok( @@ -384,13 +418,20 @@ where let output = self.trace_output.clone().unwrap(); let accepted = self.accepted_spans_records_counter.clone(); let refused = self.refused_spans_records_counter.clone(); - let tags = self.tags.clone(); return Box::pin(handle::< H, ExportTraceServiceRequest, ExportTraceServiceResponse, ResourceSpans, - >(req, output, accepted, refused, tags)); + >( + req, + output, + accepted, + refused, + tags, + include_metadata, + headers_to_include, + )); } if path == self.metrics_path { if self.metrics_output.is_none() { @@ -401,13 +442,20 @@ where let output = self.metrics_output.clone().unwrap(); let accepted = self.accepted_metric_points_counter.clone(); let refused = self.refused_metric_points_counter.clone(); - let tags = self.tags.clone(); return Box::pin(handle::< H, ExportMetricsServiceRequest, ExportMetricsServiceResponse, ResourceMetrics, - >(req, output, accepted, refused, tags)); + >( + req, + output, + accepted, + refused, + tags, + include_metadata, + headers_to_include, + )); } if path == 
self.logs_path { if self.logs_output.is_none() { @@ -418,13 +466,20 @@ where let output = self.logs_output.clone().unwrap(); let accepted = self.accepted_log_records_counter.clone(); let refused = self.refused_log_records_counter.clone(); - let tags = self.tags.clone(); return Box::pin(handle::< H, ExportLogsServiceRequest, ExportLogsServiceResponse, ResourceLogs, - >(req, output, accepted, refused, tags)); + >( + req, + output, + accepted, + refused, + tags, + include_metadata, + headers_to_include, + )); } Box::pin(futures::future::ok( response_4xx(StatusCode::NOT_FOUND).unwrap(), @@ -481,23 +536,63 @@ where Ok(decoded_bytes) } +/// Extract specified HTTP headers and store them in a HashMap for metadata. +/// Headers are normalized to lowercase for consistent lookup. +fn extract_headers_to_request_context( + req: &Request, + headers_to_include: &[String], +) -> HashMap { + let mut headers_map = HashMap::new(); + let request_headers = req.headers(); + + for header_name in headers_to_include { + // Normalize header name: lowercase for consistency + let normalized = header_name.to_lowercase(); + + if let Ok(header_name_parsed) = normalized.parse::() { + if let Some(header_value) = request_headers.get(&header_name_parsed) { + if let Ok(value_str) = header_value.to_str() { + let header_value_str = value_str.to_string(); + headers_map.insert(normalized.clone(), header_value_str); + } + } + } + } + + headers_map +} + async fn handle< H: Body, ExpReq: prost::Message + DeserializeOwned + Default + OTLPInto>, ExpResp: prost::Message + Serialize + Default, - T: prost::Message, + T: prost::Message + crate::topology::batch::BatchSizer, >( req: Request, output: OTLPOutput>, accepted_counter: Counter, refused_counter: Counter, tags: [KeyValue; 1], + include_metadata: bool, + headers_to_include: Vec, ) -> Result>, hyper::Error> where ::Error: Display + Debug + Send + Sync + ToString, - [T]: BatchSizer, + [T]: crate::topology::batch::BatchSizer, { - // We've already validated 
this header exists + // Extract headers before consuming the request body + let http_request_ctx = if include_metadata && !headers_to_include.is_empty() { + let headers_map = extract_headers_to_request_context(&req, &headers_to_include); + + if !headers_map.is_empty() { + Some(RequestContext::Http(headers_map)) + } else { + None + } + } else { + None + }; + let ct = req.headers().get(CONTENT_TYPE).unwrap().clone(); let decoded_bytes = decode_body(req).await; @@ -506,7 +601,7 @@ where } let mut json_resp = false; - let req = match ct.to_str().unwrap() { + let otlp_req = match ct.to_str().unwrap() { PROTOBUF_CT => { let decoded = ExpReq::decode(decoded_bytes.unwrap()); if let Err(e) = decoded { @@ -544,12 +639,13 @@ where resp_headers.insert(CONTENT_TYPE, HeaderValue::from_str(PROTOBUF_CT).unwrap()); } - let otlp_payload = ExpReq::otlp_into(req); + let otlp_payload = otlp_req.otlp_into(); let count = BatchSizer::size_of(otlp_payload.as_slice()); match output .send(Message { metadata: None, + request_context: http_request_ctx, payload: otlp_payload, }) .await @@ -634,6 +730,7 @@ mod tests { MAX_BODY_SIZE, OTLPHttpServer, OTLPService, ValidateOTLPContentType, build_service, }; use crate::receivers::otlp_output::OTLPOutput; + use crate::topology::payload::RequestContext; use hyper_util::service::TowerToHyperService; use opentelemetry_proto::tonic::logs::v1::ResourceLogs; use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; @@ -981,6 +1078,8 @@ mod tests { "/v1/traces".to_string(), "/v1/metrics".to_string(), "/v1/logs".to_string(), + false, + vec![], ); let svc = TowerToHyperService::new(svc); @@ -994,4 +1093,272 @@ mod tests { .timer(TokioTimer::new()) .build::<_, Full>(HttpConnector::new()) } + + fn new_svc_with_metadata( + include_metadata: bool, + headers_to_include: Vec, + ) -> ( + TowerToHyperService< + Trace< + RequestBodyLimit< + Compression>, + >, + HttpMakeClassifier, + >, + >, + BoundedReceiver>, + BoundedReceiver>, + BoundedReceiver>, + ) { + let 
(trace_tx, trace_rx) = bounded::>(10); + let (metrics_tx, metrics_rx) = + bounded::>(10); + let (logs_tx, logs_rx) = bounded::>(10); + let trace_output = OTLPOutput::new(trace_tx); + let metrics_output = OTLPOutput::new(metrics_tx); + let logs_output = OTLPOutput::new(logs_tx); + + let svc = build_service( + Some(trace_output), + Some(metrics_output), + Some(logs_output), + "/v1/traces".to_string(), + "/v1/metrics".to_string(), + "/v1/logs".to_string(), + include_metadata, + headers_to_include, + ); + let svc = TowerToHyperService::new(svc); + + (svc, trace_rx, metrics_rx, logs_rx) + } + + #[tokio::test] + async fn http_metadata_extracted_for_traces() { + let example_headers = FakeOTLP::example_headers(); + let header_names: Vec = example_headers.keys().cloned().collect(); + + let (svc, mut trace_rx, _, _) = new_svc_with_metadata(true, header_names.clone()); + + let trace_req = FakeOTLP::trace_service_request(); + let mut buf = Vec::with_capacity(trace_req.encoded_len()); + assert_ok!(trace_req.encode(&mut buf)); + let buf = Bytes::from(buf); + + let mut req_builder = Request::builder() + .uri("/v1/traces") + .method(Method::POST) + .header(CONTENT_TYPE, "application/x-protobuf"); + + // Add example headers from FakeOTLP + for (key, value) in &example_headers { + req_builder = req_builder.header(key, value.as_str()); + } + + let req: Request> = req_builder.body(Full::new(buf)).unwrap(); + + let resp = svc.call(req).await.unwrap(); + assert_eq!(StatusCode::OK, resp.status()); + + let msg = trace_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify metadata is present + assert!(msg.request_context.is_some()); + let request_context = msg.request_context.as_ref().unwrap(); + match request_context { + RequestContext::Http(h) => { + // Verify all example headers are present + for (key, expected_value) in &example_headers { + assert_eq!( + h.get(key), + Some(expected_value), + "Header {} should be present with value {}", + key, + expected_value + ); + } + } 
+ RequestContext::Grpc(h) => { + panic!("expecting a Http header: got grpc headers {:?}", h); + } + } + } + + #[tokio::test] + async fn http_metadata_not_extracted_when_disabled() { + let example_headers = FakeOTLP::example_headers(); + let header_names: Vec = example_headers.keys().cloned().collect(); + let first_header = header_names.first().unwrap(); + + let (svc, mut trace_rx, _, _) = new_svc_with_metadata(false, vec![first_header.clone()]); + + let trace_req = FakeOTLP::trace_service_request(); + let mut buf = Vec::with_capacity(trace_req.encoded_len()); + assert_ok!(trace_req.encode(&mut buf)); + let buf = Bytes::from(buf); + + let req: Request> = Request::builder() + .uri("/v1/traces") + .method(Method::POST) + .header(CONTENT_TYPE, "application/x-protobuf") + .header(first_header, example_headers.get(first_header).unwrap()) + .body(Full::new(buf)) + .unwrap(); + + let resp = svc.call(req).await.unwrap(); + assert_eq!(StatusCode::OK, resp.status()); + + let msg = trace_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify request_context is NOT present when disabled + assert!(msg.request_context.is_none()); + } + + #[tokio::test] + async fn http_metadata_not_extracted_when_no_headers_specified() { + let example_headers = FakeOTLP::example_headers(); + let first_header = example_headers.keys().next().unwrap(); + + let (svc, mut trace_rx, _, _) = new_svc_with_metadata(true, vec![]); + + let trace_req = FakeOTLP::trace_service_request(); + let mut buf = Vec::with_capacity(trace_req.encoded_len()); + assert_ok!(trace_req.encode(&mut buf)); + let buf = Bytes::from(buf); + + let req: Request> = Request::builder() + .uri("/v1/traces") + .method(Method::POST) + .header(CONTENT_TYPE, "application/x-protobuf") + .header(first_header, example_headers.get(first_header).unwrap()) + .body(Full::new(buf)) + .unwrap(); + + let resp = svc.call(req).await.unwrap(); + assert_eq!(StatusCode::OK, resp.status()); + + let msg = trace_rx.next().await.unwrap(); + 
assert_eq!(1, msg.len()); + + // Verify request_context is NOT present when no headers specified + assert!(msg.request_context.is_none()); + } + + #[tokio::test] + async fn http_metadata_extracted_for_metrics() { + let (svc, _, mut metrics_rx, _) = + new_svc_with_metadata(true, vec!["my-custom-header".to_string()]); + + let metrics_req = FakeOTLP::metrics_service_request(); + let mut buf = Vec::with_capacity(metrics_req.encoded_len()); + assert_ok!(metrics_req.encode(&mut buf)); + let buf = Bytes::from(buf); + + let req: Request> = Request::builder() + .uri("/v1/metrics") + .method(Method::POST) + .header(CONTENT_TYPE, "application/x-protobuf") + .header("my-custom-header", "metrics-value") + .body(Full::new(buf)) + .unwrap(); + + let resp = svc.call(req).await.unwrap(); + assert_eq!(StatusCode::OK, resp.status()); + + let msg = metrics_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify request_context is present + assert!(msg.request_context.is_some()); + let request_context = msg.request_context.as_ref().unwrap(); + match request_context { + RequestContext::Http(h) => { + assert_eq!( + h.get("my-custom-header"), + Some(&"metrics-value".to_string()) + ); + } + RequestContext::Grpc(h) => { + panic!("expecting a Http header: got grpc headers {:?}", h); + } + } + } + + #[tokio::test] + async fn http_metadata_extracted_for_logs() { + let (svc, _, _, mut logs_rx) = + new_svc_with_metadata(true, vec!["my-custom-header".to_string()]); + + let logs_req = FakeOTLP::logs_service_request(); + let mut buf = Vec::with_capacity(logs_req.encoded_len()); + assert_ok!(logs_req.encode(&mut buf)); + let buf = Bytes::from(buf); + + let req: Request> = Request::builder() + .uri("/v1/logs") + .method(Method::POST) + .header(CONTENT_TYPE, "application/x-protobuf") + .header("my-custom-header", "logs-value") + .body(Full::new(buf)) + .unwrap(); + + let resp = svc.call(req).await.unwrap(); + assert_eq!(StatusCode::OK, resp.status()); + + let msg = 
logs_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify request_context is present + assert!(msg.request_context.is_some()); + let request_context = msg.request_context.as_ref().unwrap(); + match request_context { + RequestContext::Http(h) => { + assert_eq!(h.get("my-custom-header"), Some(&"logs-value".to_string())); + } + RequestContext::Grpc(h) => { + panic!("expecting a Http header: got grpc headers {:?}", h); + } + } + } + + #[tokio::test] + async fn http_metadata_header_case_insensitive() { + let (svc, mut trace_rx, _, _) = + new_svc_with_metadata(true, vec!["My-Custom-Header".to_string()]); + + let trace_req = FakeOTLP::trace_service_request(); + let mut buf = Vec::with_capacity(trace_req.encoded_len()); + assert_ok!(trace_req.encode(&mut buf)); + let buf = Bytes::from(buf); + + // Send header with different case + let req: Request> = Request::builder() + .uri("/v1/traces") + .method(Method::POST) + .header(CONTENT_TYPE, "application/x-protobuf") + .header("my-custom-header", "test-value") + .body(Full::new(buf)) + .unwrap(); + + let resp = svc.call(req).await.unwrap(); + assert_eq!(StatusCode::OK, resp.status()); + + let msg = trace_rx.next().await.unwrap(); + assert_eq!(1, msg.len()); + + // Verify request_context is present and header is normalized to lowercase + assert!(msg.request_context.is_some()); + let request_context = msg.request_context.as_ref().unwrap(); + match request_context { + RequestContext::Http(h) => { + // Should be able to retrieve with lowercase + assert_eq!(h.get("my-custom-header"), Some(&"test-value".to_string())); + } + RequestContext::Grpc(h) => { + panic!("expecting a Http header: got grpc headers {:?}", h); + } + } + } } diff --git a/src/telemetry/internal_exporter.rs b/src/telemetry/internal_exporter.rs index b88a04f4..c6c46f62 100644 --- a/src/telemetry/internal_exporter.rs +++ b/src/telemetry/internal_exporter.rs @@ -37,8 +37,9 @@ impl PushMetricExporter for InternalOTLPMetricsExporter { Some(mo) => { let req 
= ExportMetricsServiceRequest::from(&*metrics); let res = mo - .send(crate::topology::payload::Message { + .send(Message { metadata: None, + request_context: None, payload: req.resource_metrics, }) .await; diff --git a/src/topology/batch.rs b/src/topology/batch.rs index a26de483..68a966c9 100644 --- a/src/topology/batch.rs +++ b/src/topology/batch.rs @@ -170,6 +170,7 @@ where // Clone metadata for both parts - future reference counting will handle proper ack semantics Message { metadata: self.metadata.clone(), + request_context: self.request_context.clone(), payload: split_payload, } } @@ -296,6 +297,7 @@ mod tests { let first_request = FakeOTLP::trace_service_request_with_spans(1, 5); let message = Message { metadata: None, + request_context: None, payload: first_request.resource_spans, }; @@ -307,6 +309,7 @@ mod tests { let second_request = FakeOTLP::trace_service_request_with_spans(1, 7); let message2 = Message { metadata: None, + request_context: None, payload: second_request.resource_spans, }; @@ -335,6 +338,7 @@ mod tests { }; let message = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata.clone())), + request_context: None, payload: first_request.resource_spans, }; @@ -352,6 +356,7 @@ mod tests { }; let message2 = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata2)), + request_context: None, payload: second_request.resource_spans, }; @@ -397,6 +402,7 @@ mod tests { }; let message = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata.clone())), + request_context: None, payload: first_request.resource_spans, }; @@ -414,6 +420,7 @@ mod tests { }; let message2 = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata2.clone())), + request_context: None, payload: second_request.resource_spans, }; @@ -478,6 +485,7 @@ mod tests { }; let message = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata.clone())), + request_context: None, payload: request.resource_spans, }; @@ -522,6 +530,7 @@ mod tests { let request 
= FakeOTLP::trace_service_request_with_spans(1, 21); let message = Message { metadata: None, + request_context: None, payload: request.resource_spans, }; @@ -547,6 +556,7 @@ mod tests { }; let message = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata)), + request_context: None, payload: request.resource_spans, }; @@ -565,6 +575,7 @@ mod tests { let request = FakeOTLP::trace_service_request_with_spans(1, 5); let message = Message { metadata: None, + request_context: None, payload: request.resource_spans, }; let resp = batch.offer(vec![message]); @@ -580,6 +591,7 @@ mod tests { let request = FakeOTLP::trace_service_request_with_spans(1, 5); let message = Message { metadata: None, + request_context: None, payload: request.resource_spans, }; let resp = batch.offer(vec![message]); @@ -606,6 +618,7 @@ mod tests { }; let message = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata)), + request_context: None, payload: request.resource_spans, }; @@ -642,6 +655,7 @@ mod tests { let first_request = FakeOTLP::metrics_service_request_with_metrics(1, 5); let message1 = Message { metadata: None, + request_context: None, payload: first_request.resource_metrics, }; @@ -659,6 +673,7 @@ mod tests { }; let message2 = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata)), + request_context: None, payload: second_request.resource_metrics, }; @@ -703,6 +718,7 @@ mod tests { let first_request = FakeOTLP::logs_service_request_with_logs(1, 5); let message1 = Message { metadata: None, + request_context: None, payload: first_request.resource_logs, }; @@ -720,6 +736,7 @@ mod tests { }; let message2 = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata)), + request_context: None, payload: second_request.resource_logs, }; @@ -770,6 +787,7 @@ mod tests { }; let message1 = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata)), + request_context: None, payload: request1.resource_spans, }; @@ -777,6 +795,7 @@ mod tests { let request2 = 
FakeOTLP::trace_service_request_with_spans(1, 4); let message2 = Message { metadata: None, + request_context: None, payload: request2.resource_spans, }; @@ -790,6 +809,7 @@ mod tests { }; let message3 = Message { metadata: Some(MessageMetadata::kafka(kafka_metadata2)), + request_context: None, payload: request3.resource_spans, }; diff --git a/src/topology/fanout.rs b/src/topology/fanout.rs index 074210da..806c7771 100644 --- a/src/topology/fanout.rs +++ b/src/topology/fanout.rs @@ -348,6 +348,7 @@ mod tests { let test_payload = vec![ResourceSpans::default()]; let message = Message { metadata: Some(metadata), + request_context: None, payload: test_payload.clone(), }; diff --git a/src/topology/generic_pipeline.rs b/src/topology/generic_pipeline.rs index 30a042ba..1de3f73c 100644 --- a/src/topology/generic_pipeline.rs +++ b/src/topology/generic_pipeline.rs @@ -16,6 +16,8 @@ use opentelemetry_proto::tonic::trace::v1::ResourceSpans; #[cfg(feature = "pyo3")] use rotel_sdk::model::{PythonProcessable, register_processor}; #[cfg(feature = "pyo3")] +use rotel_sdk::py::request_context::RequestContext as PyRequestContext; +#[cfg(feature = "pyo3")] use std::env; use std::error::Error; #[cfg(feature = "pyo3")] @@ -213,10 +215,77 @@ where Ok(()) } + #[cfg(not(feature = "pyo3"))] + fn run_processors( + &self, + message: Message, + _: usize, + _: &[String], + inspector: &impl Inspect, + ) -> Message { + inspector.inspect(&message.payload); + message + } + + #[cfg(feature = "pyo3")] + fn run_processors( + &self, + message: Message, + len_processor_modules: usize, + processor_modules: &[String], + inspector: &impl Inspect, + ) -> Message { + let mut items = message.payload; + let request_context = message.request_context.clone(); + let mut py_request_context: Option = None; + match message.request_context { + None => {} + Some(ctx) => py_request_context = Some(ctx.into()), + } + // invoke current middleware layer + // todo: expand support for observability or transforms + if 
len_processor_modules > 0 { + inspector.inspect_with_prefix(Some("OTLP payload before processing".into()), &items); + } else { + inspector.inspect(&items); + } + // If any resource attributes were provided on start, set or append them to the resources + if !self.resource_attributes.is_empty() { + for item in &mut items { + item.set_or_append_attributes(self.resource_attributes.clone()) + } + } + for p in processor_modules { + let mut new_items = Vec::new(); + // Extract headers from request_context if available + + while !items.is_empty() { + let item = items.pop(); + if let Some(item) = item { + let result = item.process(p, py_request_context.clone()); + new_items.push(result); + } + } + items = new_items; + } + + if len_processor_modules > 0 { + inspector.inspect_with_prefix(Some("OTLP payload after processing".into()), &items); + } + + // Wrap the processed items back into a Message + Message { + metadata: message.metadata, + request_context, + payload: items, + } + } + #[cfg(feature = "pyo3")] fn initialize_processors(&mut self) -> Result, BoxError> { let mut processor_modules = vec![]; let current_dir = env::current_dir()?; + for (processor_idx, file) in self.processors.iter().enumerate() { let file_path = Path::new(file); @@ -226,10 +295,29 @@ where } else { current_dir.join(file_path) }; - let code = std::fs::read_to_string(&script_path)?; + + let code = match std::fs::read_to_string(&script_path) { + Ok(c) => c, + Err(e) => { + return Err(format!( + "Failed to read processor script {}: {}", + script_path.display(), + e + ) + .into()); + } + }; + let module = format!("rotel_processor_{}", processor_idx); - register_processor(code, file.clone(), module.clone())?; - processor_modules.push(module); + + match register_processor(code, file.clone(), module.clone()) { + Ok(_) => { + processor_modules.push(module); + } + Err(e) => { + return Err(format!("Failed to register processor {}: {}", file, e).into()); + } + } } Ok(processor_modules) } @@ -255,7 +343,13 @@ 
where batch_timer.tick().await; // consume the immediate tick #[cfg(feature = "pyo3")] - let processor_modules = self.initialize_processors()?; + let processor_modules = match self.initialize_processors() { + Ok(modules) => modules, + Err(e) => { + error!(error = ?e, "Failed to initialize processors"); + vec![] + } + }; #[cfg(not(feature = "pyo3"))] let processor_modules: Vec = vec![]; @@ -308,45 +402,10 @@ where return Ok(()); } - let message = item.unwrap(); - let mut items = message.payload; - - // invoke current middleware layer - // todo: expand support for observability or transforms - if len_processor_modules > 0 { - inspector.inspect_with_prefix(Some("OTLP payload before processing".into()), &items); - } else { - inspector.inspect(&items); - } - // If any resource attributes were provided on start, set or append them to the resources - if !self.resource_attributes.is_empty() { - for item in &mut items { - item.set_or_append_attributes(self.resource_attributes.clone()) - } - } - for p in &processor_modules { - let mut new_items = Vec::new(); - while !items.is_empty() { - let item = items.pop(); - if item.is_some() { - let result = item.unwrap().process(p); - new_items.push(result); - } - } - items = new_items; - } - - if len_processor_modules > 0 { - inspector.inspect_with_prefix(Some("OTLP payload after processing".into()), &items); - } - - // Wrap the processed items back into a Message - let processed_message = Message { - metadata: message.metadata, - payload: items, - }; + let mut message = item.unwrap(); + message = self.run_processors(message, len_processor_modules, &processor_modules, &inspector); - match batch.offer(vec![processed_message]) { + match batch.offer(vec![message]) { Ok(Some(popped)) => { let fut = self.fanout.send_async(popped); send_fut = Some(fut); diff --git a/src/topology/payload.rs b/src/topology/payload.rs index e00f25d2..91978822 100644 --- a/src/topology/payload.rs +++ b/src/topology/payload.rs @@ -7,18 +7,31 @@ use 
opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::logs::v1::ResourceLogs; use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; use opentelemetry_proto::tonic::trace::v1::ResourceSpans; +use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; +#[cfg(feature = "pyo3")] +use rotel_sdk::py::request_context::RequestContext as PyRequestContext; + #[derive(Clone, Debug, PartialEq)] pub struct Message { pub metadata: Option, + pub request_context: Option, pub payload: Vec, } impl Message { - pub fn new(metadata: Option, payload: Vec) -> Self { - Self { metadata, payload } + pub fn new( + metadata: Option, + payload: Vec, + request_context: Option, + ) -> Self { + Self { + metadata, + payload, + request_context, + } } // Used in testing @@ -33,12 +46,37 @@ pub struct MessageMetadata { ref_count: Arc, } +#[derive(Clone, Debug, PartialEq)] +pub enum RequestContext { + Http(HashMap), + Grpc(HashMap), +} + #[derive(Clone, Debug, PartialEq)] pub enum MessageMetadataInner { Kafka(KafkaMetadata), Forwarder(ForwarderMetadata), } +#[allow(clippy::from_over_into)] +#[cfg(feature = "pyo3")] +impl Into for RequestContext { + fn into(self) -> PyRequestContext { + match self { + RequestContext::Http(h) => { + PyRequestContext::HttpContext(rotel_sdk::py::request_context::HttpContext { + headers: h, + }) + } + RequestContext::Grpc(h) => { + PyRequestContext::GrpcContext(rotel_sdk::py::request_context::GrpcContext { + metadata: h, + }) + } + } + } +} + impl MessageMetadata { /// Create new MessageMetadata with Kafka variant, starting with ref_count = 1 pub fn kafka(metadata: KafkaMetadata) -> Self { @@ -120,6 +158,39 @@ pub struct KafkaMetadata { pub ack_chan: Option>, } +/// HTTP metadata containing request headers and other HTTP context +#[derive(Clone, Debug, PartialEq)] +pub struct HttpMetadata { + /// Map of header names (lowercase) to header values + pub headers: HashMap, +} + 
+impl HttpMetadata { + /// Create new HttpMetadata with headers + pub fn new(headers: HashMap) -> Self { + Self { headers } + } + + /// Get a header value by name (case-insensitive) + pub fn get_header(&self, name: &str) -> Option<&String> { + self.headers.get(&name.to_lowercase()) + } +} + +/// gRPC metadata containing request metadata and other gRPC context +#[derive(Clone, Debug, PartialEq)] +pub struct GrpcMetadata { + /// Map of metadata keys (lowercase) to metadata values + pub headers: HashMap, +} + +impl GrpcMetadata { + /// Create new GrpcMetadata with headers + pub fn new(headers: HashMap) -> Self { + Self { headers } + } +} + impl KafkaMetadata { /// Create new KafkaMetadata pub fn new( diff --git a/utilities/src/otlp.rs b/utilities/src/otlp.rs index 65440799..0d1d7c90 100644 --- a/utilities/src/otlp.rs +++ b/utilities/src/otlp.rs @@ -14,10 +14,37 @@ use opentelemetry_proto::tonic::resource::v1::Resource; use opentelemetry_proto::tonic::trace::v1; use opentelemetry_proto::tonic::trace::v1::span::SpanKind; use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Status}; +use std::collections::HashMap; pub struct FakeOTLP; impl FakeOTLP { + /// Returns example HTTP/gRPC headers for testing header context functionality. + /// These headers match the ones used in tests and examples. 
+ /// + /// # Returns + /// A `HashMap` with example headers: + /// - `my-custom-header`: `test-value-123` + /// - `another-header`: `another-test-value` + /// + /// # Example + /// ``` + /// use utilities::otlp::FakeOTLP; + /// use std::collections::HashMap; + /// + /// let headers = FakeOTLP::example_headers(); + /// assert_eq!(headers.get("my-custom-header"), Some(&"test-value-123".to_string())); + /// ``` + pub fn example_headers() -> HashMap { + let mut headers = HashMap::new(); + headers.insert("my-custom-header".to_string(), "test-value-123".to_string()); + headers.insert( + "another-header".to_string(), + "another-test-value".to_string(), + ); + headers + } + pub fn logs_service_request() -> ExportLogsServiceRequest { Self::logs_service_request_with_logs(1, 1) }