From 081fbfe9674a2deab82e251badeed64c1e078c75 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Tue, 16 Jun 2026 23:32:03 +0000 Subject: [PATCH 1/2] fix(bedrock): preserve stream event type and drop invocation-metrics trailer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Bedrock stream decoder previously wrapped every chunk as ServerSentEvent(event="completion"), discarding the payload's actual event type. Chunks without a "type" field — notably the amazon-bedrock-invocationMetrics trailer that Bedrock appends to every stream — then routed through the completion branch in _streaming.py, which does not backfill data["type"]. construct_type fell back to the first member of the RawMessageStreamEvent union and yielded RawMessageStartEvent(message=None), violating the type contract and crashing any consumer that read event.message. The decoder now reads "type" from the chunk payload and uses it as the SSE event name, drops the invocation-metrics trailer (it is not a model stream event), and only falls back to "completion" for legacy untyped payloads. Fixes #1647 --- src/anthropic/lib/bedrock/_stream_decoder.py | 43 ++++++++++++++++---- tests/lib/test_bedrock.py | 38 +++++++++++++++++ 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/src/anthropic/lib/bedrock/_stream_decoder.py b/src/anthropic/lib/bedrock/_stream_decoder.py index 02e81a3ca..20fd48fc3 100644 --- a/src/anthropic/lib/bedrock/_stream_decoder.py +++ b/src/anthropic/lib/bedrock/_stream_decoder.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Iterator, AsyncIterator +import json +from typing import TYPE_CHECKING, Any, Dict, Iterator, AsyncIterator, cast from ..._utils import lru_cache from ..._streaming import ServerSentEvent @@ -35,9 +36,9 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: for chunk in iterator: event_stream_buffer.add_data(chunk) for event in event_stream_buffer: - message = self._parse_message_from_event(event) - if message: - yield ServerSentEvent(data=message, event="completion") + sse = self._parse_message_from_event(event) + if sse: + yield sse async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]: """Given an async iterator that yields lines, iterate over it & yield every event encountered""" @@ -47,11 +48,11 @@ async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[Ser async for chunk in iterator: event_stream_buffer.add_data(chunk) for event in event_stream_buffer: - message = self._parse_message_from_event(event) - if message: - yield ServerSentEvent(data=message, event="completion") + sse = self._parse_message_from_event(event) + if sse: + yield sse - def _parse_message_from_event(self, event: EventStreamMessage) -> str | None: + def _parse_message_from_event(self, event: EventStreamMessage) -> ServerSentEvent | None: response_dict = event.to_response_dict() parsed_response = self.parser.parse(response_dict, get_response_stream_shape()) if response_dict["status_code"] != 200: @@ -61,4 +62,28 @@ def _parse_message_from_event(self, event: EventStreamMessage) -> str | None: if not chunk: return None - return chunk.get("bytes").decode() # type: ignore[no-any-return] + return _chunk_bytes_to_sse(chunk.get("bytes")) + + +def _chunk_bytes_to_sse(raw: bytes) -> ServerSentEvent | None: + decoded = raw.decode() + data: Any + try: + data = json.loads(decoded) + except Exception: + data = None + + if not isinstance(data, dict): + return ServerSentEvent(data=decoded, event="completion") + + payload = cast("Dict[str, Any]", data) + event_type = payload.get("type") + if not isinstance(event_type, str): + # Bedrock appends a trailing chunk that only carries invocation + # metrics; it is not a model stream event, so drop it rather than + # let it be mis-constructed against the stream-event union. + if "amazon-bedrock-invocationMetrics" in payload and "completion" not in payload: + return None + event_type = "completion" + + return ServerSentEvent(data=decoded, event=event_type) diff --git a/tests/lib/test_bedrock.py b/tests/lib/test_bedrock.py index 6e45c27f7..04ea1db47 100644 --- a/tests/lib/test_bedrock.py +++ b/tests/lib/test_bedrock.py @@ -9,6 +9,7 @@ from respx import MockRouter from anthropic import AnthropicBedrock, AsyncAnthropicBedrock +from anthropic.lib.bedrock._stream_decoder import _chunk_bytes_to_sse sync_client = AnthropicBedrock( aws_region="us-east-1", @@ -275,3 +276,40 @@ def test_region_infer_from_specified_profile( client = AnthropicBedrock() assert client.aws_region == next(profile for profile in profiles if profile["name"] == aws_profile)["region"] + + +def test_chunk_bytes_to_sse_typed_event() -> None: + raw = ( + b'{"type":"message_start","message":{"id":"msg_123","type":"message","role":"assistant",' + b'"content":[],"model":"claude-x","stop_reason":null,"stop_sequence":null,' + b'"usage":{"input_tokens":1,"output_tokens":1}}}' + ) + sse = _chunk_bytes_to_sse(raw) + assert sse is not None + assert sse.event == "message_start" + assert sse.data == raw.decode() + + +def test_chunk_bytes_to_sse_drops_invocation_metrics() -> None: + raw = ( + b'{"amazon-bedrock-invocationMetrics":{"inputTokenCount":1,"outputTokenCount":1,' + b'"invocationLatency":100,"firstByteLatency":50}}' + ) + assert _chunk_bytes_to_sse(raw) is None + + +def test_chunk_bytes_to_sse_legacy_completion() -> None: + raw = b'{"completion":" Hello","stop_reason":null,"model":"claude-2"}' + sse = _chunk_bytes_to_sse(raw) + assert sse is not None + assert sse.event == "completion" + + +def test_chunk_bytes_to_sse_legacy_completion_with_metrics() -> None: + raw = ( + b'{"completion":" Hello","stop_reason":"stop_sequence","model":"claude-2",' + b'"amazon-bedrock-invocationMetrics":{"inputTokenCount":1,"outputTokenCount":1}}' + ) + sse = _chunk_bytes_to_sse(raw) + assert sse is not None + assert sse.event == "completion" From 2f4c51862786b4feca2426bc3118c4df9d52596c Mon Sep 17 00:00:00 2001 From: Robert Craigie Date: Tue, 16 Jun 2026 17:13:39 -0700 Subject: [PATCH 2/2] remove amazon-bedrock-invocationMetrics stripping --- src/anthropic/lib/bedrock/_stream_decoder.py | 5 ----- tests/lib/test_bedrock.py | 8 -------- 2 files changed, 13 deletions(-) diff --git a/src/anthropic/lib/bedrock/_stream_decoder.py b/src/anthropic/lib/bedrock/_stream_decoder.py index 20fd48fc3..66dd658cc 100644 --- a/src/anthropic/lib/bedrock/_stream_decoder.py +++ b/src/anthropic/lib/bedrock/_stream_decoder.py @@ -79,11 +79,6 @@ def _chunk_bytes_to_sse(raw: bytes) -> ServerSentEvent | None: payload = cast("Dict[str, Any]", data) event_type = payload.get("type") if not isinstance(event_type, str): - # Bedrock appends a trailing chunk that only carries invocation - # metrics; it is not a model stream event, so drop it rather than - # let it be mis-constructed against the stream-event union. - if "amazon-bedrock-invocationMetrics" in payload and "completion" not in payload: - return None event_type = "completion" return ServerSentEvent(data=decoded, event=event_type) diff --git a/tests/lib/test_bedrock.py b/tests/lib/test_bedrock.py index 04ea1db47..f8aefef3a 100644 --- a/tests/lib/test_bedrock.py +++ b/tests/lib/test_bedrock.py @@ -290,14 +290,6 @@ def test_chunk_bytes_to_sse_typed_event() -> None: assert sse.data == raw.decode() -def test_chunk_bytes_to_sse_drops_invocation_metrics() -> None: - raw = ( - b'{"amazon-bedrock-invocationMetrics":{"inputTokenCount":1,"outputTokenCount":1,' - b'"invocationLatency":100,"firstByteLatency":50}}' - ) - assert _chunk_bytes_to_sse(raw) is None - - def test_chunk_bytes_to_sse_legacy_completion() -> None: raw = b'{"completion":" Hello","stop_reason":null,"model":"claude-2"}' sse = _chunk_bytes_to_sse(raw)