64 changes: 57 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -8,13 +8,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

A round of platform improvements to `dexpace-sdk-core`: new optional building
blocks (typed serialization, webhook verification, pagination, two pipeline
policies), tightened retry and tracing behaviour, and a batch of correctness
fixes across bodies, SSE parsing, Digest auth, and error reporting. Most of this
lands in `core`; the transport adapters additionally get consistent connect- vs
read-phase timeout classification and tighter resource release. The only removed
public symbol is the unused `RetryConfig` (see Removed); existing code otherwise
continues to work without modification.
blocks (typed serialization, webhook verification, pagination, three pipeline
policies), tightened retry behaviour, a corrected per-operation tracing
lifecycle, and a batch of correctness fixes across bodies, SSE parsing, Digest
auth, and error reporting. Most of this lands in `core`; the transport adapters
additionally get consistent connect- vs read-phase timeout classification,
tighter resource release, and a set of edge-case corrections (status-code
reporting, chunked-framing detection, and content-length under content-encoding).
The only removed public symbol is the unused `RetryConfig` (see Removed);
existing code otherwise continues to work without modification — with one
behavioural note for hand-assembled pipelines (see *Tracing lifecycle* under
Changed).

### Added

@@ -37,6 +41,16 @@ continues to work without modification.
- **Client-identity policy** (`pipeline.policies.client_identity`, plus its
async twin). Sets a consistent `User-Agent` / client-identity header derived
from the configured application id and SDK version.
- **Per-operation tracing policy** (`OperationTracingPolicy` and its async twin
`AsyncOperationTracingPolicy`, with a new outermost `Stage.OPERATION`). Emits
the per-operation `HttpTracer` lifecycle (`operation_started`, then exactly
one `operation_succeeded` / `operation_failed`) from outside the retry and
redirect wrappers, so the reported outcome reflects the final result of the
whole call rather than a single attempt or hop. Both `default_pipeline` and
`default_async_pipeline` wire it, so the async stack now reports the same
lifecycle alongside the attempt-level events its retry / redirect policies
already emit. Among the tracing policies, only `TracingPolicy` (the per-attempt
OpenTelemetry span policy) remains sync-only.
- **HTTP tracer** (`instrumentation.http_tracer`). An adapter-style tracer base
whose per-event methods default to no-ops, so a subclass overrides only the
events it cares about. Wired through the tracing policy for span emission.
@@ -58,6 +72,17 @@ continues to work without modification.
cancellation cleanly between attempts.
- **Tracing and redirect policies** now emit tracer events and carry correlation
through redirects, with credentials stripped on cross-origin redirects.
- **Tracing lifecycle** (`pipeline.policies.tracing_policy`). The per-operation
`HttpTracer` lifecycle moved out of `TracingPolicy` into the new
`OperationTracingPolicy`; `TracingPolicy` now emits only its per-attempt span
and the per-request events (`request_sent`, `response_headers_received`,
`response_received`). `default_pipeline` wires both, so callers who use it are
unaffected. A pipeline assembled by hand that wants the operation lifecycle
must now add `OperationTracingPolicy` alongside `TracingPolicy` — a bare
`TracingPolicy` no longer emits `operation_started` / `operation_succeeded` /
`operation_failed`. To keep that change from being silent, a `TracingPolicy`
that runs with a real `HttpTracer` but without an enclosing
`OperationTracingPolicy` logs a one-time warning.
- **Default pipelines** (`pipeline.defaults`). The standard sync/async stacks now
assemble the new idempotency and client-identity policies alongside the
existing retry, redirect, logging, and tracing policies.
@@ -96,6 +121,26 @@ continues to work without modification.
`async_response_body`). Cancelling an in-flight read now releases the
underlying resources instead of leaking them, and re-raises `CancelledError`
after cleanup.
- **Per-operation tracing outcome** (`pipeline.policies.tracing_policy`). A call
retried after a failed first attempt no longer reports `operation_failed` for
the discarded attempt (it reports the single `operation_succeeded` it ends on),
and a redirect whose later hop fails no longer reports `operation_succeeded`
for the earlier 3xx hop. The lifecycle now fires exactly once and reflects the
final outcome. See *Tracing lifecycle* under Changed for the API shape.
- **`Content-Length` under `Content-Encoding`** (`http.stdlib.urllib_http_client`).
`UrllibHttpClient` no longer drops a valid `Content-Length` when
`Content-Encoding` is present: `http.client` does not decode content codings,
so the body it serves is the wire payload whose length the header describes,
and the length is now surfaced as-is. (The decompressing requests/httpx/aiohttp
adapters still drop it, since they hand back a decoded stream.)
- **Chunked-framing detection** (`http.stdlib.asyncio_http_client`). The
`Transfer-Encoding` check matches the `chunked` coding by token rather than
substring, so a coding whose name merely contains `chunked` (e.g. `x-chunked`)
is no longer mistaken for chunked framing.
- **Out-of-range status reporting** (`http.stdlib.urllib_http_client`,
`asyncio_http_client`). Both now raise a `ServiceResponseError` worded
`Invalid status code: …` for a status outside 100–599, matching the other
adapters.
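
The token-versus-substring distinction behind the chunked-framing fix can be sketched as below. This is a minimal illustration, not the adapter's actual code: `is_chunked` is a hypothetical helper name, and the sketch only checks whether `chunked` appears as a whole coding token.

```python
from __future__ import annotations


def is_chunked(transfer_encoding: str | None) -> bool:
    """Return True only when `chunked` appears as a whole coding token.

    Hypothetical helper for illustration; not part of the SDK.
    """
    if not transfer_encoding:
        return False
    # Transfer-Encoding is a comma-separated list of codings. Compare each
    # token case-insensitively and ignore any parameters after ';'.
    tokens = [
        part.split(";", 1)[0].strip().lower()
        for part in transfer_encoding.split(",")
    ]
    return "chunked" in tokens
```

A substring test (`"chunked" in header.lower()`) would accept `x-chunked`; the token comparison does not.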

### Verified

@@ -113,6 +158,11 @@ The following were intentionally left out of this round and are **not** included
errors themselves.
- **`sendfile` fast-path** — file bodies are streamed via the existing
`iter_bytes` path; no zero-copy `sendfile` transport optimisation was added.
- **Async OpenTelemetry spans / logging** — the per-attempt span policy
(`TracingPolicy`) and `LoggingPolicy` ship sync-only, so
`default_async_pipeline` emits the per-operation `HttpTracer` lifecycle and
attempt-level events but no OpenTelemetry spans or structured request /
response logs.
- **MCP support** — no Model Context Protocol integration is included.
- **Java SDK items** — the Java counterpart lives in a separate repository and
was out of scope here.
20 changes: 11 additions & 9 deletions CLAUDE.md
@@ -132,7 +132,7 @@ python-sdk/
│ │ │ │
│ │ │ ├── policies/ # redirect, idempotency, retry, set_date,
│ │ │ │ # client_identity, logging, tracing
│ │ │ │ # (async twins only for the first five)
│ │ │ │ # (async twins for all but logging and per-attempt tracing)
│ │ │ └── step/ # PipelineStep, StepMetadata
│ │ ├── client/ # HttpClient + AsyncHttpClient Protocols
│ │ ├── config/ # Configuration
@@ -206,15 +206,17 @@ Layered, bottom-up:
4. **`pipeline`** — `Policy` (and `AsyncPolicy`) wrap the downstream chain;
`Pipeline` / `AsyncPipeline` run an ordered set of policies grouped into
`Stage`s via `StagedPipelineBuilder`. Shipped policies: redirect,
idempotency, retry, set-date, client-identity, logging, tracing. Async
twins under `pipeline/policies/` exist only for redirect, idempotency,
retry, set-date, and client-identity; logging and tracing are sync-only.
idempotency, retry, set-date, client-identity, logging, tracing, and
operation-tracing. Async twins exist for redirect, idempotency, retry,
set-date, client-identity, and operation-tracing; logging and the
per-attempt tracing policy are sync-only.
`default_pipeline()` / `default_async_pipeline()` assemble the standard
stack in the order redirect → idempotency → retry → set-date →
client-identity → [auth] → logging → tracing (the async pipeline omits
logging and tracing). The lower-level `pipeline/step/PipelineStep` Protocol
(`(input, context) -> output`) plus `StepMetadata` remain for custom
composition.
stack in the order operation-tracing → redirect → idempotency → retry →
set-date → client-identity → [auth] → logging → tracing (the async
pipeline keeps operation-tracing but omits logging and the per-attempt
tracing span). The lower-level
`pipeline/step/PipelineStep` Protocol (`(input, context) -> output`) plus
`StepMetadata` remain for custom composition.
5. **`client/HttpClient`** — single-method Protocol
(`execute(request) -> Response`). Transport is **not** provided by `core`;
the `dexpace-sdk-http-*` packages (stdlib, httpx, aiohttp, requests) each
13 changes: 7 additions & 6 deletions README.md
@@ -132,12 +132,12 @@ the way back up. The terminal policy hands the request to an `HttpClient`
transport.

```
caller → Pipeline → REDIRECT → POST_REDIRECT → RETRY → POST_RETRY → [AUTH] → LOGGING → POST_LOGGING → HttpClient → wire
(pillar) idempotency (pillar) set-date (pillar) (pillar) tracing
client-identity
caller → Pipeline → OPERATION → REDIRECT → POST_REDIRECT → RETRY → POST_RETRY → [AUTH] → LOGGING → POST_LOGGING → HttpClient → wire
(pillar) (pillar) idempotency (pillar) set-date (pillar) (pillar) tracing
client-identity
```

Ordering is governed by `Stage`, an `IntEnum` whose values sit 100 apart so
Ordering is governed by `Stage`, an `IntEnum` whose values are spaced out so
new stages can land without renumbering. Pillar stages admit a single
policy; non-pillar stages stack with deque semantics. Callers who prefer
explicit ordering can still use the list form,
@@ -176,7 +176,7 @@ Bottom-up, the layers are:
| `http.webhooks` | `WebhookVerifier`, `InvalidWebhookSignatureError` — HMAC signature verification with timestamp tolerance |
| `pagination` | `Page`, `Paginator` / `AsyncPaginator`, `PaginationStrategy` (`CursorStrategy`, `PageNumberStrategy`, `LinkHeaderStrategy`) |
| `pipeline` | `Pipeline`, `AsyncPipeline`, `Policy` ABC, `Stage` enum, `StagedPipelineBuilder`, `default_pipeline()` |
| `pipeline.policies` | `RedirectPolicy`, `IdempotencyPolicy`, `RetryPolicy`, `SetDatePolicy`, `ClientIdentityPolicy`, `LoggingPolicy`, `TracingPolicy` (async twins for all but logging/tracing) |
| `pipeline.policies` | `RedirectPolicy`, `IdempotencyPolicy`, `RetryPolicy`, `SetDatePolicy`, `ClientIdentityPolicy`, `LoggingPolicy`, `OperationTracingPolicy`, `TracingPolicy` (async twins for all but `LoggingPolicy` and `TracingPolicy`) |
| `client` | `HttpClient` and `AsyncHttpClient` Protocols |
| `serde` | `Serde`, `Serializer`, `Deserializer` Protocols + `JsonSerde` reference impl |
| `instrumentation` | `ClientLogger`, `UrlRedactor`, `Tracer`, `Span`, `InstrumentationContext`, `contextvars` correlation helpers, noop singletons |
@@ -205,7 +205,8 @@ Bottom-up, the layers are:
reissue, userinfo dropped from `Location` URLs, configurable allowed
methods and 303 handling.
- **Observability.** Structured logging via `LoggingPolicy`,
OpenTelemetry-compatible spans via `TracingPolicy`, URL redaction with
per-attempt OpenTelemetry spans via `TracingPolicy` with a once-per-call
tracer lifecycle via `OperationTracingPolicy`, URL redaction with
allowlisted query parameters, and capped body capture for diagnostics.
- **Server-Sent Events.** A WHATWG-compliant `SseParser` with a bounded
line buffer, plus reconnecting `SseConnection` / `AsyncSseConnection`
50 changes: 33 additions & 17 deletions docs/pipelines.md
@@ -37,31 +37,44 @@ two parallel variants:
```python
from dexpace.sdk.http.stdlib import UrllibHttpClient
from dexpace.sdk.core.pipeline import Pipeline
from dexpace.sdk.core.pipeline.policies import LoggingPolicy, RetryPolicy, TracingPolicy
from dexpace.sdk.core.pipeline.policies import (
    LoggingPolicy,
    OperationTracingPolicy,
    RetryPolicy,
    TracingPolicy,
)

with Pipeline(
    UrllibHttpClient(),
    policies=[TracingPolicy(), LoggingPolicy(), RetryPolicy()],
    policies=[OperationTracingPolicy(), TracingPolicy(), LoggingPolicy(), RetryPolicy()],
) as pipeline:
    response = pipeline.run(request, dispatch_ctx)
```

The chain runs in declaration order. For the example above:

1. `TracingPolicy` opens a span.
2. `LoggingPolicy` logs the request.
3. `RetryPolicy` invokes the transport; retries on transient failures.
4. The transport runner calls `UrllibHttpClient.execute(request)`.
5. The unwinding mirror order: logging emits the response, tracing
closes the span.
1. `OperationTracingPolicy` emits `operation_started` for the whole call.
2. `TracingPolicy` opens a span.
3. `LoggingPolicy` logs the request.
4. `RetryPolicy` invokes the transport; retries on transient failures.
5. The transport runner calls `UrllibHttpClient.execute(request)`.
6. The unwinding mirrors that order: logging emits the response, tracing
closes the span, and operation-tracing emits the single
`operation_succeeded` / `operation_failed` once the call settles.

In the canonical `default_pipeline`, `OperationTracingPolicy` sits *outside*
the redirect and retry wrappers (so the per-operation lifecycle fires once and
reflects the final outcome), while `TracingPolicy` sits *inside* them and opens
one span per attempt / hop.
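
The difference between wrapping outside and inside the retry loop can be seen in a small stand-alone simulation. This is only a sketch under stated assumptions: the wrapper functions (`operation_tracing`, `retry`, `attempt_tracing`), the `attempt_*` event names, and the flaky transport are hypothetical stand-ins written as plain closures, not the SDK's `Policy` classes or `HttpTracer` API.

```python
from typing import Callable

events: list[str] = []


def attempt_tracing(send: Callable[[], str]) -> Callable[[], str]:
    """Inner wrapper: records one start/end pair per attempt (stand-in for a span)."""
    def wrapped() -> str:
        events.append("attempt_started")
        try:
            result = send()
        except Exception:
            events.append("attempt_failed")
            raise
        events.append("attempt_succeeded")
        return result
    return wrapped


def retry(send: Callable[[], str], total_retries: int) -> Callable[[], str]:
    """Middle wrapper: re-invokes the downstream chain on ConnectionError."""
    def wrapped() -> str:
        for attempt in range(total_retries + 1):
            try:
                return send()
            except ConnectionError:
                if attempt == total_retries:
                    raise
        raise AssertionError("unreachable")
    return wrapped


def operation_tracing(send: Callable[[], str]) -> Callable[[], str]:
    """Outer wrapper: records the lifecycle once for the whole call."""
    def wrapped() -> str:
        events.append("operation_started")
        try:
            result = send()
        except Exception:
            events.append("operation_failed")
            raise
        events.append("operation_succeeded")
        return result
    return wrapped


def make_flaky_transport() -> Callable[[], str]:
    """Hypothetical transport that fails on the first call, then succeeds."""
    calls = {"n": 0}

    def transport() -> str:
        calls["n"] += 1
        if calls["n"] == 1:
            raise ConnectionError("transient")
        return "200 OK"
    return transport


call = operation_tracing(retry(attempt_tracing(make_flaky_transport()), total_retries=2))
result = call()
```

In this sketch the failed first attempt shows up only in the attempt-level events; the operation-level wrapper records a single `operation_succeeded`, because it sits outside the retry loop and sees only the final outcome.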

## Built-in policies

| Policy | Purpose |
|-------------------------------------|----------------------------------------------------------|
| `RetryPolicy` / `AsyncRetryPolicy` | Retry transient failures with backoff + `Retry-After`. Auto-buffers single-use request bodies when `total_retries > 0`. |
| `LoggingPolicy` | Structured request/response logs with URL redaction. |
| `TracingPolicy` | Open a span per request; OTel semantic-conv attributes. |
| `OperationTracingPolicy` | Emit the per-operation lifecycle (`operation_started`, then one `operation_succeeded`/`operation_failed`) from outside the retry/redirect loop. |
| `TracingPolicy` | Open a span per attempt and emit per-request tracer events; OTel semantic-conv attributes. |
| `BearerTokenPolicy` (auth) | Acquire + cache + apply OAuth bearer tokens. |
| `KeyCredentialPolicy` (auth) | Stamp an API key into a configurable header. |
| `BasicAuthPolicy` (auth) | `Authorization: Basic <base64>`. |
@@ -88,16 +101,19 @@ Per-call opt-outs follow a convention:
second `iter_bytes()` call raises `RuntimeError`. To keep retries safe
without forcing every caller to remember `to_replayable()`, both
`RetryPolicy.send` and `AsyncRetryPolicy.send` inspect the body up front
and, when `total_retries > 0`, swap in a buffered replayable copy before
the first attempt:
and, when the *effective* retry total for the call is positive, swap in a
buffered replayable copy before the first attempt:

```python
if total_retries > 0 and request.body is not None and not request.body.is_replayable():
settings = self._configure_settings(ctx.options)
if settings["total"] > 0 and request.body is not None and not request.body.is_replayable():
    request = request.with_body(request.body.to_replayable())
```

`total_retries == 0` (e.g. `RetryPolicy.no_retries()`) skips the buffering
step so callers who explicitly opt out of retries pay no memory cost for a
copy they will never use. Already-replayable bodies (`from_bytes`,
`from_string`, `from_form`) flow through untouched because
`to_replayable()` returns `self`.
The decision reads the effective total *after* merging any per-call
`retry_total` override (from `ctx.options`) with the constructor default, so a
per-call `retry_total=3` over an instance built with `total_retries=0` still
buffers, and a per-call `retry_total=0` (or `RetryPolicy.no_retries()`) skips
the buffering so callers who opt out of retries pay no memory cost for a copy
they will never use. Already-replayable bodies (`from_bytes`, `from_string`,
`from_form`) flow through untouched because `to_replayable()` returns `self`.
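
The buffering decision described above can be sketched as a pure function. This is an illustrative sketch only: `effective_retry_total` and `should_buffer` are hypothetical helpers, and the assumption that the per-call override arrives as a `retry_total` key in a plain mapping is a simplification of whatever `ctx.options` actually holds.

```python
from __future__ import annotations

from typing import Any, Mapping


def effective_retry_total(default_total: int, options: Mapping[str, Any]) -> int:
    """Merge a per-call `retry_total` override with the constructor default."""
    override = options.get("retry_total")
    return default_total if override is None else int(override)


def should_buffer(
    default_total: int,
    options: Mapping[str, Any],
    has_body: bool,
    body_is_replayable: bool,
) -> bool:
    """Buffer only when retries can happen and the body is single-use."""
    total = effective_retry_total(default_total, options)
    return total > 0 and has_body and not body_is_replayable
```

With these helpers, a per-call `retry_total=3` over a default of `0` buffers a single-use body, while a per-call `retry_total=0` over a default of `3` does not.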
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
from ...pipeline.policy import Policy
from ...pipeline.stage import Stage
from ...util.clock import ASYNC_SYSTEM_CLOCK, SYSTEM_CLOCK, AsyncClock, Clock
from ..common.url import _origin
from .access_token import AccessTokenInfo, TokenRequestOptions
from .challenge import parse_challenges
from .challenge_handler import ChallengeHandler
@@ -439,29 +440,9 @@ async def _authorize(
return request.with_header("Authorization", f"{token.token_type} {token.token}")


_DEFAULT_PORTS: dict[str, int] = {"https": 443, "http": 80}
_AUTH_ORIGIN_KEY: str = "_auth_origin"


def _origin(url: Url) -> tuple[str, str, int | None]:
    """Return the ``(scheme, host, port)`` origin tuple for ``url``.

    The scheme and host are lower-cased and the port is resolved to its
    scheme default (443 for https, 80 for http) when not explicit, so two
    URLs that differ only in an implied/explicit default port compare equal.

    Args:
        url: The URL to derive an origin from.

    Returns:
        A ``(scheme, host, effective_port)`` tuple suitable for equality
        comparison.
    """
    scheme = url.scheme.lower()
    port = url.port if url.port is not None else _DEFAULT_PORTS.get(scheme)
    return scheme, url.host.lower(), port


def _crosses_recorded_origin(request: Request, ctx: PipelineContext) -> bool:
    """Report whether ``request`` left the origin recorded for this operation.

24 changes: 24 additions & 0 deletions packages/dexpace-sdk-core/src/dexpace/sdk/core/http/common/url.py
@@ -320,4 +320,28 @@ def parse(cls, raw: str) -> Self:
)


_DEFAULT_PORTS: dict[str, int] = {"https": 443, "http": 80}


def _origin(url: Url) -> tuple[str, str, int | None]:
    """Return the ``(scheme, host, effective_port)`` origin tuple for ``url``.

    The scheme and host are lower-cased and the port is resolved to its scheme
    default (443 for https, 80 for http) when not explicit, so two URLs that
    differ only in an implied/explicit default port compare equal. Shared by
    the redirect and auth policies for their cross-origin checks so the origin
    definition stays in one place.

    Args:
        url: The URL to derive an origin from.

    Returns:
        A ``(scheme, host, effective_port)`` tuple suitable for equality
        comparison.
    """
    scheme = url.scheme.lower()
    port = url.port if url.port is not None else _DEFAULT_PORTS.get(scheme)
    return scheme, url.host.lower(), port


__all__ = ["QueryParams", "Url"]
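
The origin normalisation above can be exercised in isolation with a stand-in URL type. This is a sketch for illustration: `SimpleUrl`, `DEFAULT_PORTS`, and `origin` are hypothetical names that mirror the shape of `_origin`, not the SDK's `Url` class.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SimpleUrl:
    """Hypothetical stand-in exposing only the fields the origin check reads."""
    scheme: str
    host: str
    port: int | None = None


DEFAULT_PORTS: dict[str, int] = {"https": 443, "http": 80}


def origin(url: SimpleUrl) -> tuple[str, str, int | None]:
    """Lower-case scheme and host; resolve a missing port to the scheme default."""
    scheme = url.scheme.lower()
    port = url.port if url.port is not None else DEFAULT_PORTS.get(scheme)
    return scheme, url.host.lower(), port


# An implied default port and an explicit one compare equal.
same = origin(SimpleUrl("HTTPS", "API.example.com")) == origin(
    SimpleUrl("https", "api.example.com", 443)
)
# A non-default port is a different origin.
different = origin(SimpleUrl("https", "api.example.com")) != origin(
    SimpleUrl("https", "api.example.com", 8443)
)
```

Resolving the default port before comparison is what lets a cross-origin check treat `https://host` and `https://host:443` as the same origin.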