Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ core = []

# Telemetry module
telemetry = [
"opentelemetry-exporter-otlp>=1.28.0",
"opentelemetry-exporter-otlp>=1.28.0,<2.0.0",
"opentelemetry-instrumentation-anthropic>=0.41.0",
"opentelemetry-instrumentation-cohere>=0.41.0",
"opentelemetry-instrumentation-fastapi>=0.55b0",
Expand Down
39 changes: 19 additions & 20 deletions src/blaxel/telemetry/exporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Callable, Dict, Sequence

try:
import requests
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
OTLPLogExporter,
)
Expand All @@ -26,42 +27,40 @@
MetricsData = None


class _DynamicHeadersSession(requests.Session):
"""A requests.Session subclass that injects dynamic headers on every request."""

def __init__(self, get_headers: Callable[[], Dict[str, str]]):
super().__init__()
self._get_headers = get_headers

def post(self, *args, **kwargs):
self.headers.update(self._get_headers())
return super().post(*args, **kwargs)


class DynamicHeadersSpanExporter(OTLPSpanExporter): # type: ignore[misc]
"""Span exporter with dynamic headers."""

def __init__(self, get_headers: Callable[[], Dict[str, str]]):
self._get_headers = get_headers
super().__init__()

def export(self, spans):
self._session.headers.update(self._get_headers())
return super().export(spans)
session = _DynamicHeadersSession(get_headers)
super().__init__(session=session)


class DynamicHeadersMetricExporter(OTLPMetricExporter): # type: ignore[misc]
"""Metric exporter with dynamic headers."""

def __init__(self, get_headers: Callable[[], Dict[str, str]]):
self._get_headers = get_headers
super().__init__()

def export(
self,
metrics_data: MetricsData, # type: ignore[reportUnknownReturnType]
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult: # type: ignore[reportUnknownReturnType]
self._session.headers.update(self._get_headers())
return super().export(metrics_data, timeout_millis, **kwargs)
session = _DynamicHeadersSession(get_headers)
super().__init__(session=session)


class DynamicHeadersLogExporter(OTLPLogExporter): # type: ignore[misc]
"""Log exporter with dynamic headers."""

def __init__(self, get_headers: Callable[[], Dict[str, str]]):
self._get_headers = get_headers
super().__init__()

def export(self, batch: Sequence[LogData]): # type: ignore[reportUnknownReturnType]
self._session.headers.update(self._get_headers())
return super().export(batch)
session = _DynamicHeadersSession(get_headers)
super().__init__(session=session)
62 changes: 62 additions & 0 deletions tests/core/test_telemetry_exporters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from unittest.mock import patch, MagicMock

import pytest
import requests

from blaxel.telemetry.exporters import (
_DynamicHeadersSession,
_OPENTELEMETRY_AVAILABLE,
DynamicHeadersSpanExporter,
DynamicHeadersMetricExporter,
DynamicHeadersLogExporter,
)


class TestDynamicHeadersSession:
def test_injects_headers_on_post(self):
headers = {"Authorization": "Bearer token123", "X-Custom": "value"}
session = _DynamicHeadersSession(get_headers=lambda: headers)

with patch.object(requests.Session, "post", return_value=MagicMock()) as mock_post:
session.post("http://example.com", data=b"test")

assert session.headers["Authorization"] == "Bearer token123"
assert session.headers["X-Custom"] == "value"
mock_post.assert_called_once()

def test_refreshes_headers_each_call(self):
call_count = 0

def get_headers():
nonlocal call_count
call_count += 1
return {"X-Request-Id": str(call_count)}

session = _DynamicHeadersSession(get_headers=get_headers)

with patch.object(requests.Session, "post", return_value=MagicMock()):
session.post("http://example.com", data=b"first")
assert session.headers["X-Request-Id"] == "1"

session.post("http://example.com", data=b"second")
assert session.headers["X-Request-Id"] == "2"

assert call_count == 2


@pytest.mark.skipif(not _OPENTELEMETRY_AVAILABLE, reason="opentelemetry not fully installed")
class TestDynamicHeadersExporters:
def test_span_exporter_uses_dynamic_session(self):
headers_fn = lambda: {"Authorization": "Bearer span-token"}
exporter = DynamicHeadersSpanExporter(get_headers=headers_fn)
assert isinstance(exporter._session, _DynamicHeadersSession)

def test_metric_exporter_uses_dynamic_session(self):
headers_fn = lambda: {"Authorization": "Bearer metric-token"}
exporter = DynamicHeadersMetricExporter(get_headers=headers_fn)
assert isinstance(exporter._session, _DynamicHeadersSession)

def test_log_exporter_uses_dynamic_session(self):
headers_fn = lambda: {"Authorization": "Bearer log-token"}
exporter = DynamicHeadersLogExporter(get_headers=headers_fn)
assert isinstance(exporter._session, _DynamicHeadersSession)
Loading