From 7a382038de03f11ba1ef129fe2a8d26cc715bf32 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 10 Jun 2026 19:45:28 +0300 Subject: [PATCH 1/6] fix: correct per-operation tracing and tidy HTTP transport edge cases Pipeline tracing: - The per-operation HttpTracer lifecycle (operation_started / operation_succeeded / operation_failed) was emitted from TracingPolicy, which runs inside the retry and redirect wrappers and is re-entered once per attempt/hop. Gated on the first attempt, a call that failed once and then succeeded on a retry reported operation_failed and never operation_succeeded, and a redirect whose later hop failed reported operation_succeeded. - Add OperationTracingPolicy at a new outermost Stage.OPERATION that brackets the whole call from outside the wrappers, emitting operation_started before the chain and exactly one operation_succeeded / operation_failed on the final outcome. TracingPolicy keeps the per-attempt span and the per-request events (request_sent / response_*); default_pipeline wires both. Transports and utilities: - urllib: stop dropping a valid Content-Length under Content-Encoding. http.client does not decode content encodings, so the served body is the wire payload whose length is the upstream Content-Length; the header is accurate and is now surfaced (the decompressing requests/httpx/aiohttp adapters still drop it). - urllib and asyncio: report "Invalid status code" for an out-of-range status, matching the other adapters. - asyncio: match the chunked transfer-coding by token rather than substring so a coding such as "x-chunked" is not mistaken for chunked framing. - Share one cross-origin helper (http.common.url._origin) between the redirect and auth policies instead of duplicating it. Docs and the committed API surface baseline are updated accordingly. Co-Authored-By: Claude Opus 4.8 --- docs/pipelines.md | 50 ++++--- .../dexpace/sdk/core/http/auth/policies.py | 21 +-- .../src/dexpace/sdk/core/http/common/url.py | 24 ++++ .../src/dexpace/sdk/core/pipeline/defaults.py | 19 ++- .../sdk/core/pipeline/policies/__init__.py | 3 +- .../sdk/core/pipeline/policies/redirect.py | 22 +-- .../core/pipeline/policies/tracing_policy.py | 129 +++++++++++------- .../src/dexpace/sdk/core/pipeline/stage.py | 19 ++- .../tests/pipeline/test_defaults.py | 6 +- .../tests/pipeline/test_retry_tuning.py | 19 ++- .../tests/pipeline/test_stage.py | 18 ++- .../tests/pipeline/test_tracer_emission.py | 119 ++++++++++++++-- .../sdk/http/stdlib/asyncio_http_client.py | 15 +- .../sdk/http/stdlib/urllib_http_client.py | 16 ++- .../tests/test_asyncio_http_client.py | 17 +++ .../tests/test_urllib_http_client.py | 12 +- tools/surface_baseline.json | 9 ++ 17 files changed, 369 insertions(+), 149 deletions(-) diff --git a/docs/pipelines.md b/docs/pipelines.md index f0bb645..018345c 100644 --- a/docs/pipelines.md +++ b/docs/pipelines.md @@ -37,23 +37,35 @@ two parallel variants: ```python from dexpace.sdk.http.stdlib import UrllibHttpClient from dexpace.sdk.core.pipeline import Pipeline -from dexpace.sdk.core.pipeline.policies import LoggingPolicy, RetryPolicy, TracingPolicy +from dexpace.sdk.core.pipeline.policies import ( + LoggingPolicy, + OperationTracingPolicy, + RetryPolicy, + TracingPolicy, +) with Pipeline( UrllibHttpClient(), - policies=[TracingPolicy(), LoggingPolicy(), RetryPolicy()], + policies=[OperationTracingPolicy(), TracingPolicy(), LoggingPolicy(), RetryPolicy()], ) as pipeline: response = pipeline.run(request, dispatch_ctx) ``` The chain runs in declaration order. For the example above: -1. `TracingPolicy` opens a span. -2. `LoggingPolicy` logs the request. -3. `RetryPolicy` invokes the transport; retries on transient failures. -4. The transport runner calls `UrllibHttpClient.execute(request)`. -5. The unwinding mirror order: logging emits the response, tracing - closes the span. +1. `OperationTracingPolicy` emits `operation_started` for the whole call. +2. `TracingPolicy` opens a span. +3. `LoggingPolicy` logs the request. +4. `RetryPolicy` invokes the transport; retries on transient failures. +5. The transport runner calls `UrllibHttpClient.execute(request)`. +6. The unwinding mirrors that order: logging emits the response, tracing + closes the span, and operation-tracing emits the single + `operation_succeeded` / `operation_failed` once the call settles. + +In the canonical `default_pipeline`, `OperationTracingPolicy` sits *outside* +the redirect and retry wrappers (so the per-operation lifecycle fires once and +reflects the final outcome), while `TracingPolicy` sits *inside* them and opens +one span per attempt / hop. ## Built-in policies @@ -61,7 +73,8 @@ The chain runs in declaration order. For the example above: |-------------------------------------|----------------------------------------------------------| | `RetryPolicy` / `AsyncRetryPolicy` | Retry transient failures with backoff + `Retry-After`. Auto-buffers single-use request bodies when `total_retries > 0`. | | `LoggingPolicy` | Structured request/response logs with URL redaction. | -| `TracingPolicy` | Open a span per request; OTel semantic-conv attributes. | +| `OperationTracingPolicy` | Emit the per-operation lifecycle (`operation_started`, then one `operation_succeeded`/`operation_failed`) from outside the retry/redirect loop. | +| `TracingPolicy` | Open a span per attempt and emit per-request tracer events; OTel semantic-conv attributes. | | `BearerTokenPolicy` (auth) | Acquire + cache + apply OAuth bearer tokens. | | `KeyCredentialPolicy` (auth) | Stamp an API key into a configurable header. | | `BasicAuthPolicy` (auth) | `Authorization: Basic `. | @@ -88,16 +101,19 @@ Per-call opt-outs follow a convention: second `iter_bytes()` call raises `RuntimeError`. To keep retries safe without forcing every caller to remember `to_replayable()`, both `RetryPolicy.send` and `AsyncRetryPolicy.send` inspect the body up front -and, when `total_retries > 0`, swap in a buffered replayable copy before -the first attempt: +and, when the *effective* retry total for the call is positive, swap in a +buffered replayable copy before the first attempt: ```python -if total_retries > 0 and request.body is not None and not request.body.is_replayable(): +settings = self._configure_settings(ctx.options) +if settings["total"] > 0 and request.body is not None and not request.body.is_replayable(): request = request.with_body(request.body.to_replayable()) ``` -`total_retries == 0` (e.g. `RetryPolicy.no_retries()`) skips the buffering -step so callers who explicitly opt out of retries pay no memory cost for a -copy they will never use. Already-replayable bodies (`from_bytes`, -`from_string`, `from_form`) flow through untouched because -`to_replayable()` returns `self`. +The decision reads the effective total *after* merging any per-call +`retry_total` override (from `ctx.options`) with the constructor default, so a +per-call `retry_total=3` over an instance built with `total_retries=0` still +buffers, and a per-call `retry_total=0` (or `RetryPolicy.no_retries()`) skips +the buffering so callers who opt out of retries pay no memory cost for a copy +they will never use. Already-replayable bodies (`from_bytes`, `from_string`, +`from_form`) flow through untouched because `to_replayable()` returns `self`. diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/auth/policies.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/auth/policies.py index e2543e9..b96876b 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/auth/policies.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/auth/policies.py @@ -14,6 +14,7 @@ from ...pipeline.policy import Policy from ...pipeline.stage import Stage from ...util.clock import ASYNC_SYSTEM_CLOCK, SYSTEM_CLOCK, AsyncClock, Clock +from ..common.url import _origin from .access_token import AccessTokenInfo, TokenRequestOptions from .challenge import parse_challenges from .challenge_handler import ChallengeHandler @@ -439,29 +440,9 @@ async def _authorize( return request.with_header("Authorization", f"{token.token_type} {token.token}") -_DEFAULT_PORTS: dict[str, int] = {"https": 443, "http": 80} _AUTH_ORIGIN_KEY: str = "_auth_origin" -def _origin(url: Url) -> tuple[str, str, int | None]: - """Return the ``(scheme, host, port)`` origin tuple for ``url``. - - The scheme and host are lower-cased and the port is resolved to its - scheme default (443 for https, 80 for http) when not explicit, so two - URLs that differ only in an implied/explicit default port compare equal. - - Args: - url: The URL to derive an origin from. - - Returns: - A ``(scheme, host, effective_port)`` tuple suitable for equality - comparison. - """ - scheme = url.scheme.lower() - port = url.port if url.port is not None else _DEFAULT_PORTS.get(scheme) - return scheme, url.host.lower(), port - - def _crosses_recorded_origin(request: Request, ctx: PipelineContext) -> bool: """Report whether ``request`` left the origin recorded for this operation. diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/common/url.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/common/url.py index 522a1ea..04303af 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/common/url.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/common/url.py @@ -320,4 +320,28 @@ def parse(cls, raw: str) -> Self: ) +_DEFAULT_PORTS: dict[str, int] = {"https": 443, "http": 80} + + +def _origin(url: Url) -> tuple[str, str, int | None]: + """Return the ``(scheme, host, effective_port)`` origin tuple for ``url``. + + The scheme and host are lower-cased and the port is resolved to its scheme + default (443 for https, 80 for http) when not explicit, so two URLs that + differ only in an implied/explicit default port compare equal. Shared by + the redirect and auth policies for their cross-origin checks so the origin + definition stays in one place. + + Args: + url: The URL to derive an origin from. + + Returns: + A ``(scheme, host, effective_port)`` tuple suitable for equality + comparison. + """ + scheme = url.scheme.lower() + port = url.port if url.port is not None else _DEFAULT_PORTS.get(scheme) + return scheme, url.host.lower(), port + + __all__ = ["QueryParams", "Url"] diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py index 4073a5b..f4be724 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py @@ -19,7 +19,7 @@ from .policies.redirect import RedirectPolicy from .policies.retry import RetryPolicy from .policies.set_date import SetDatePolicy -from .policies.tracing_policy import TracingPolicy +from .policies.tracing_policy import OperationTracingPolicy, TracingPolicy from .staged_builder import StagedPipelineBuilder if TYPE_CHECKING: @@ -44,15 +44,21 @@ def default_pipeline( """Pre-configured `StagedPipelineBuilder` with the canonical stack. Wires the policies that most consumers want by default in the order their - stages dictate: redirect → idempotency → retry → set-date → - client-identity → auth → logging → tracing. Each policy is opt-out (pass - ``None``) or opt-in-with-override (pass a pre-configured instance to - replace the default). + stages dictate: operation-tracing → redirect → idempotency → retry → + set-date → client-identity → auth → logging → tracing. Each policy is + opt-out (pass ``None``) or opt-in-with-override (pass a pre-configured + instance to replace the default). Idempotency sits before retry so a write request's ``Idempotency-Key`` is minted once and reused across every retry; ``set-date`` and ``client-identity`` sit just inside the retry wrapper. + Two tracing policies cooperate: `OperationTracingPolicy` brackets the whole + operation from outside the redirect / retry wrappers (so the per-operation + lifecycle fires once and reflects the final outcome), while `TracingPolicy` + opens a span and emits per-request events inside the wrappers, once per hop + / attempt. + Args: client: Terminal HTTP transport. redirect: Override for `RedirectPolicy`. ``None`` uses defaults. @@ -73,6 +79,9 @@ def default_pipeline( immediate ``.build()``. """ builder = StagedPipelineBuilder(client) + # Sorts to Stage.OPERATION (outermost), bracketing every hop / attempt so + # the per-operation lifecycle fires once on the final outcome. + builder.append(OperationTracingPolicy()) builder.append(redirect or RedirectPolicy()) builder.append(idempotency or IdempotencyPolicy()) builder.append(retry or RetryPolicy()) diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py index 23ac7c1..33a5375 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py @@ -17,7 +17,7 @@ from .redirect import RedirectPolicy from .retry import RetryMode, RetryPolicy from .set_date import SetDatePolicy -from .tracing_policy import TracingPolicy +from .tracing_policy import OperationTracingPolicy, TracingPolicy __all__ = [ "AsyncClientIdentityPolicy", @@ -28,6 +28,7 @@ "ClientIdentityPolicy", "IdempotencyPolicy", "LoggingPolicy", + "OperationTracingPolicy", "RedirectPolicy", "RequestHistory", "RetryMode", diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/redirect.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/redirect.py index fcef930..cb62a2e 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/redirect.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/redirect.py @@ -29,7 +29,7 @@ from typing import TYPE_CHECKING, ClassVar, Literal, cast from urllib.parse import urljoin -from ...http.common.url import Url +from ...http.common.url import Url, _origin from ...http.request.method import Method from ..policy import Policy from ..stage import Stage @@ -43,26 +43,6 @@ _REDIRECT_STATUSES: frozenset[int] = frozenset({301, 302, 303, 307, 308}) _CONTENT_HEADER_PREFIX: str = "content-" -_DEFAULT_PORTS: dict[str, int] = {"https": 443, "http": 80} - - -def _origin(url: Url) -> tuple[str, str, int | None]: - """Return the ``(scheme, host, port)`` origin tuple for ``url``. - - The scheme and host are lower-cased and the port is resolved to its - scheme default (443 for https, 80 for http) when not explicit, so two - URLs that differ only in an implied/explicit default port compare equal. - - Args: - url: The URL to derive an origin from. - - Returns: - A ``(scheme, host, effective_port)`` tuple suitable for equality - comparison. - """ - scheme = url.scheme.lower() - port = url.port if url.port is not None else _DEFAULT_PORTS.get(scheme) - return scheme, url.host.lower(), port #: ``ctx.data`` key holding the per-operation ``HttpTracer``. The first policy diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py index 3af7d33..88795f0 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py @@ -1,23 +1,34 @@ # Copyright (c) 2026 dexpace and Omar Aljarrah. # Licensed under the MIT License. See LICENSE.md in the repository root for details. -"""Pipeline policy that opens a span around the downstream chain. - -Besides the OpenTelemetry span, the policy drives two correlation seams: - -- It mints a per-operation ``HttpTracer`` from - ``ctx.call.instrumentation_context.http_tracer_factory`` and emits the - fine-grained operation/request/response lifecycle events an SRE wants - (``operation_started``, ``request_sent``, ``response_headers_received``, - ``response_received``, ``operation_succeeded`` / ``operation_failed``). Per - attempt events (``attempt_started`` / ``attempt_failed`` / - ``attempt_retries_exhausted``) are owned by the retry policy. -- It binds the active trace/span ids into ``contextvars`` for the duration of - the request so ``ClientLogger`` can stamp ``trace.id`` / ``span.id`` onto - every log record emitted downstream. - -Both seams are no-op-safe: the default tracer factory returns -``NOOP_HTTP_TRACER`` and the no-op span carries the sentinel trace ids. +"""Pipeline policies that drive the tracing and correlation seams. + +Tracing is split across two policies that observe the request at two +different scopes: + +- `OperationTracingPolicy` sits at `Stage.OPERATION`, *outside* the redirect + and retry wrappers, so a single entry brackets the whole operation no + matter how many hops or attempts happen inside. It emits the per-operation + ``HttpTracer`` lifecycle events that must fire exactly once and reflect the + final outcome: ``operation_started`` before the chain runs, then exactly one + of ``operation_succeeded`` / ``operation_failed`` once it unwinds. A call + that fails on its first attempt and succeeds on a retry therefore reports a + single ``operation_succeeded`` (and a call that exhausts its retries reports + a single ``operation_failed`` carrying the error that actually escaped). +- `TracingPolicy` sits at `Stage.POST_LOGGING`, *inside* the wrappers, so it + is re-entered once per attempt / hop. It opens an OpenTelemetry span per + attempt, emits the per-request ``HttpTracer`` events (``request_sent``, + ``response_headers_received``, ``response_received``), and binds the active + trace / span ids into ``contextvars`` so ``ClientLogger`` can stamp them onto + every log record emitted while the span is current. Per-attempt retry events + (``attempt_started`` / ``attempt_failed`` / ``attempt_retries_exhausted``) + are owned by the retry policy. + +Both policies resolve the same per-operation ``HttpTracer`` via +``resolve_http_tracer`` (cached in ``ctx.data``), and both are no-op-safe: the +default tracer factory returns ``NOOP_HTTP_TRACER`` and the no-op span carries +the sentinel trace ids. Disable either per-call by setting +``ctx.options["tracing_enabled"] = False``. """ from __future__ import annotations @@ -36,19 +47,46 @@ from ...instrumentation import HttpTracer, Span from ..context import PipelineContext -#: ``ctx.data`` flag marking that ``operation_started`` has already fired for -#: this operation. Because ``TracingPolicy`` sits inside RETRY / REDIRECT it is -#: re-entered once per attempt / hop; the flag de-duplicates the operation-level -#: lifecycle events so ``operation_started`` fires once on the outermost entry -#: and ``operation_succeeded`` / ``operation_failed`` fire once on the -#: outermost exit. Per-attempt span behaviour is unaffected. -_OPERATION_STARTED_KEY: str = "tracing_operation_started" + +class OperationTracingPolicy(Policy): + """Emit the per-operation ``HttpTracer`` lifecycle around the whole call. + + Placed at `Stage.OPERATION`, outside the redirect and retry wrappers, so + its single ``send`` brackets every hop and attempt. It emits + ``operation_started`` before dispatching the chain and exactly one of + ``operation_succeeded`` / ``operation_failed`` once the chain unwinds, so + the operation outcome reflects what the caller actually observes rather + than the result of the first attempt. + + The per-operation ``HttpTracer`` is shared with `TracingPolicy` and the + redirect / retry policies via ``resolve_http_tracer`` (cached in + ``ctx.data``). Disable per-call by setting + ``ctx.options["tracing_enabled"] = False``. + """ + + STAGE = Stage.OPERATION + __slots__ = () + + def send(self, request: Request, ctx: PipelineContext) -> Response: + if not ctx.options.get("tracing_enabled", True): + return self.next.send(request, ctx) + http_tracer = resolve_http_tracer(ctx) + http_tracer.operation_started() + try: + response = self.next.send(request, ctx) + except BaseException as err: + http_tracer.operation_failed(err) + raise + http_tracer.operation_succeeded() + return response class TracingPolicy(Policy): - """Wrap each request in a tracing span. + """Wrap each attempt in a tracing span and emit per-request events. - Span attributes follow OpenTelemetry semantic conventions: + Re-entered once per retry attempt / redirect hop (it sits *inside* those + wrappers), so it opens one span per attempt. Span attributes follow + OpenTelemetry semantic conventions: - ``http.request.method``: HTTP method. - ``url.full``: Full URL (no redaction — install a separate redactor @@ -60,10 +98,14 @@ class TracingPolicy(Policy): ``ctx.data["retry_count"]`` (when retry policy is upstream). While the span is open the active trace/span ids are bound into the - correlation ``contextvars`` so downstream log records carry them, and a + correlation ``contextvars`` so downstream log records carry them, and the per-operation ``HttpTracer`` (from the call's - ``instrumentation_context.http_tracer_factory``) receives the - operation/request/response lifecycle events. + ``instrumentation_context.http_tracer_factory``) receives the per-request + events (``request_sent``, ``response_headers_received``, + ``response_received``). The operation-level lifecycle + (``operation_started`` / ``operation_succeeded`` / ``operation_failed``) + is emitted by `OperationTracingPolicy`, which brackets the whole call from + outside the retry / redirect wrappers. Disable per-call by setting ``ctx.options["tracing_enabled"] = False``. """ @@ -78,20 +120,13 @@ def send(self, request: Request, ctx: PipelineContext) -> Response: if not ctx.options.get("tracing_enabled", True): return self.next.send(request, ctx) parent = ctx.call.instrumentation_context - # Share one per-operation tracer with the redirect / retry policies via - # ``ctx.data`` (whichever policy runs first mints it). + # Share one per-operation tracer with the operation / redirect / retry + # policies via ``ctx.data`` (whichever policy runs first mints it). http_tracer = resolve_http_tracer(ctx) span = self._tracer.start_span(f"HTTP {request.method}", parent=parent) _set_request_attributes(span, request) - # ``operation_started`` fires once per operation. Because this policy is - # re-entered per retry attempt / redirect hop, only the outermost entry - # (the one that mints the flag) emits the operation lifecycle events. - is_outermost = _OPERATION_STARTED_KEY not in ctx.data - if is_outermost: - ctx.data[_OPERATION_STARTED_KEY] = True - http_tracer.operation_started() with bind_correlation(trace_id=_trace_id(span), span_id=_span_id(span)): - return self._dispatch(request, ctx, span, http_tracer, is_outermost) + return self._dispatch(request, ctx, span, http_tracer) def _dispatch( self, @@ -99,14 +134,12 @@ def _dispatch( ctx: PipelineContext, span: Span, http_tracer: HttpTracer, - is_outermost: bool, ) -> Response: - """Run the downstream chain, emitting tracer events around it. + """Run the downstream chain, emitting per-attempt tracer events around it. - The per-attempt span is opened and closed on every entry, but the - operation-level ``operation_succeeded`` / ``operation_failed`` events - fire only when the outermost entry unwinds (``is_outermost``), so a - retried or redirected call reports a single operation outcome. + The span and the ``request_sent`` / ``response_*`` events fire on every + entry (once per attempt / hop); the operation-level outcome is reported + separately by `OperationTracingPolicy`. """ _notify_request_sent(http_tracer, request) try: @@ -115,8 +148,6 @@ def _dispatch( except BaseException as err: span.set_error(type(err).__name__) span.end(error=err) - if is_outermost: - http_tracer.operation_failed(err) raise _notify_response(http_tracer, response) span.set_attribute("http.response.status_code", int(response.status)) @@ -124,8 +155,6 @@ def _dispatch( if isinstance(retry_count, int) and retry_count > 0: span.set_attribute("http.request.resend_count", retry_count) span.end() - if is_outermost: - http_tracer.operation_succeeded() return response @@ -182,4 +211,4 @@ def _split_host(url: Url) -> tuple[str | None, int | None]: return url.host or None, url.port -__all__ = ["TracingPolicy"] +__all__ = ["OperationTracingPolicy", "TracingPolicy"] diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py index 174b4e8..bb2f530 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py @@ -21,8 +21,13 @@ class of bugs where retry runs before redirect or auth runs after logging. class Stage(IntEnum): """Pipeline stage ordering. Lower value runs first (closer to caller entry). - Stages divide into three groups: + Stages divide into four groups: + - **Operation** (`OPERATION`): runs *outside* the redirect / retry + wrappers, so a single entry brackets the whole operation regardless of + how many hops or attempts happen inside. This is where per-operation + lifecycle observation (e.g. `OperationTracingPolicy`) belongs — events + that must fire exactly once and reflect the final outcome. - **Wrapping** (`REDIRECT`, `RETRY`): re-invoke the downstream chain per hop / attempt. Their pillar slot is reserved for the single redirect / retry policy. `POST_*` siblings run *inside* the wrapper's loop. @@ -37,6 +42,8 @@ class Stage(IntEnum): terminal transport call (`SEND` — never a user-step slot). """ + OPERATION = 50 + REDIRECT = 100 POST_REDIRECT = 150 RETRY = 200 @@ -63,7 +70,15 @@ def is_pillar(self) -> bool: _PILLARS: Final[frozenset[Stage]] = frozenset( - {Stage.REDIRECT, Stage.RETRY, Stage.AUTH, Stage.LOGGING, Stage.SERDE, Stage.SEND} + { + Stage.OPERATION, + Stage.REDIRECT, + Stage.RETRY, + Stage.AUTH, + Stage.LOGGING, + Stage.SERDE, + Stage.SEND, + } ) diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py b/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py index 054e455..a2f508c 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py @@ -54,9 +54,11 @@ def test_default_pipeline_returns_builder() -> None: def test_default_pipeline_wires_canonical_stack() -> None: pipeline = default_pipeline(_StubTransport()).build() stages = _stages_of(pipeline) - # Canonical order: REDIRECT, POST_REDIRECT (idempotency), RETRY, - # POST_RETRY (set-date then client-identity), LOGGING, POST_LOGGING (tracing). + # Canonical order: OPERATION (operation-tracing, outside the wrappers), + # REDIRECT, POST_REDIRECT (idempotency), RETRY, POST_RETRY (set-date then + # client-identity), LOGGING, POST_LOGGING (tracing). assert stages == [ + Stage.OPERATION, Stage.REDIRECT, Stage.POST_REDIRECT, Stage.RETRY, diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_retry_tuning.py b/packages/dexpace-sdk-core/tests/pipeline/test_retry_tuning.py index 26d23b6..4bcccd7 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_retry_tuning.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_retry_tuning.py @@ -32,7 +32,11 @@ from dexpace.sdk.core.instrumentation.http_tracer import HttpTracer, HttpTracerFactory from dexpace.sdk.core.instrumentation.noop import NOOP_SPAN from dexpace.sdk.core.pipeline import Pipeline -from dexpace.sdk.core.pipeline.policies import RetryPolicy, TracingPolicy +from dexpace.sdk.core.pipeline.policies import ( + OperationTracingPolicy, + RetryPolicy, + TracingPolicy, +) from dexpace.sdk.core.pipeline.policies.retry import ( _parse_rate_limit_reset, _StatusRetryError, @@ -341,15 +345,20 @@ def create(self) -> HttpTracer: class TestSharedTracerAcrossPolicies: def test_retry_and_tracing_share_one_per_operation_tracer(self) -> None: # With a factory that mints a fresh tracer per ``create`` (the - # documented contract), the retry and tracing policies must still land - # on a single per-operation instance via the ``ctx.data`` cache — - # otherwise attempt events and lifecycle events split across objects. + # documented contract), the operation-tracing, tracing, and retry + # policies must still land on a single per-operation instance via the + # ``ctx.data`` cache — otherwise attempt events and lifecycle events + # split across objects. factory = _CountingFactory() clock = FakeClock() client = _ScriptedClient([Status.SERVICE_UNAVAILABLE, Status.OK]) with Pipeline( client, - policies=[TracingPolicy(), RetryPolicy(clock=clock, rand=_FixedRandom(0.5))], + policies=[ + OperationTracingPolicy(), + TracingPolicy(), + RetryPolicy(clock=clock, rand=_FixedRandom(0.5)), + ], ) as p: p.run(_get(), DispatchContext(_instr("c" * 16, factory))) assert len(factory.created) == 1 diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_stage.py b/packages/dexpace-sdk-core/tests/pipeline/test_stage.py index 5a1f712..43a29a1 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_stage.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_stage.py @@ -15,7 +15,15 @@ class TestStage: @pytest.mark.parametrize( "stage", - [Stage.REDIRECT, Stage.RETRY, Stage.AUTH, Stage.LOGGING, Stage.SERDE, Stage.SEND], + [ + Stage.OPERATION, + Stage.REDIRECT, + Stage.RETRY, + Stage.AUTH, + Stage.LOGGING, + Stage.SERDE, + Stage.SEND, + ], ) def test_pillar_stages(self, stage: Stage) -> None: assert stage.is_pillar @@ -38,7 +46,8 @@ def test_non_pillar_stages(self, stage: Stage) -> None: assert not stage.is_pillar def test_stage_order(self) -> None: - assert Stage.REDIRECT < Stage.RETRY < Stage.AUTH < Stage.LOGGING < Stage.SEND + assert Stage.OPERATION < Stage.REDIRECT < Stage.RETRY < Stage.AUTH + assert Stage.AUTH < Stage.LOGGING < Stage.SEND class TestSyncPolicyEnforcement: @@ -123,6 +132,11 @@ def test_tracing_policy(self) -> None: assert TracingPolicy.STAGE is Stage.POST_LOGGING + def test_operation_tracing_policy(self) -> None: + from dexpace.sdk.core.pipeline.policies.tracing_policy import OperationTracingPolicy + + assert OperationTracingPolicy.STAGE is Stage.OPERATION + def test_bearer_token_policy(self) -> None: from dexpace.sdk.core.http.auth.policies import BearerTokenPolicy diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py index 54704bd..1dd4e66 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py @@ -34,7 +34,11 @@ ) from dexpace.sdk.core.instrumentation.noop import NOOP_SPAN from dexpace.sdk.core.pipeline import AsyncPipeline, Pipeline -from dexpace.sdk.core.pipeline.policies import RetryPolicy, TracingPolicy +from dexpace.sdk.core.pipeline.policies import ( + OperationTracingPolicy, + RetryPolicy, + TracingPolicy, +) from dexpace.sdk.core.pipeline.policies.async_redirect import AsyncRedirectPolicy from dexpace.sdk.core.pipeline.policies.redirect import RedirectPolicy @@ -131,7 +135,10 @@ def test_emits_lifecycle_events_in_order_on_success(self) -> None: tracer = _RecordingHttpTracer() instr = _instr(tracer) body = ResponseBody.from_bytes(b"hello world") - with Pipeline(_OkClient(body=body), policies=[TracingPolicy()]) as p: + with Pipeline( + _OkClient(body=body), + policies=[OperationTracingPolicy(), TracingPolicy()], + ) as p: p.run(_request(), DispatchContext(instr)) assert tracer.names() == [ "operation_started", @@ -184,7 +191,10 @@ def test_emits_operation_failed_on_exception(self) -> None: instr = _instr(tracer) boom = ServiceRequestError("connect failed") raised: BaseException | None = None - with Pipeline(_OkClient(raise_exc=boom), policies=[TracingPolicy()]) as p: + with Pipeline( + _OkClient(raise_exc=boom), + policies=[OperationTracingPolicy(), TracingPolicy()], + ) as p: try: p.run(_request(), DispatchContext(instr)) except ServiceRequestError as err: @@ -197,7 +207,10 @@ def test_emits_operation_failed_on_exception(self) -> None: def test_no_events_when_tracing_disabled(self) -> None: tracer = _RecordingHttpTracer() instr = _instr(tracer) - with Pipeline(_OkClient(), policies=[TracingPolicy()]) as p: + with Pipeline( + _OkClient(), + policies=[OperationTracingPolicy(), TracingPolicy()], + ) as p: p.run(_request(), DispatchContext(instr), tracing_enabled=False) assert tracer.events == [] @@ -303,7 +316,7 @@ def test_redirect_and_tracing_share_one_tracer(self) -> None: ) with Pipeline( client, - policies=[RedirectPolicy(), TracingPolicy()], + policies=[OperationTracingPolicy(), RedirectPolicy(), TracingPolicy()], ) as p: p.run(_request("https://api.example.com/start"), DispatchContext(instr)) names = tracer.names() @@ -345,6 +358,41 @@ def execute(self, request: Request) -> Response: raise self._error +class _RaiseThenOkClient(HttpClient): + """Raises a retryable error on the first ``fail_count`` calls, then OK.""" + + def __init__(self, *, error: BaseException, fail_count: int) -> None: + self._error = error + self._fail_count = fail_count + self.calls = 0 + + def execute(self, request: Request) -> Response: + self.calls += 1 + if self.calls <= self._fail_count: + raise self._error + return Response(request=request, protocol=Protocol.HTTP_1_1, status=Status.OK) + + +class _RedirectThenRaiseClient(HttpClient): + """Returns a redirect, then raises on the reissued (post-redirect) request.""" + + def __init__(self, *, location: str, error: BaseException) -> None: + self._location = location + self._error = error + self.calls = 0 + + def execute(self, request: Request) -> Response: + self.calls += 1 + if self.calls == 1: + response = Response( + request=request, + protocol=Protocol.HTTP_1_1, + status=Status.MOVED_PERMANENTLY, + ) + return response.with_header("Location", self._location) + raise self._error + + class TestOperationEventsFireOncePerOperation: def test_retry_emits_operation_lifecycle_once_across_attempts(self) -> None: # TracingPolicy sits *inside* RetryPolicy (retry is outer), so it is @@ -354,7 +402,10 @@ def test_retry_emits_operation_lifecycle_once_across_attempts(self) -> None: instr = _instr(tracer) client = _RetryThenOkClient(fail_status=Status.SERVICE_UNAVAILABLE, fail_count=2) retry = RetryPolicy(status_retries=3, clock=FakeClock()) - with Pipeline(client, policies=[retry, TracingPolicy()]) as p: + with Pipeline( + client, + policies=[OperationTracingPolicy(), retry, TracingPolicy()], + ) as p: p.run(_request(), DispatchContext(instr)) names = tracer.names() # Three attempts total (two 503s then a 200). @@ -376,7 +427,10 @@ def test_retry_exhaustion_emits_operation_failed_once(self) -> None: client = _AlwaysRaisesClient(boom) retry = RetryPolicy(connect_retries=2, total_retries=2, clock=FakeClock()) raised: BaseException | None = None - with Pipeline(client, policies=[retry, TracingPolicy()]) as p: + with Pipeline( + client, + policies=[OperationTracingPolicy(), retry, TracingPolicy()], + ) as p: try: p.run(_request(), DispatchContext(instr)) except ServiceRequestError as err: @@ -400,7 +454,10 @@ def test_redirect_emits_operation_lifecycle_once_across_hops(self) -> None: _Hop(Status.OK), ], ) - with Pipeline(client, policies=[RedirectPolicy(), TracingPolicy()]) as p: + with Pipeline( + client, + policies=[OperationTracingPolicy(), RedirectPolicy(), TracingPolicy()], + ) as p: p.run(_request("https://api.example.com/start"), DispatchContext(instr)) names = tracer.names() # Two hops -> two attempts through the inner TracingPolicy. @@ -413,6 +470,52 @@ def test_redirect_emits_operation_lifecycle_once_across_hops(self) -> None: op_events = [name for name in names if name.startswith("operation_")] assert op_events == ["operation_started", "operation_succeeded"] + def test_retry_then_success_reports_operation_succeeded(self) -> None: + # A call that fails on its first attempt and succeeds on a retry must + # report a single operation_succeeded reflecting the final outcome — + # never operation_failed for the discarded first attempt. + tracer = _RecordingHttpTracer() + instr = _instr(tracer) + client = _RaiseThenOkClient(error=ServiceRequestError("connect failed"), fail_count=1) + retry = RetryPolicy(connect_retries=3, total_retries=3, clock=FakeClock()) + with Pipeline( + client, + policies=[OperationTracingPolicy(), retry, TracingPolicy()], + ) as p: + response = p.run(_request(), DispatchContext(instr)) + assert int(response.status) == 200 + assert client.calls == 2 + names = tracer.names() + assert names.count("operation_started") == 1 + assert names.count("operation_succeeded") == 1 + assert names.count("operation_failed") == 0 + # request_sent still fires per attempt (the failed one and the retry). + assert names.count("request_sent") == 2 + + def test_redirect_then_failure_reports_operation_failed(self) -> None: + # When a later redirect hop fails, the operation outcome is the failure + # that escapes — not the success of the earlier 3xx hop. + tracer = _RecordingHttpTracer() + instr = _instr(tracer) + boom = ServiceRequestError("connect failed") + client = _RedirectThenRaiseClient(location="https://api.example.com/new", error=boom) + raised: BaseException | None = None + with Pipeline( + client, + policies=[OperationTracingPolicy(), RedirectPolicy(), TracingPolicy()], + ) as p: + try: + p.run(_request("https://api.example.com/start"), DispatchContext(instr)) + except ServiceRequestError as err: + raised = err + assert raised is boom + names = tracer.names() + assert names.count("operation_started") == 1 + assert names.count("operation_failed") == 1 + assert names.count("operation_succeeded") == 0 + failed = [payload for name, payload in tracer.events if name == "operation_failed"] + assert failed == [boom] + # ----- request_sent fires for unknown-length bodies (L19) ----------------- diff --git a/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/asyncio_http_client.py b/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/asyncio_http_client.py index a8fd216..cda3831 100644 --- a/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/asyncio_http_client.py +++ b/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/asyncio_http_client.py @@ -335,7 +335,7 @@ def _parse_status(code: str) -> Status: try: return Status(int(code)) except ValueError as err: - raise ServiceResponseError(f"Unknown status code: {code}", error=err) from err + raise ServiceResponseError(f"Invalid status code: {code}", error=err) from err def _drain_body(body: RequestBody) -> bytes: @@ -360,8 +360,11 @@ def _is_chunked(headers: Headers) -> bool: Inspects every ``Transfer-Encoding`` line — the header may be split across multiple lines (e.g. ``gzip`` then ``chunked``), so reading only the first value would miss a chunked coding that is not listed first and then parse - chunk-framing bytes as the body. Per RFC 9112 §6.1 a response advertising - chunked framing cannot be read as a fixed-length body. + chunk-framing bytes as the body. Each line is a comma-separated coding + list; the coding name (the token before any ``;`` parameters) is matched + exactly, so a value that merely contains the substring ``chunked`` (e.g. an + ``x-chunked`` coding name) does not trip the check. Per RFC 9112 §6.1 a + response advertising chunked framing cannot be read as a fixed-length body. Args: headers: The parsed response headers. @@ -369,7 +372,11 @@ def _is_chunked(headers: Headers) -> bool: Returns: ``True`` if any ``Transfer-Encoding`` value names ``chunked``. """ - return any("chunked" in value.lower() for value in headers.values("Transfer-Encoding")) + return any( + coding.split(";")[0].strip().lower() == "chunked" + for value in headers.values("Transfer-Encoding") + for coding in value.split(",") + ) def _content_length(headers: Headers) -> int | None: diff --git a/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/urllib_http_client.py b/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/urllib_http_client.py index c587dba..c3be907 100644 --- a/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/urllib_http_client.py +++ b/packages/dexpace-sdk-http-stdlib/src/dexpace/sdk/http/stdlib/urllib_http_client.py @@ -217,7 +217,7 @@ def _build_response(request: Request, opened: object) -> Response: # ``Status`` synthesizes a member for any in-range code. Release the # underlying response first so the connection is not leaked. _close_quietly(opened) - raise ServiceResponseError(f"Unknown status code: {status_code}", error=err) from err + raise ServiceResponseError(f"Invalid status code: {status_code}", error=err) from err raw_headers = getattr(opened, "headers", None) headers = _convert_headers(raw_headers) content_length = _body_content_length(headers) @@ -254,18 +254,20 @@ def _protocol_of(opened: object) -> Protocol: def _body_content_length(headers: Headers) -> int: """Resolve the body length to advertise on the ``ResponseBody``. - Returns ``-1`` (unknown) when ``Content-Encoding`` is present, because - the stream yields decompressed bytes and the upstream ``Content-Length`` - counts the compressed payload — propagating it would lie about the body. + ``http.client`` does not decode ``Content-Encoding`` (gzip/deflate): the + stream yields the bytes exactly as framed on the wire, so a present + ``Content-Length`` matches the body this transport exposes and is reported + as-is — even under ``Content-Encoding``. (Transports that decompress + transparently, e.g. requests/httpx/aiohttp, must instead drop the header + there, since it would then describe the compressed payload rather than the + decoded stream they hand back.) Args: headers: The response headers. Returns: - The body length in bytes, or ``-1`` when unknown or unreliable. + The body length in bytes, or ``-1`` when absent or unparseable. """ - if "content-encoding" in headers: - return -1 raw = headers.get("Content-Length") if raw is None: return -1 diff --git a/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py b/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py index acb1458..edbac8f 100644 --- a/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py +++ b/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py @@ -17,10 +17,12 @@ ServiceResponseTimeoutError, ) from dexpace.sdk.core.http.common import Url +from dexpace.sdk.core.http.common.headers import Headers from dexpace.sdk.core.http.common.protocol import Protocol from dexpace.sdk.core.http.request import Method, Request, RequestBody from dexpace.sdk.core.http.response import Status from dexpace.sdk.http.stdlib import AsyncioHttpClient +from dexpace.sdk.http.stdlib import asyncio_http_client as _asyncio_mod _Handler = Callable[[asyncio.StreamReader, asyncio.StreamWriter], Awaitable[None]] @@ -359,6 +361,21 @@ async def multiline_te(reader: asyncio.StreamReader, writer: asyncio.StreamWrite await anext(gen) +def test_is_chunked_matches_coding_token_not_substring() -> None: + # The chunked check matches the coding token exactly: a real ``chunked`` + # coding (alone, after other codings, or with parameters) trips it, but a + # coding name that merely contains the substring ``chunked`` (``x-chunked``) + # does not — so a benign coding is never mistaken for chunked framing. + assert _asyncio_mod._is_chunked(Headers([("Transfer-Encoding", "chunked")])) + assert _asyncio_mod._is_chunked(Headers([("Transfer-Encoding", "gzip, chunked")])) + assert _asyncio_mod._is_chunked( + Headers([("Transfer-Encoding", "gzip"), ("Transfer-Encoding", "chunked")]) + ) + assert _asyncio_mod._is_chunked(Headers([("Transfer-Encoding", "chunked ; foo=bar")])) + assert not _asyncio_mod._is_chunked(Headers([("Transfer-Encoding", "x-chunked")])) + assert not _asyncio_mod._is_chunked(Headers([("Transfer-Encoding", "gzip")])) + + async def test_connection_close_framed_body_read_to_eof() -> None: # M3: a response without Content-Length is connection-close framed; the # body must be read to EOF, not fabricated as empty. diff --git a/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py b/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py index 90469a4..0f57e9e 100644 --- a/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py +++ b/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py @@ -322,19 +322,21 @@ def test_unknown_protocol_version_defaults_to_http_1_1() -> None: assert response.protocol is Protocol.HTTP_1_1 -def test_content_length_dropped_when_content_encoding_present() -> None: - # L2: the stream yields decompressed bytes, so the upstream - # Content-Length (compressed size) must not be propagated to the body. +def test_content_length_surfaced_under_content_encoding() -> None: + # http.client does not decode Content-Encoding, so the body urllib serves + # is the wire payload whose length is the upstream Content-Length: the + # header is accurate and is surfaced as-is (unlike the decompressing + # requests / httpx / aiohttp adapters, which must drop it). opened = _TrackingResponse( 200, headers=[("Content-Length", "9"), ("Content-Encoding", "gzip")], - payload=b"decoded", + payload=b"compressed", ) request = Request(method=Method.GET, url=Url.parse("http://127.0.0.1/")) response = _urllib_mod._build_response(request, opened) body = response.body assert body is not None - assert body.content_length() == -1 + assert body.content_length() == 9 def test_read_failure_maps_to_service_response_error() -> None: diff --git a/tools/surface_baseline.json b/tools/surface_baseline.json index 085e82c..c93c6ae 100644 --- a/tools/surface_baseline.json +++ b/tools/surface_baseline.json @@ -1225,6 +1225,14 @@ } }, "dexpace.sdk.core.pipeline.policies.tracing_policy": { + "OperationTracingPolicy": { + "bases": [ + "Policy" + ], + "methods": { + "send": "send(self, request: Request, ctx: PipelineContext) -> Response" + } + }, "TracingPolicy": { "bases": [ "Policy" @@ -1599,6 +1607,7 @@ "ClientIdentityPolicy", "IdempotencyPolicy", "LoggingPolicy", + "OperationTracingPolicy", "RedirectPolicy", "RequestHistory", "RetryMode", From 901aad378e478aa4e42e52b0c677469d1aa73a3a Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 10 Jun 2026 20:11:43 +0300 Subject: [PATCH 2/6] docs: changelog and docstrings for tracing split and transport fixes - Add CHANGELOG entries for OperationTracingPolicy and the new outermost Stage.OPERATION, the TracingPolicy lifecycle split (with a migration note for pipelines assembled by hand), and the urllib/asyncio transport corrections (Content-Length under Content-Encoding, chunked-framing detection, and out-of-range status wording). - Document that the tracing and logging policies are sync-only and the async pipeline carries no tracing, in both the tracing module docstring and the changelog's scope boundaries. - List Stage.OPERATION among the pillar stages in the staged-builder docstring. - Remove leftover shorthand prefixes from a few test comments, keeping the explanatory text. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 57 ++++++++++++++++--- .../core/pipeline/policies/tracing_policy.py | 4 ++ .../sdk/core/pipeline/staged_builder.py | 4 +- .../tests/pipeline/test_tracer_emission.py | 2 +- .../tests/test_asyncio_http_client.py | 12 ++-- .../tests/test_urllib_http_client.py | 6 +- 6 files changed, 66 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 844414b..9bc627d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,13 +8,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] A round of platform improvements to `dexpace-sdk-core`: new optional building -blocks (typed serialization, webhook verification, pagination, two pipeline -policies), tightened retry and tracing behaviour, and a batch of correctness -fixes across bodies, SSE parsing, Digest auth, and error reporting. Most of this -lands in `core`; the transport adapters additionally get consistent connect- vs -read-phase timeout classification and tighter resource release. The only removed -public symbol is the unused `RetryConfig` (see Removed); existing code otherwise -continues to work without modification. +blocks (typed serialization, webhook verification, pagination, three pipeline +policies), tightened retry behaviour, a corrected per-operation tracing +lifecycle, and a batch of correctness fixes across bodies, SSE parsing, Digest +auth, and error reporting. Most of this lands in `core`; the transport adapters +additionally get consistent connect- vs read-phase timeout classification, +tighter resource release, and a set of edge-case corrections (status-code +reporting, chunked-framing detection, and content-length under content-encoding). +The only removed public symbol is the unused `RetryConfig` (see Removed); +existing code otherwise continues to work without modification — with one +behavioural note for hand-assembled pipelines (see *Tracing lifecycle* under +Changed). ### Added @@ -37,6 +41,13 @@ continues to work without modification. - **Client-identity policy** (`pipeline.policies.client_identity`, plus its async twin). Sets a consistent `User-Agent` / client-identity header derived from the configured application id and SDK version. +- **Per-operation tracing policy** (`OperationTracingPolicy` in + `pipeline.policies.tracing_policy`, with a new outermost `Stage.OPERATION`). + Emits the per-operation `HttpTracer` lifecycle (`operation_started`, then + exactly one `operation_succeeded` / `operation_failed`) from outside the retry + and redirect wrappers, so the reported outcome reflects the final result of + the whole call rather than a single attempt or hop. Sync-only, in line with + the rest of the tracing stack; the async pipeline carries no tracing. - **HTTP tracer** (`instrumentation.http_tracer`). An adapter-style tracer base whose per-event methods default to no-ops, so a subclass overrides only the events it cares about. Wired through the tracing policy for span emission. @@ -58,6 +69,15 @@ continues to work without modification. cancellation cleanly between attempts. - **Tracing and redirect policies** now emit tracer events and carry correlation through redirects, with credentials stripped on cross-origin redirects. +- **Tracing lifecycle** (`pipeline.policies.tracing_policy`). The per-operation + `HttpTracer` lifecycle moved out of `TracingPolicy` into the new + `OperationTracingPolicy`; `TracingPolicy` now emits only its per-attempt span + and the per-request events (`request_sent`, `response_headers_received`, + `response_received`). `default_pipeline` wires both, so callers who use it are + unaffected. A pipeline assembled by hand that wants the operation lifecycle + must now add `OperationTracingPolicy` alongside `TracingPolicy` — a bare + `TracingPolicy` no longer emits `operation_started` / `operation_succeeded` / + `operation_failed`. - **Default pipelines** (`pipeline.defaults`). The standard sync/async stacks now assemble the new idempotency and client-identity policies alongside the existing retry, redirect, logging, and tracing policies. @@ -96,6 +116,26 @@ continues to work without modification. `async_response_body`). Cancelling an in-flight read now releases the underlying resources instead of leaking them, and re-raises `CancelledError` after cleanup. +- **Per-operation tracing outcome** (`pipeline.policies.tracing_policy`). A call + retried after a failed first attempt no longer reports `operation_failed` for + the discarded attempt (it reports the single `operation_succeeded` it ends on), + and a redirect whose later hop fails no longer reports `operation_succeeded` + for the earlier 3xx hop. The lifecycle now fires exactly once and reflects the + final outcome. See *Tracing lifecycle* under Changed for the API shape. +- **`Content-Length` under `Content-Encoding`** (`http.stdlib.urllib_http_client`). + `UrllibHttpClient` no longer drops a valid `Content-Length` when + `Content-Encoding` is present: `http.client` does not decode content codings, + so the body it serves is the wire payload whose length the header describes, + and the length is now surfaced as-is. (The decompressing requests/httpx/aiohttp + adapters still drop it, since they hand back a decoded stream.) +- **Chunked-framing detection** (`http.stdlib.asyncio_http_client`). The + `Transfer-Encoding` check matches the `chunked` coding by token rather than + substring, so a coding whose name merely contains `chunked` (e.g. `x-chunked`) + is no longer mistaken for chunked framing. +- **Out-of-range status reporting** (`http.stdlib.urllib_http_client`, + `asyncio_http_client`). Both now raise a `ServiceResponseError` worded + `Invalid status code: …` for a status outside 100–599, matching the other + adapters. ### Verified @@ -113,6 +153,9 @@ The following were intentionally left out of this round and are **not** included errors themselves. - **`sendfile` fast-path** — file bodies are streamed via the existing `iter_bytes` path; no zero-copy `sendfile` transport optimisation was added. +- **Async tracing / logging** — the tracing and logging policies (including the + new `OperationTracingPolicy`) ship sync-only; `default_async_pipeline` carries + no tracing, and async callers handle per-operation observability themselves. - **MCP support** — no Model Context Protocol integration is included. - **Java SDK items** — the Java counterpart lives in a separate repository and was out of scope here. diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py index 88795f0..b96f3da 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py @@ -29,6 +29,10 @@ default tracer factory returns ``NOOP_HTTP_TRACER`` and the no-op span carries the sentinel trace ids. Disable either per-call by setting ``ctx.options["tracing_enabled"] = False``. + +Both policies are sync-only, in line with the rest of the tracing and logging +stack; ``default_async_pipeline`` carries no tracing, so async callers own +per-operation observability on their side. """ from __future__ import annotations diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/staged_builder.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/staged_builder.py index 1be7c36..3590cf2 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/staged_builder.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/staged_builder.py @@ -7,8 +7,8 @@ constructor. Policies declare their ``STAGE``; the builder slots them into stage buckets and at ``build()`` time flattens to a list in stage order. -Pillar stages (`REDIRECT`, `RETRY`, `AUTH`, `LOGGING`, `SERDE`) admit at -most one policy. A second `append` of a pillar raises by default — use +Pillar stages (`OPERATION`, `REDIRECT`, `RETRY`, `AUTH`, `LOGGING`, `SERDE`) +admit at most one policy. A second `append` of a pillar raises by default — use ``replace(target, new)`` for explicit swaps or ``append(p, force=True)`` for the rare legitimate use case (test fixtures, runtime composition). """ diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py index 1dd4e66..814ebef 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py @@ -517,7 +517,7 @@ def test_redirect_then_failure_reports_operation_failed(self) -> None: assert failed == [boom] -# ----- request_sent fires for unknown-length bodies (L19) ----------------- +# ----- request_sent fires for unknown-length bodies ----------------- class TestRequestSentUnknownLength: diff --git a/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py b/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py index edbac8f..8534017 100644 --- a/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py +++ b/packages/dexpace-sdk-http-stdlib/tests/test_asyncio_http_client.py @@ -226,7 +226,7 @@ def _header_value(head: list[str], name: str) -> str | None: async def test_host_header_includes_non_default_port() -> None: - # M1: a non-default port must appear in the Host header (RFC 9112 §3.2). + # A non-default port must appear in the Host header (RFC 9112 §3.2). sink: dict[str, list[str]] = {} gen = _serve(await _collect_head(sink)) base = await anext(gen) @@ -241,7 +241,7 @@ async def test_host_header_includes_non_default_port() -> None: async def test_empty_post_body_sends_content_length_zero() -> None: - # L17: a body-bearing method with no payload must advertise + # A body-bearing method with no payload must advertise # Content-Length: 0 (RFC 9110 §8.6). sink: dict[str, list[str]] = {} gen = _serve(await _collect_head(sink)) @@ -289,7 +289,7 @@ async def test_post_with_body_sets_content_length() -> None: async def test_plain_http_ignores_supplied_ssl_context() -> None: - # M2: a caller-supplied ssl_context must not trigger a TLS handshake on + # A caller-supplied ssl_context must not trigger a TLS handshake on # a plain http:// URL — the request must still succeed over plaintext. async def ok(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: await _read_request_head(reader) @@ -310,7 +310,7 @@ async def ok(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None async def test_chunked_response_raises_service_response_error() -> None: - # M3: a chunked response cannot be dechunked by this reference client, so + # A chunked response cannot be dechunked by this reference client, so # it must fail loudly rather than silently return an empty body. async def chunked(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: await _read_request_head(reader) @@ -333,7 +333,7 @@ async def chunked(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> async def test_multiline_transfer_encoding_with_chunked_not_first_raises() -> None: - # M3: Transfer-Encoding may be split across lines with ``chunked`` NOT + # Transfer-Encoding may be split across lines with ``chunked`` NOT # first (alongside a misleading Content-Length). The client must still # detect chunked framing and refuse to read the bytes as a fixed-length # body, rather than parsing chunk framing as the payload — the exact @@ -377,7 +377,7 @@ def test_is_chunked_matches_coding_token_not_substring() -> None: async def test_connection_close_framed_body_read_to_eof() -> None: - # M3: a response without Content-Length is connection-close framed; the + # A response without Content-Length is connection-close framed; the # body must be read to EOF, not fabricated as empty. async def close_framed(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: await _read_request_head(reader) diff --git a/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py b/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py index 0f57e9e..db195ab 100644 --- a/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py +++ b/packages/dexpace-sdk-http-stdlib/tests/test_urllib_http_client.py @@ -120,7 +120,7 @@ def test_post_with_body(echo_server: str) -> None: class _RedirectHandler(socketserver.StreamRequestHandler): """Replies with a 302 pointing at another origin and a small body. - Pins H1: the transport must NOT follow the redirect itself. If it did, + Pins that the transport must NOT follow the redirect itself. If it did, the second hop would fail (the target host is unroutable) or the response would carry the followed target's status — either way not a 302. """ @@ -158,7 +158,7 @@ def redirect_server() -> Iterator[str]: def test_redirect_is_not_followed(redirect_server: str) -> None: - # H1: a 302 must surface to the pipeline as a 302 Response, not be + # A 302 must surface to the pipeline as a 302 Response, not be # transparently followed by the transport (which would also leak the # Authorization header cross-origin). client = UrllibHttpClient(timeout=2.0) @@ -340,7 +340,7 @@ def test_content_length_surfaced_under_content_encoding() -> None: def test_read_failure_maps_to_service_response_error() -> None: - # M5: a read-phase failure on the raw HTTPResponse must surface as an + # A read-phase failure on the raw HTTPResponse must surface as an # SdkError, not a bare OSError / IncompleteRead. class _BoomResponse(_TrackingResponse): def read(self, size: int = -1) -> bytes: From e0a5af4e73a8bb23e5c91461e2bc44aa9f5bc69a Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 10 Jun 2026 20:21:33 +0300 Subject: [PATCH 3/6] docs: add the operation-tracing stage to the README pipeline overview The README architecture diagram, stage-spacing note, policy surface table, and observability summary predated the new outermost operation-tracing stage. - Show OPERATION as the outermost pillar in the request-flow diagram. - List OperationTracingPolicy in the pipeline.policies surface table and the observability feature summary. - Correct the stage-spacing wording (values are spaced out rather than a fixed 100 apart) in both the README and the Stage docstring, now that the operation stage sits at 50. Co-Authored-By: Claude Opus 4.8 --- README.md | 13 +++++++------ .../src/dexpace/sdk/core/pipeline/stage.py | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 7611fb2..183d11b 100644 --- a/README.md +++ b/README.md @@ -132,12 +132,12 @@ the way back up. The terminal policy hands the request to an `HttpClient` transport. ``` -caller → Pipeline → REDIRECT → POST_REDIRECT → RETRY → POST_RETRY → [AUTH] → LOGGING → POST_LOGGING → HttpClient → wire - (pillar) idempotency (pillar) set-date (pillar) (pillar) tracing - client-identity +caller → Pipeline → OPERATION → REDIRECT → POST_REDIRECT → RETRY → POST_RETRY → [AUTH] → LOGGING → POST_LOGGING → HttpClient → wire + (pillar) (pillar) idempotency (pillar) set-date (pillar) (pillar) tracing + client-identity ``` -Ordering is governed by `Stage`, an `IntEnum` whose values sit 100 apart so +Ordering is governed by `Stage`, an `IntEnum` whose values are spaced out so new stages can land without renumbering. Pillar stages admit a single policy; non-pillar stages stack with deque semantics. Callers who prefer explicit ordering can still use the list form, @@ -176,7 +176,7 @@ Bottom-up, the layers are: | `http.webhooks` | `WebhookVerifier`, `InvalidWebhookSignatureError` — HMAC signature verification with timestamp tolerance | | `pagination` | `Page`, `Paginator` / `AsyncPaginator`, `PaginationStrategy` (`CursorStrategy`, `PageNumberStrategy`, `LinkHeaderStrategy`) | | `pipeline` | `Pipeline`, `AsyncPipeline`, `Policy` ABC, `Stage` enum, `StagedPipelineBuilder`, `default_pipeline()` | -| `pipeline.policies` | `RedirectPolicy`, `IdempotencyPolicy`, `RetryPolicy`, `SetDatePolicy`, `ClientIdentityPolicy`, `LoggingPolicy`, `TracingPolicy` (async twins for all but logging/tracing) | +| `pipeline.policies` | `RedirectPolicy`, `IdempotencyPolicy`, `RetryPolicy`, `SetDatePolicy`, `ClientIdentityPolicy`, `LoggingPolicy`, `OperationTracingPolicy`, `TracingPolicy` (async twins for all but logging/tracing) | | `client` | `HttpClient` and `AsyncHttpClient` Protocols | | `serde` | `Serde`, `Serializer`, `Deserializer` Protocols + `JsonSerde` reference impl | | `instrumentation` | `ClientLogger`, `UrlRedactor`, `Tracer`, `Span`, `InstrumentationContext`, `contextvars` correlation helpers, noop singletons | @@ -205,7 +205,8 @@ Bottom-up, the layers are: reissue, userinfo dropped from `Location` URLs, configurable allowed methods and 303 handling. - **Observability.** Structured logging via `LoggingPolicy`, - OpenTelemetry-compatible spans via `TracingPolicy`, URL redaction with + per-attempt OpenTelemetry spans via `TracingPolicy` with a once-per-call + tracer lifecycle via `OperationTracingPolicy`, URL redaction with allowlisted query parameters, and capped body capture for diagnostics. - **Server-Sent Events.** A WHATWG-compliant `SseParser` with a bounded line buffer, plus reconnecting `SseConnection` / `AsyncSseConnection` diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py index bb2f530..d3e27d7 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/stage.py @@ -8,7 +8,7 @@ class of bugs where retry runs before redirect or auth runs after logging. Pillar stages admit at most one policy; non-pillar stages stack with -deque semantics. The numeric values are sparse (100 apart) so future +deque semantics. The numeric values are spaced out so future stages can slot between existing ones without renumbering. """ From bf090db94037c17d288c0803e45dda672632be08 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 10 Jun 2026 20:24:38 +0300 Subject: [PATCH 4/6] docs: reflect the operation-tracing stage in CLAUDE.md CLAUDE.md's pipeline overview listed the shipped policies and the default stack order without the new operation-tracing stage that brackets the redirect/retry wrappers. - Add operation-tracing to the shipped-policies list and note it ships sync-only alongside logging and tracing. - Put operation-tracing at the head of the documented default stack order. Co-Authored-By: Claude Opus 4.8 --- CLAUDE.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 4c01264..649f809 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -206,15 +206,16 @@ Layered, bottom-up: 4. **`pipeline`** — `Policy` (and `AsyncPolicy`) wrap the downstream chain; `Pipeline` / `AsyncPipeline` run an ordered set of policies grouped into `Stage`s via `StagedPipelineBuilder`. Shipped policies: redirect, - idempotency, retry, set-date, client-identity, logging, tracing. Async - twins under `pipeline/policies/` exist only for redirect, idempotency, - retry, set-date, and client-identity; logging and tracing are sync-only. + idempotency, retry, set-date, client-identity, logging, tracing, and + operation-tracing. Async twins under `pipeline/policies/` exist only for + redirect, idempotency, retry, set-date, and client-identity; logging, + tracing, and operation-tracing are sync-only. `default_pipeline()` / `default_async_pipeline()` assemble the standard - stack in the order redirect → idempotency → retry → set-date → - client-identity → [auth] → logging → tracing (the async pipeline omits - logging and tracing). The lower-level `pipeline/step/PipelineStep` Protocol - (`(input, context) -> output`) plus `StepMetadata` remain for custom - composition. + stack in the order operation-tracing → redirect → idempotency → retry → + set-date → client-identity → [auth] → logging → tracing (the async + pipeline omits the tracing and logging policies). The lower-level + `pipeline/step/PipelineStep` Protocol (`(input, context) -> output`) plus + `StepMetadata` remain for custom composition. 5. **`client/HttpClient`** — single-method Protocol (`execute(request) -> Response`). Transport is **not** provided by `core`; the `dexpace-sdk-http-*` packages (stdlib, httpx, aiohttp, requests) each From 8f6b8d06d125ee2439a50db0fd10ae701cf5d818 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 10 Jun 2026 20:57:21 +0300 Subject: [PATCH 5/6] feat(pipeline): complete the async HttpTracer lifecycle with AsyncOperationTracingPolicy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The async pipeline already emitted attempt-level HttpTracer events (AsyncRetryPolicy's attempt_started/attempt_failed and AsyncRedirectPolicy's request_url_resolved) but never the operation lifecycle, so an async operator saw attempts and redirect hops with no operation_started bracket and no final operation_succeeded/operation_failed. The per-operation lifecycle had been extracted into the sync-only OperationTracingPolicy without an async counterpart, leaving the async tracer stream permanently incomplete. The HttpTracer callbacks are synchronous and transport-agnostic by contract, so there is no async barrier to emitting them — only a missing policy. - Add AsyncOperationTracingPolicy (async twin of OperationTracingPolicy) at the outermost Stage.OPERATION; it awaits only the downstream send. - Wire it as the outermost policy in default_async_pipeline and export it from pipeline.policies. - Cover it: the lifecycle fires exactly once and reflects the final outcome across async retry (success-after-failure and exhaustion) and redirect (success and late-hop failure), the tracing-disabled path, and the default async stack wiring. - Regenerate the public API surface baseline. - Correct the docs that described the async stack as carrying no tracing (CHANGELOG, README, CLAUDE.md, tracing module docstring): only the per-attempt OpenTelemetry span policy and structured logging remain sync-only. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 25 ++- CLAUDE.md | 11 +- README.md | 2 +- .../src/dexpace/sdk/core/pipeline/defaults.py | 15 +- .../sdk/core/pipeline/policies/__init__.py | 2 + .../pipeline/policies/async_tracing_policy.py | 72 +++++++ .../core/pipeline/policies/tracing_policy.py | 7 +- .../tests/pipeline/test_defaults.py | 11 ++ .../tests/pipeline/test_tracer_emission.py | 178 ++++++++++++++++++ tools/surface_baseline.json | 11 ++ 10 files changed, 312 insertions(+), 22 deletions(-) create mode 100644 packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/async_tracing_policy.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc627d..3ab8b61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,13 +41,16 @@ Changed). - **Client-identity policy** (`pipeline.policies.client_identity`, plus its async twin). Sets a consistent `User-Agent` / client-identity header derived from the configured application id and SDK version. -- **Per-operation tracing policy** (`OperationTracingPolicy` in - `pipeline.policies.tracing_policy`, with a new outermost `Stage.OPERATION`). - Emits the per-operation `HttpTracer` lifecycle (`operation_started`, then - exactly one `operation_succeeded` / `operation_failed`) from outside the retry - and redirect wrappers, so the reported outcome reflects the final result of - the whole call rather than a single attempt or hop. Sync-only, in line with - the rest of the tracing stack; the async pipeline carries no tracing. +- **Per-operation tracing policy** (`OperationTracingPolicy` and its async twin + `AsyncOperationTracingPolicy`, with a new outermost `Stage.OPERATION`). Emits + the per-operation `HttpTracer` lifecycle (`operation_started`, then exactly + one `operation_succeeded` / `operation_failed`) from outside the retry and + redirect wrappers, so the reported outcome reflects the final result of the + whole call rather than a single attempt or hop. Both `default_pipeline` and + `default_async_pipeline` wire it, so the async stack now reports the same + lifecycle alongside the attempt-level events its retry / redirect policies + already emit. Only `TracingPolicy`'s per-attempt OpenTelemetry span policy + remains sync-only. - **HTTP tracer** (`instrumentation.http_tracer`). An adapter-style tracer base whose per-event methods default to no-ops, so a subclass overrides only the events it cares about. Wired through the tracing policy for span emission. @@ -153,9 +156,11 @@ The following were intentionally left out of this round and are **not** included errors themselves. - **`sendfile` fast-path** — file bodies are streamed via the existing `iter_bytes` path; no zero-copy `sendfile` transport optimisation was added. -- **Async tracing / logging** — the tracing and logging policies (including the - new `OperationTracingPolicy`) ship sync-only; `default_async_pipeline` carries - no tracing, and async callers handle per-operation observability themselves. +- **Async OpenTelemetry spans / logging** — the per-attempt span policy + (`TracingPolicy`) and `LoggingPolicy` ship sync-only, so + `default_async_pipeline` emits the per-operation `HttpTracer` lifecycle and + attempt-level events but no OpenTelemetry spans or structured request / + response logs. - **MCP support** — no Model Context Protocol integration is included. - **Java SDK items** — the Java counterpart lives in a separate repository and was out of scope here. diff --git a/CLAUDE.md b/CLAUDE.md index 649f809..c07f44a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -132,7 +132,7 @@ python-sdk/ │ │ │ │ │ │ │ ├── policies/ # redirect, idempotency, retry, set_date, │ │ │ │ # client_identity, logging, tracing - │ │ │ │ # (async twins only for the first five) + │ │ │ │ # (async twins for all but logging and per-attempt tracing) │ │ │ └── step/ # PipelineStep, StepMetadata │ │ ├── client/ # HttpClient + AsyncHttpClient Protocols │ │ ├── config/ # Configuration @@ -207,13 +207,14 @@ Layered, bottom-up: `Pipeline` / `AsyncPipeline` run an ordered set of policies grouped into `Stage`s via `StagedPipelineBuilder`. Shipped policies: redirect, idempotency, retry, set-date, client-identity, logging, tracing, and - operation-tracing. Async twins under `pipeline/policies/` exist only for - redirect, idempotency, retry, set-date, and client-identity; logging, - tracing, and operation-tracing are sync-only. + operation-tracing. Async twins exist for redirect, idempotency, retry, + set-date, client-identity, and operation-tracing; logging and the + per-attempt tracing policy are sync-only. `default_pipeline()` / `default_async_pipeline()` assemble the standard stack in the order operation-tracing → redirect → idempotency → retry → set-date → client-identity → [auth] → logging → tracing (the async - pipeline omits the tracing and logging policies). The lower-level + pipeline keeps operation-tracing but omits logging and the per-attempt + tracing span). The lower-level `pipeline/step/PipelineStep` Protocol (`(input, context) -> output`) plus `StepMetadata` remain for custom composition. 5. **`client/HttpClient`** — single-method Protocol diff --git a/README.md b/README.md index 183d11b..60f20c7 100644 --- a/README.md +++ b/README.md @@ -176,7 +176,7 @@ Bottom-up, the layers are: | `http.webhooks` | `WebhookVerifier`, `InvalidWebhookSignatureError` — HMAC signature verification with timestamp tolerance | | `pagination` | `Page`, `Paginator` / `AsyncPaginator`, `PaginationStrategy` (`CursorStrategy`, `PageNumberStrategy`, `LinkHeaderStrategy`) | | `pipeline` | `Pipeline`, `AsyncPipeline`, `Policy` ABC, `Stage` enum, `StagedPipelineBuilder`, `default_pipeline()` | -| `pipeline.policies` | `RedirectPolicy`, `IdempotencyPolicy`, `RetryPolicy`, `SetDatePolicy`, `ClientIdentityPolicy`, `LoggingPolicy`, `OperationTracingPolicy`, `TracingPolicy` (async twins for all but logging/tracing) | +| `pipeline.policies` | `RedirectPolicy`, `IdempotencyPolicy`, `RetryPolicy`, `SetDatePolicy`, `ClientIdentityPolicy`, `LoggingPolicy`, `OperationTracingPolicy`, `TracingPolicy` (async twins for all but `LoggingPolicy` and `TracingPolicy`) | | `client` | `HttpClient` and `AsyncHttpClient` Protocols | | `serde` | `Serde`, `Serializer`, `Deserializer` Protocols + `JsonSerde` reference impl | | `instrumentation` | `ClientLogger`, `UrlRedactor`, `Tracer`, `Span`, `InstrumentationContext`, `contextvars` correlation helpers, noop singletons | diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py index f4be724..fcc3451 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/defaults.py @@ -13,6 +13,7 @@ from .policies.async_redirect import AsyncRedirectPolicy from .policies.async_retry import AsyncRetryPolicy from .policies.async_set_date import AsyncSetDatePolicy +from .policies.async_tracing_policy import AsyncOperationTracingPolicy from .policies.client_identity import ClientIdentityPolicy from .policies.idempotency import IdempotencyPolicy from .policies.logging_policy import LoggingPolicy @@ -106,11 +107,19 @@ def default_async_pipeline( ) -> AsyncStagedPipelineBuilder: """Async twin of `default_pipeline`. - Mirrors the sync version's stack minus logging/tracing, which currently - only ship as sync policies. Async-side observability lives on the caller's - side until async versions land. + Mirrors the sync version's stack. `AsyncOperationTracingPolicy` brackets + the whole operation from the outermost stage so the per-operation + ``HttpTracer`` lifecycle (``operation_started`` / ``operation_succeeded`` / + ``operation_failed``) fires once and reflects the final outcome — completing + the attempt-level events the async retry and redirect policies already emit + through the same per-operation tracer. The per-attempt OpenTelemetry span + policy (`TracingPolicy`) and `LoggingPolicy` ship sync-only, so the async + stack omits those two. """ builder = AsyncStagedPipelineBuilder(client) + # Sorts to Stage.OPERATION (outermost), bracketing every hop / attempt so + # the per-operation lifecycle fires once on the final outcome. + builder.append(AsyncOperationTracingPolicy()) builder.append(redirect or AsyncRedirectPolicy()) builder.append(idempotency or AsyncIdempotencyPolicy()) builder.append(retry or AsyncRetryPolicy()) diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py index 33a5375..d6d70e4 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/__init__.py @@ -11,6 +11,7 @@ from .async_redirect import AsyncRedirectPolicy from .async_retry import AsyncRetryPolicy from .async_set_date import AsyncSetDatePolicy +from .async_tracing_policy import AsyncOperationTracingPolicy from .client_identity import ClientIdentityPolicy, default_user_agent from .idempotency import IdempotencyPolicy from .logging_policy import LoggingPolicy @@ -22,6 +23,7 @@ __all__ = [ "AsyncClientIdentityPolicy", "AsyncIdempotencyPolicy", + "AsyncOperationTracingPolicy", "AsyncRedirectPolicy", "AsyncRetryPolicy", "AsyncSetDatePolicy", diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/async_tracing_policy.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/async_tracing_policy.py new file mode 100644 index 0000000..08b35e7 --- /dev/null +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/async_tracing_policy.py @@ -0,0 +1,72 @@ +# Copyright (c) 2026 dexpace and Omar Aljarrah. +# Licensed under the MIT License. See LICENSE.md in the repository root for details. + +"""Async twin of `OperationTracingPolicy`.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, ClassVar, Literal + +from ..async_policy import AsyncPolicy +from ..stage import Stage +from .redirect import resolve_http_tracer + +if TYPE_CHECKING: + from ...http.request.request import Request + from ...http.response.async_response import AsyncResponse + from ..context import PipelineContext + + +class AsyncOperationTracingPolicy(AsyncPolicy): + """Async variant of `OperationTracingPolicy`. + + Emits the per-operation ``HttpTracer`` lifecycle around the whole async + call. Placed at `Stage.OPERATION`, outside the redirect and retry wrappers, + so its single ``send`` brackets every hop and attempt: it emits + ``operation_started`` before dispatching the chain and exactly one of + ``operation_succeeded`` / ``operation_failed`` once the chain unwinds, so + the operation outcome reflects what the caller observes rather than the + result of the first attempt. + + This completes the async ``HttpTracer`` lifecycle. `AsyncRetryPolicy` and + `AsyncRedirectPolicy` already emit the attempt-level events and + ``request_url_resolved`` through the same per-operation tracer (resolved + via ``resolve_http_tracer`` and cached in ``ctx.data``), so without this + policy the async stack reports attempts but never the operation outcome. + The tracer callbacks are synchronous, so the body matches the sync twin + apart from the ``await`` on the downstream send. + + Disable per-call by setting ``ctx.options["tracing_enabled"] = False``. + + Attributes: + STAGE: Pinned to `Stage.OPERATION` at the type level so mis-slotting is + caught by ``mypy``. + """ + + STAGE: ClassVar[Literal[Stage.OPERATION]] = Stage.OPERATION + __slots__ = () + + async def send(self, request: Request, ctx: PipelineContext) -> AsyncResponse: + """Bracket the downstream chain with the per-operation lifecycle. + + Args: + request: Outgoing request. + ctx: Pipeline context, forwarded unchanged. + + Returns: + The response from the downstream chain. + """ + if not ctx.options.get("tracing_enabled", True): + return await self.next.send(request, ctx) + http_tracer = resolve_http_tracer(ctx) + http_tracer.operation_started() + try: + response = await self.next.send(request, ctx) + except BaseException as err: + http_tracer.operation_failed(err) + raise + http_tracer.operation_succeeded() + return response + + +__all__ = ["AsyncOperationTracingPolicy"] diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py index b96f3da..cbcf3ea 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py @@ -30,9 +30,10 @@ the sentinel trace ids. Disable either per-call by setting ``ctx.options["tracing_enabled"] = False``. -Both policies are sync-only, in line with the rest of the tracing and logging -stack; ``default_async_pipeline`` carries no tracing, so async callers own -per-operation observability on their side. +`OperationTracingPolicy` has an async twin, `AsyncOperationTracingPolicy`, +wired into ``default_async_pipeline`` so the async stack reports the same +per-operation lifecycle. `TracingPolicy`'s per-attempt OpenTelemetry span +machinery is sync-only and has no async counterpart. """ from __future__ import annotations diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py b/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py index a2f508c..06d920f 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_defaults.py @@ -126,3 +126,14 @@ def test_default_async_pipeline_returns_builder() -> None: def test_default_async_pipeline_builds_async_pipeline() -> None: pipeline = default_async_pipeline(_AsyncStubTransport()).build() assert isinstance(pipeline, AsyncPipeline) + + +def test_default_async_pipeline_wires_operation_tracing() -> None: + from dexpace.sdk.core.pipeline.policies import AsyncOperationTracingPolicy + + builder = default_async_pipeline(_AsyncStubTransport()) + # Mirrors the sync default: the per-operation lifecycle policy occupies the + # outermost OPERATION pillar, so async callers get the same + # operation_started / operation_succeeded / operation_failed bracket around + # the attempt-level events the retry/redirect policies already emit. + assert isinstance(builder._pillars[Stage.OPERATION], AsyncOperationTracingPolicy) diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py index 814ebef..c549821 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py @@ -35,6 +35,8 @@ from dexpace.sdk.core.instrumentation.noop import NOOP_SPAN from dexpace.sdk.core.pipeline import AsyncPipeline, Pipeline from dexpace.sdk.core.pipeline.policies import ( + AsyncOperationTracingPolicy, + AsyncRetryPolicy, OperationTracingPolicy, RetryPolicy, TracingPolicy, @@ -517,6 +519,182 @@ def test_redirect_then_failure_reports_operation_failed(self) -> None: assert failed == [boom] +class _RaiseThenOkAsyncClient(AsyncHttpClient): + """Raises a retryable error on the first ``fail_count`` calls, then OK.""" + + def __init__(self, *, error: BaseException, fail_count: int) -> None: + self._error = error + self._fail_count = fail_count + self.calls = 0 + + async def execute(self, request: Request) -> AsyncResponse: + self.calls += 1 + if self.calls <= self._fail_count: + raise self._error + return AsyncResponse(request=request, protocol=Protocol.HTTP_1_1, status=Status.OK) + + +class _AlwaysRaisesAsyncClient(AsyncHttpClient): + """Raises a retryable error on every call to exhaust the retry budget.""" + + def __init__(self, error: BaseException) -> None: + self._error = error + self.calls = 0 + + async def execute(self, request: Request) -> AsyncResponse: + self.calls += 1 + raise self._error + + +class _RedirectThenRaiseAsyncClient(AsyncHttpClient): + """Returns a redirect, then raises on the reissued (post-redirect) request.""" + + def __init__(self, *, location: str, error: BaseException) -> None: + self._location = location + self._error = error + self.calls = 0 + + async def execute(self, request: Request) -> AsyncResponse: + self.calls += 1 + if self.calls == 1: + response = AsyncResponse( + request=request, + protocol=Protocol.HTTP_1_1, + status=Status.MOVED_PERMANENTLY, + ) + return response.with_header("Location", self._location) + raise self._error + + +class _AsyncFakeClock: + """Deterministic ``AsyncClock`` for tests; advances time on sleep.""" + + __slots__ = ("_t",) + + def __init__(self, start: float = 0.0) -> None: + self._t = start + + def now(self) -> float: + return self._t + + def monotonic(self) -> float: + return self._t + + async def sleep(self, duration: float) -> None: + self._t += max(0.0, duration) + + +class TestAsyncOperationEventsFireOncePerOperation: + """`AsyncOperationTracingPolicy` mirrors the sync per-operation guarantees. + + The async retry / redirect policies already emit attempt-level events + through the shared per-operation tracer; these tests pin that the operation + lifecycle now brackets them exactly once and reflects the final outcome. + """ + + async def test_async_retry_then_success_reports_operation_succeeded(self) -> None: + # A connect failure retried to success must report a single + # operation_succeeded, never operation_failed for the discarded attempt. + tracer = _RecordingHttpTracer() + instr = _instr(tracer) + client = _RaiseThenOkAsyncClient(error=ServiceRequestError("connect failed"), fail_count=1) + retry = AsyncRetryPolicy(connect_retries=3, total_retries=3, clock=_AsyncFakeClock()) + async with AsyncPipeline( + client, + policies=[AsyncOperationTracingPolicy(), retry], + ) as p: + response = await p.run(_request(), DispatchContext(instr)) + assert int(response.status) == 200 + assert client.calls == 2 + names = tracer.names() + assert names.count("operation_started") == 1 + assert names.count("operation_succeeded") == 1 + assert names.count("operation_failed") == 0 + # client.calls == 2 confirms the first attempt was retried; the single + # lifecycle bracket still opens before anything the chain emits. + assert names[0] == "operation_started" + + async def test_async_retry_exhaustion_emits_operation_failed_once(self) -> None: + tracer = _RecordingHttpTracer() + instr = _instr(tracer) + boom = ServiceRequestError("connect failed") + client = _AlwaysRaisesAsyncClient(boom) + retry = AsyncRetryPolicy(connect_retries=2, total_retries=2, clock=_AsyncFakeClock()) + raised: BaseException | None = None + async with AsyncPipeline( + client, + policies=[AsyncOperationTracingPolicy(), retry], + ) as p: + try: + await p.run(_request(), DispatchContext(instr)) + except ServiceRequestError as err: + raised = err + assert raised is boom + names = tracer.names() + assert names.count("operation_started") == 1 + assert names.count("operation_failed") == 1 + assert names.count("operation_succeeded") == 0 + failed = [payload for name, payload in tracer.events if name == "operation_failed"] + assert failed == [boom] + + async def test_async_redirect_emits_operation_lifecycle_once_across_hops(self) -> None: + tracer = _RecordingHttpTracer() + instr = _instr(tracer) + client = _ScriptedAsyncClient( + [ + _Hop(Status.MOVED_PERMANENTLY, "https://api.example.com/new"), + _Hop(Status.OK), + ], + ) + async with AsyncPipeline( + client, + policies=[AsyncOperationTracingPolicy(), AsyncRedirectPolicy()], + ) as p: + await p.run(_request("https://api.example.com/start"), DispatchContext(instr)) + op_events = [name for name in tracer.names() if name.startswith("operation_")] + assert op_events == ["operation_started", "operation_succeeded"] + # Two hops still resolve two URLs inside the single operation bracket. + assert tracer.names().count("request_url_resolved") == 2 + + async def test_async_redirect_then_failure_reports_operation_failed(self) -> None: + # When a later redirect hop fails, the operation outcome is the failure + # that escapes — not the success of the earlier 3xx hop. + tracer = _RecordingHttpTracer() + instr = _instr(tracer) + boom = ServiceRequestError("connect failed") + client = _RedirectThenRaiseAsyncClient(location="https://api.example.com/new", error=boom) + raised: BaseException | None = None + async with AsyncPipeline( + client, + policies=[AsyncOperationTracingPolicy(), AsyncRedirectPolicy()], + ) as p: + try: + await p.run(_request("https://api.example.com/start"), DispatchContext(instr)) + except ServiceRequestError as err: + raised = err + assert raised is boom + names = tracer.names() + assert names.count("operation_started") == 1 + assert names.count("operation_failed") == 1 + assert names.count("operation_succeeded") == 0 + + async def test_async_no_operation_events_when_tracing_disabled(self) -> None: + tracer = _RecordingHttpTracer() + instr = _instr(tracer) + client = _ScriptedAsyncClient([_Hop(Status.OK)]) + async with AsyncPipeline( + client, + policies=[AsyncOperationTracingPolicy(), AsyncRedirectPolicy()], + ) as p: + await p.run( + _request("https://api.example.com/start"), + DispatchContext(instr), + tracing_enabled=False, + ) + op_events = [name for name in tracer.names() if name.startswith("operation_")] + assert op_events == [] + + # ----- request_sent fires for unknown-length bodies ----------------- diff --git a/tools/surface_baseline.json b/tools/surface_baseline.json index c93c6ae..64f4e5a 100644 --- a/tools/surface_baseline.json +++ b/tools/surface_baseline.json @@ -1155,6 +1155,16 @@ } } }, + "dexpace.sdk.core.pipeline.policies.async_tracing_policy": { + "AsyncOperationTracingPolicy": { + "bases": [ + "AsyncPolicy" + ], + "methods": { + "send": "async send(self, request: Request, ctx: PipelineContext) -> AsyncResponse" + } + } + }, "dexpace.sdk.core.pipeline.policies.client_identity": { "ClientIdentityPolicy": { "bases": [ @@ -1601,6 +1611,7 @@ "dexpace.sdk.core.pipeline.policies": [ "AsyncClientIdentityPolicy", "AsyncIdempotencyPolicy", + "AsyncOperationTracingPolicy", "AsyncRedirectPolicy", "AsyncRetryPolicy", "AsyncSetDatePolicy", From 4fe3a5fad989b47f7e9355907133649de3f876ce Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 10 Jun 2026 21:19:44 +0300 Subject: [PATCH 6/6] feat(pipeline): warn when TracingPolicy runs without an OperationTracingPolicy Splitting the per-operation lifecycle into OperationTracingPolicy left one case silent: a hand-built pipeline that lists TracingPolicy on its own keeps emitting per-attempt spans but no longer emits operation_started / operation_succeeded / operation_failed. With a real HttpTracer installed, that is a misconfiguration the operator should be told about rather than discover through missing telemetry. TracingPolicy now logs a one-time warning when it runs with a non-noop HttpTracer and no OperationTracingPolicy bracketed the operation (detected via a ctx.data marker the operation policy sets). A no-op tracer has no consumer, so it stays silent; default_pipeline wires both policies, so the common path never warns. The once-guard takes a lock so it stays correct under free-threaded CPython. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 4 +- .../core/pipeline/policies/tracing_policy.py | 51 ++++++++++++++++++- .../tests/pipeline/test_tracer_emission.py | 47 +++++++++++++++++ 3 files changed, 100 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ab8b61..e30c946 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,7 +80,9 @@ Changed). unaffected. A pipeline assembled by hand that wants the operation lifecycle must now add `OperationTracingPolicy` alongside `TracingPolicy` — a bare `TracingPolicy` no longer emits `operation_started` / `operation_succeeded` / - `operation_failed`. + `operation_failed`. So that change is not silent, a `TracingPolicy` that runs + with a real `HttpTracer` but no `OperationTracingPolicy` bracketing it logs a + one-time warning. - **Default pipelines** (`pipeline.defaults`). The standard sync/async stacks now assemble the new idempotency and client-identity policies alongside the existing retry, redirect, logging, and tracing policies. diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py index cbcf3ea..e34523f 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/policies/tracing_policy.py @@ -38,9 +38,11 @@ from __future__ import annotations +import logging +import threading from typing import TYPE_CHECKING -from ...instrumentation import NOOP_TRACER, Tracer, bind_correlation +from ...instrumentation import NOOP_HTTP_TRACER, NOOP_TRACER, Tracer, bind_correlation from ..policy import Policy from ..stage import Stage from .redirect import resolve_http_tracer @@ -53,6 +55,43 @@ from ..context import PipelineContext +_LOGGER = logging.getLogger(__name__) + +#: ``ctx.data`` marker set by `OperationTracingPolicy` to record that the +#: per-operation lifecycle is being bracketed. `TracingPolicy` reads it to +#: detect a pipeline that wires the per-attempt policy without the operation +#: one and warns (once) so the absent lifecycle is not silent. +_OPERATION_BRACKET_KEY: str = "tracing_operation_bracketed" + +_BRACKET_WARNING_LOCK = threading.Lock() +_bracket_warning_emitted = False + + +def _warn_missing_operation_bracket_once() -> None: + """Log, at most once per process, that the operation lifecycle is absent. + + `TracingPolicy` calls this when it runs with a real ``HttpTracer`` but no + `OperationTracingPolicy` bracketed the operation, so the per-operation + lifecycle never fires. The misconfiguration is static, so the warning fires + once; the lock keeps the guard correct under free-threaded CPython rather + than relying on the GIL. + """ + global _bracket_warning_emitted + if _bracket_warning_emitted: + return + with _BRACKET_WARNING_LOCK: + if _bracket_warning_emitted: + return + _bracket_warning_emitted = True + _LOGGER.warning( + "TracingPolicy is running without an OperationTracingPolicy: per-attempt " + "spans and per-request events are emitted, but the per-operation HttpTracer " + "lifecycle (operation_started / operation_succeeded / operation_failed) is " + "not. Add OperationTracingPolicy at Stage.OPERATION; default_pipeline wires " + "both." + ) + + class OperationTracingPolicy(Policy): """Emit the per-operation ``HttpTracer`` lifecycle around the whole call. @@ -75,6 +114,7 @@ class OperationTracingPolicy(Policy): def send(self, request: Request, ctx: PipelineContext) -> Response: if not ctx.options.get("tracing_enabled", True): return self.next.send(request, ctx) + ctx.data[_OPERATION_BRACKET_KEY] = True http_tracer = resolve_http_tracer(ctx) http_tracer.operation_started() try: @@ -112,6 +152,10 @@ class TracingPolicy(Policy): is emitted by `OperationTracingPolicy`, which brackets the whole call from outside the retry / redirect wrappers. + If a real ``HttpTracer`` is installed but no `OperationTracingPolicy` + brackets the operation, this policy logs a one-time warning so the absent + per-operation lifecycle is not silent. + Disable per-call by setting ``ctx.options["tracing_enabled"] = False``. """ @@ -128,6 +172,11 @@ def send(self, request: Request, ctx: PipelineContext) -> Response: # Share one per-operation tracer with the operation / redirect / retry # policies via ``ctx.data`` (whichever policy runs first mints it). http_tracer = resolve_http_tracer(ctx) + # Warn once if a real tracer is installed but no OperationTracingPolicy + # brackets the operation, so the absent per-operation lifecycle is not + # silent. A no-op tracer has no consumer, so there is nothing to warn. + if http_tracer is not NOOP_HTTP_TRACER and _OPERATION_BRACKET_KEY not in ctx.data: + _warn_missing_operation_bracket_once() span = self._tracer.start_span(f"HTTP {request.method}", parent=parent) _set_request_attributes(span, request) with bind_correlation(trace_id=_trace_id(span), span_id=_span_id(span)): diff --git a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py index c549821..5de81fa 100644 --- a/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py +++ b/packages/dexpace-sdk-core/tests/pipeline/test_tracer_emission.py @@ -12,8 +12,12 @@ from __future__ import annotations +import logging from collections.abc import Mapping, Sequence +import pytest +from _pytest.logging import LogCaptureFixture + from dexpace.sdk.core.client.async_http_client import AsyncHttpClient from dexpace.sdk.core.client.http_client import HttpClient from dexpace.sdk.core.errors import ServiceRequestError @@ -40,6 +44,7 @@ OperationTracingPolicy, RetryPolicy, TracingPolicy, + tracing_policy, ) from dexpace.sdk.core.pipeline.policies.async_redirect import AsyncRedirectPolicy from dexpace.sdk.core.pipeline.policies.redirect import RedirectPolicy @@ -695,6 +700,48 @@ async def test_async_no_operation_events_when_tracing_disabled(self) -> None: assert op_events == [] +class TestTracingPolicyMisconfigurationWarning: + """`TracingPolicy` warns once when a real tracer has no operation bracket. + + The per-operation lifecycle moved to `OperationTracingPolicy`; a real tracer + behind a bare `TracingPolicy` would otherwise silently lose + operation_started / operation_succeeded / operation_failed. + """ + + def test_warns_once_when_real_tracer_has_no_operation_bracket( + self, caplog: LogCaptureFixture, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.setattr(tracing_policy, "_bracket_warning_emitted", False) + instr = _instr(_RecordingHttpTracer()) + caplog.set_level(logging.WARNING) + with Pipeline(_OkClient(), policies=[TracingPolicy()]) as p: + p.run(_request(), DispatchContext(instr)) + p.run(_request(), DispatchContext(instr)) + warns = [r for r in caplog.records if "OperationTracingPolicy" in r.getMessage()] + assert len(warns) == 1 + assert "operation_started" in warns[0].getMessage() + + def test_no_warning_when_operation_bracket_present( + self, caplog: LogCaptureFixture, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.setattr(tracing_policy, "_bracket_warning_emitted", False) + instr = _instr(_RecordingHttpTracer()) + caplog.set_level(logging.WARNING) + with Pipeline(_OkClient(), policies=[OperationTracingPolicy(), TracingPolicy()]) as p: + p.run(_request(), DispatchContext(instr)) + assert not [r for r in caplog.records if "OperationTracingPolicy" in r.getMessage()] + + def test_no_warning_for_noop_tracer( + self, caplog: LogCaptureFixture, monkeypatch: pytest.MonkeyPatch + ) -> None: + # No real tracer is installed, so the missing bracket is irrelevant. + monkeypatch.setattr(tracing_policy, "_bracket_warning_emitted", False) + caplog.set_level(logging.WARNING) + with Pipeline(_OkClient(), policies=[TracingPolicy()]) as p: + p.run(_request(), DispatchContext.noop()) + assert not [r for r in caplog.records if "OperationTracingPolicy" in r.getMessage()] + + # ----- request_sent fires for unknown-length bodies -----------------