Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pin-project = "1.1.10"
futures-util = "0.3.31"
rotel_python_processor_sdk = { path = "rotel_python_processor_sdk", optional = true }
pyo3 = { version = "0.24.1", optional = true }
chrono = { version = "0.4.40", features = [ "serde" ] }
chrono = { version = "0.4.40", features = ["serde"] }
serde = { version = "1.0.217", features = ["derive"] }
thiserror = "2.0.12"
lz4_flex = { version = "0.11.3", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -90,10 +90,11 @@ aws-config = { version = "1.8.6", optional = true }
aws-credential-types = { version = "1.2.6", optional = true }
# Use a custom branch that backdates to otel sdk 0.30.0 (we are not compatible with 0.31.0 yet). Also sort data points + attributes for visual stability
opentelemetry-prometheus-text-exporter = { git = "https://github.com/mheffner/opentelemetry-prometheus-text-exporter", branch = "sorted-otel-0.30.0", optional = true }
# Making utilities available to the generate-otlp binary
utilities = { path = "utilities" }

[dev-dependencies]
tokio-test = "0.4.4"
utilities = { path = "utilities" }
httpmock = "0.7.0"
criterion = { version = "0.5.1", features = ["async_tokio"] }
tracing-test = "0.2.5"
Expand Down Expand Up @@ -141,5 +142,9 @@ harness = false
[[bin]]
name = "rotel"

[[bin]]
name = "generate-otlp"
path = "src/bin/generate-otlp/main.rs"

[profile.release]
lto = "fat"
84 changes: 84 additions & 0 deletions Dockerfile.context-processor
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Python stage - get Python 3.13 with dev headers from Ubuntu
FROM ubuntu:22.04 AS python-builder
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
software-properties-common \
&& add-apt-repository -y ppa:deadsnakes/ppa \
&& apt-get update \
&& apt-get install -y \
python3.13 \
python3.13-dev \
&& rm -rf /var/lib/apt/lists/*

# Build stage
FROM rust:1.91 AS builder

# Copy Python 3.13 from python-builder stage
COPY --from=python-builder /usr/bin/python3.13 /usr/bin/python3.13
COPY --from=python-builder /usr/lib/python3.13 /usr/lib/python3.13
COPY --from=python-builder /usr/include/python3.13 /usr/include/python3.13
# Copy Python shared libraries (needed for linking)
RUN mkdir -p /usr/lib/x86_64-linux-gnu
COPY --from=python-builder /usr/lib/x86_64-linux-gnu/libpython3.13* /usr/lib/x86_64-linux-gnu/

# Install build dependencies (matching DEVELOPING.md requirements)
RUN apt-get update && apt-get install -y \
protobuf-compiler \
cmake \
clang \
libclang-dev \
libzstd-dev \
build-essential \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*

# Set environment variables for PyO3 to find Python
ENV PYTHON_SYS_EXECUTABLE=/usr/bin/python3.13

WORKDIR /app

# Copy manifest files
COPY Cargo.toml Cargo.lock ./
COPY rotel_python_processor_sdk/Cargo.toml ./rotel_python_processor_sdk/

# Copy source code
COPY . .

# Build with pyo3 feature for Python processor support
# Disable default features (rdkafka, aws_iam) to speed up compilation
RUN cargo build --release --no-default-features --features pyo3 --target x86_64-unknown-linux-gnu

# Runtime stage
# Use Debian to match the builder's GLIBC version (rust:1.91 is Debian-based)
FROM debian:trixie-slim

# Set environment variables for non-interactive apt operations
ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y ca-certificates && apt-get clean

# Set environment variables to prevent Python from writing .pyc files and to unbuffer stdout/stderr
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

# Install Python 3.13 from Debian repos (trixie has Python 3.13)
RUN apt-get update && \
apt-get install -y python3.13 python3.13-venv python3.13-dev && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Set Python 3.13 as the default python3
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.13 1

# Copy the built binary from builder stage
COPY --from=builder /app/target/x86_64-unknown-linux-gnu/release/rotel /rotel
RUN chmod 0755 /rotel

# Copy the context processor
RUN mkdir -p /processors
COPY rotel_python_processor_sdk/processors/context_processor.py /processors/context_processor.py

EXPOSE 5418

ENTRYPOINT ["/rotel", "start", "--otlp-http-endpoint", "0.0.0.0:5418", "--otlp-http-include-metadata", "--otlp-http-headers-to-include", "my-custom-header", "--otlp-with-trace-processor", "/processors/context_processor.py", "--exporter", "otlp", "--otlp-exporter-endpoint", "signoz-otel-collector:4317"]
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ follow these steps:
telemetrygen traces --otlp-insecure --duration 5s
```

Alternatively, use the built-in `generate-otlp` tool:

```bash
# Generate and send traces directly to Rotel
cargo run --bin generate-otlp -- traces --http-endpoint localhost:4318

# Or generate a trace file for testing
cargo run --bin generate-otlp -- traces --file trace.pb

# Then send it with curl
curl -X POST http://localhost:4318/v1/traces \
-H "Content-Type: application/x-protobuf" \
--data-binary @trace.pb
```

- Check the output from Rotel and you should see several "Received traces" log lines.

## Configuration
Expand Down
141 changes: 141 additions & 0 deletions rotel_python_processor_sdk/processors/context_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
ContextProcessor demonstrates how to pull HTTP/gRPC headers from message metadata
(context) and add them as span attributes. This follows the OTel Collector
pattern where headers/metadata are stored in context by the receiver and processors pull
from context to add attributes.

This processor extracts custom headers from context and adds them as span
attributes following OpenTelemetry semantic conventions (http.request.header.*).

Usage:
For HTTP, configure the receiver to include metadata:
ROTEL_OTLP_HTTP_INCLUDE_METADATA=true
ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header

For gRPC, configure the receiver to include metadata:
ROTEL_OTLP_GRPC_INCLUDE_METADATA=true
ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header

Then use this processor to add those headers as span attributes.

Message metadata is now exposed to Python processors. Processors can
access headers via:
resource_spans.message_metadata # Returns dict[str, str] or None

This processor works with both HTTP headers and gRPC metadata - they are
both exposed as the same dictionary format to Python processors.

This processor demonstrates how to extract headers from context and add
them as span attributes following OpenTelemetry semantic conventions.
"""

from typing import Optional

from rotel_sdk.open_telemetry.common.v1 import AnyValue, KeyValue
from rotel_sdk.open_telemetry.logs.v1 import ResourceLogs
from rotel_sdk.open_telemetry.metrics.v1 import ResourceMetrics
from rotel_sdk.open_telemetry.request import RequestContext
from rotel_sdk.open_telemetry.trace.v1 import ResourceSpans


def _get_header_from_context(
request_context: Optional[RequestContext], header_name: str
) -> Optional[str]:
"""
Get a header value from message metadata (context).

Accesses headers via:
resource_spans.request_context.get(header_name)

The pattern:
- Headers/metadata keys are stored with lowercase keys
- Works for both HTTP headers and gRPC metadata
"""
if request_context is not None:
if isinstance(request_context, RequestContext.HttpContext):
return request_context.http_context.headers.get(header_name.lower())
elif isinstance(request_context, RequestContext.GrpcContext):
return request_context.grpc_context.metadata.get(header_name.lower())
return None


class ContextProcessor:
def process_spans(self, resource_spans: ResourceSpans):
"""
Process ResourceSpans by extracting a custom header from context
and adding it as a span attribute.

This function extracts "my-custom-header" from context and adds it as
a span attribute following OTel semantic convention: http.request.header.*

Example: If the receiver is configured with:
ROTEL_OTLP_HTTP_INCLUDE_METADATA=true
ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header

Or for gRPC:
ROTEL_OTLP_GRPC_INCLUDE_METADATA=true
ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header

And the request includes "my-custom-header: test-value-123", then
this processor will add the attribute
"http.request.header.my-custom-header" = "test-value-123" to all spans.
"""
# Header to extract from context
header_name = "my-custom-header"

# Get header value from context
header_value = _get_header_from_context(
resource_spans.request_context, header_name
)

if header_value:
# Create attribute following OTel semantic convention
attr = KeyValue(
key=f"http.request.header.{header_name}",
value=AnyValue(header_value),
)

# Add attribute to all spans
for scope_spans in resource_spans.scope_spans:
for span in scope_spans.spans:
span.attributes.append(attr)

# Example: You can also add to resource attributes instead:
# if header_value and resource_spans.resource:
# resource_spans.resource.attributes.append(attr)

def process_metrics(self, resource_metrics: ResourceMetrics):
"""
Process metrics - add custom header to resource attributes.
Metrics typically use resource attributes rather than per-metric
attributes.
"""
header_name = "my-custom-header"
header_value = _get_header_from_context(
resource_metrics.request_context, header_name
)

if header_value and resource_metrics.resource:
attr = KeyValue(
key=f"http.request.header.{header_name}",
value=AnyValue(header_value),
)
resource_metrics.resource.attributes.append(attr)

def process_logs(self, resource_logs: ResourceLogs):
"""
Process logs - add custom header to log record attributes.
"""
header_name = "my-custom-header"
header_value = _get_header_from_context(
resource_logs.request_context, header_name
)
if header_value:
attr = KeyValue(
key=f"http.request.header.{header_name}",
value=AnyValue(header_value),
)

for scope_logs in resource_logs.scope_logs:
for log_record in scope_logs.log_records:
log_record.attributes.append(attr)
23 changes: 23 additions & 0 deletions rotel_python_processor_sdk/python_tests/context_processor_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import sys

from rotel_sdk.open_telemetry.logs.v1 import ResourceLogs
from rotel_sdk.open_telemetry.metrics.v1 import ResourceMetrics
from rotel_sdk.open_telemetry.trace.v1 import ResourceSpans

sys.path.insert(0, './processors')

from context_processor import ContextProcessor

processor = ContextProcessor()


def process_logs(resource_logs: ResourceLogs):
processor.process_logs(resource_logs)


def process_spans(resource_spans: ResourceSpans):
processor.process_spans(resource_spans)


def process_metrics(resource_metrics: ResourceMetrics):
processor.process_metrics(resource_metrics)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Optional

from rotel_sdk.open_telemetry.common.v1 import InstrumentationScope, KeyValue, AnyValue
from rotel_sdk.open_telemetry.request import RequestContext
from rotel_sdk.open_telemetry.resource.v1 import Resource
from typing import Optional


class ResourceLogs:
Expand All @@ -24,6 +26,7 @@ class ResourceLogs:
This schema_url applies to the data in the "resource" field.
It does not apply to the data in the "scope_logs" field which have their own schema_url field.
"""
request_context: Optional[RequestContext]


class ScopeLogs:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from rotel_sdk.open_telemetry.common.v1 import InstrumentationScope, KeyValue
from rotel_sdk.open_telemetry.request import RequestContext
from rotel_sdk.open_telemetry.resource.v1 import Resource


Expand All @@ -26,6 +27,7 @@ class ResourceMetrics:
This schema_url applies to the data in the "resource" field. It does not apply
to the data in the "scope_metrics" field which have their own schema_url field.
"""
request_context: Optional[RequestContext]


class ScopeMetrics:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class RequestContext:
"""
Union type representing different metric data types.
"""
HttpContext = HttpContext
GrpcContext = GrpcContext

@property
def http_context(self) -> HttpContext: ...

@property
def grpc_context(self) -> GrpcContext: ...


class HttpContext:
headers: dict[str, str]


class GrpcContext:
metadata: dict[str, str]
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Optional

from rotel_sdk.open_telemetry.common.v1 import InstrumentationScope, KeyValue
from rotel_sdk.open_telemetry.request import RequestContext
from rotel_sdk.open_telemetry.resource.v1 import Resource
from typing import Optional


class ResourceSpans:
Expand All @@ -25,6 +27,7 @@ class ResourceSpans:
This schema_url applies to the data in the "resource" field. It does not apply
to the data in the "scope_spans" field which have their own schema_url field.
"""
request_context: Optional[RequestContext]


class ScopeSpans:
Expand Down
Loading