From 59cbc9402156417f9f418dab78751fa00365a5db Mon Sep 17 00:00:00 2001 From: "mendral-app[bot]" <233154221+mendral-app[bot]@users.noreply.github.com> Date: Wed, 10 Jun 2026 12:55:41 -0700 Subject: [PATCH] fix: use public session API for dynamic telemetry headers Replace direct access to private _session.headers attribute with a custom requests.Session subclass passed via the public 'session' constructor parameter. This prevents AttributeError crashes when the OTel exporter-otlp-proto-http package refactors its internals. - Add _DynamicHeadersSession that injects headers on every post() - Pass custom session to all three exporter constructors - Cap opentelemetry-exporter-otlp dependency at <2.0.0 - Add unit tests for dynamic header injection --- pyproject.toml | 2 +- src/blaxel/telemetry/exporters.py | 39 ++++++++-------- tests/core/test_telemetry_exporters.py | 62 ++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 21 deletions(-) create mode 100644 tests/core/test_telemetry_exporters.py diff --git a/pyproject.toml b/pyproject.toml index 9d88e2a4..920be70e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ core = [] # Telemetry module telemetry = [ - "opentelemetry-exporter-otlp>=1.28.0", + "opentelemetry-exporter-otlp>=1.28.0,<2.0.0", "opentelemetry-instrumentation-anthropic>=0.41.0", "opentelemetry-instrumentation-cohere>=0.41.0", "opentelemetry-instrumentation-fastapi>=0.55b0", diff --git a/src/blaxel/telemetry/exporters.py b/src/blaxel/telemetry/exporters.py index 03fe4bfa..28003598 100644 --- a/src/blaxel/telemetry/exporters.py +++ b/src/blaxel/telemetry/exporters.py @@ -3,6 +3,7 @@ from typing import Callable, Dict, Sequence try: + import requests from opentelemetry.exporter.otlp.proto.http._log_exporter import ( OTLPLogExporter, ) @@ -26,16 +27,25 @@ MetricsData = None +class _DynamicHeadersSession(requests.Session): + """A requests.Session subclass that injects dynamic headers on every request.""" + + def __init__(self, get_headers: Callable[[], Dict[str, str]]): + super().__init__() + self._get_headers = get_headers + + def post(self, *args, **kwargs): + self.headers.update(self._get_headers()) + return super().post(*args, **kwargs) + + class DynamicHeadersSpanExporter(OTLPSpanExporter): # type: ignore[misc] """Span exporter with dynamic headers.""" def __init__(self, get_headers: Callable[[], Dict[str, str]]): self._get_headers = get_headers - super().__init__() - - def export(self, spans): - self._session.headers.update(self._get_headers()) - return super().export(spans) + session = _DynamicHeadersSession(get_headers) + super().__init__(session=session) class DynamicHeadersMetricExporter(OTLPMetricExporter): # type: ignore[misc] @@ -43,16 +53,8 @@ class DynamicHeadersMetricExporter(OTLPMetricExporter): # type: ignore[misc] def __init__(self, get_headers: Callable[[], Dict[str, str]]): self._get_headers = get_headers - super().__init__() - - def export( - self, - metrics_data: MetricsData, # type: ignore[reportUnknownReturnType] - timeout_millis: float = 10_000, - **kwargs, - ) -> MetricExportResult: # type: ignore[reportUnknownReturnType] - self._session.headers.update(self._get_headers()) - return super().export(metrics_data, timeout_millis, **kwargs) + session = _DynamicHeadersSession(get_headers) + super().__init__(session=session) class DynamicHeadersLogExporter(OTLPLogExporter): # type: ignore[misc] @@ -60,8 +62,5 @@ class DynamicHeadersLogExporter(OTLPLogExporter): # type: ignore[misc] def __init__(self, get_headers: Callable[[], Dict[str, str]]): self._get_headers = get_headers - super().__init__() - - def export(self, batch: Sequence[LogData]): # type: ignore[reportUnknownReturnType] - self._session.headers.update(self._get_headers()) - return super().export(batch) + session = _DynamicHeadersSession(get_headers) + super().__init__(session=session) diff --git a/tests/core/test_telemetry_exporters.py b/tests/core/test_telemetry_exporters.py new file mode 100644 index 00000000..e829c2bc --- /dev/null +++ b/tests/core/test_telemetry_exporters.py @@ -0,0 +1,62 @@ +from unittest.mock import patch, MagicMock + +import pytest +import requests + +from blaxel.telemetry.exporters import ( + _DynamicHeadersSession, + _OPENTELEMETRY_AVAILABLE, + DynamicHeadersSpanExporter, + DynamicHeadersMetricExporter, + DynamicHeadersLogExporter, +) + + +class TestDynamicHeadersSession: + def test_injects_headers_on_post(self): + headers = {"Authorization": "Bearer token123", "X-Custom": "value"} + session = _DynamicHeadersSession(get_headers=lambda: headers) + + with patch.object(requests.Session, "post", return_value=MagicMock()) as mock_post: + session.post("http://example.com", data=b"test") + + assert session.headers["Authorization"] == "Bearer token123" + assert session.headers["X-Custom"] == "value" + mock_post.assert_called_once() + + def test_refreshes_headers_each_call(self): + call_count = 0 + + def get_headers(): + nonlocal call_count + call_count += 1 + return {"X-Request-Id": str(call_count)} + + session = _DynamicHeadersSession(get_headers=get_headers) + + with patch.object(requests.Session, "post", return_value=MagicMock()): + session.post("http://example.com", data=b"first") + assert session.headers["X-Request-Id"] == "1" + + session.post("http://example.com", data=b"second") + assert session.headers["X-Request-Id"] == "2" + + assert call_count == 2 + + +@pytest.mark.skipif(not _OPENTELEMETRY_AVAILABLE, reason="opentelemetry not fully installed") +class TestDynamicHeadersExporters: + def test_span_exporter_uses_dynamic_session(self): + headers_fn = lambda: {"Authorization": "Bearer span-token"} + exporter = DynamicHeadersSpanExporter(get_headers=headers_fn) + assert isinstance(exporter._session, _DynamicHeadersSession) + + def test_metric_exporter_uses_dynamic_session(self): + headers_fn = lambda: {"Authorization": "Bearer metric-token"} + exporter = DynamicHeadersMetricExporter(get_headers=headers_fn) + assert isinstance(exporter._session, _DynamicHeadersSession) + + def test_log_exporter_uses_dynamic_session(self): + headers_fn = lambda: {"Authorization": "Bearer log-token"} + exporter = DynamicHeadersLogExporter(get_headers=headers_fn) + assert isinstance(exporter._session, _DynamicHeadersSession)