diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index 43836fe34..8cdcefc2d 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -248,6 +248,13 @@ def handle_content_block_delta( state["redactedContent"] = state.get("redactedContent", b"") + redacted_content typed_event = ReasoningRedactedContentStreamEvent(redacted_content=redacted_content, delta=delta_content) + elif "image" in delta_content: + if "image" not in state: + state["image"] = [] + + state["image"].append(delta_content["image"]) + typed_event = ModelStreamEvent(delta_content) + return state, typed_event @@ -313,6 +320,11 @@ def handle_content_block_stop(state: dict[str, Any]) -> dict[str, Any]: elif redacted_content: content.append({"reasoningContent": {"redactedContent": redacted_content}}) state["redactedContent"] = b"" + elif state.get("image"): + # Add all accumulated image blocks to content + for image_data in state["image"]: + content.append({"image": image_data}) + state["image"] = [] return state @@ -382,6 +394,7 @@ async def process_stream( "current_tool_use": {}, "reasoningText": "", "citationsContent": [], + "image": [], } state["content"] = state["message"]["content"] diff --git a/src/strands/models/bedrock.py b/src/strands/models/bedrock.py index 4a7c81672..7cbd32949 100644 --- a/src/strands/models/bedrock.py +++ b/src/strands/models/bedrock.py @@ -845,6 +845,13 @@ def _convert_non_streaming_to_streaming(self, response: dict[str, Any]) -> Itera "sourceContent": citation["sourceContent"], } yield {"contentBlockDelta": {"delta": {"citation": citation_metadata}}} + elif "image" in content: + # Yield image content as a delta + yield { + "contentBlockDelta": { + "delta": {"image": content["image"]}, + } + } # Yield contentBlockStop event yield {"contentBlockStop": {}} diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 02be400b1..03dde6a11 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -1070,3 +1070,42 @@ async def test_stream_messages_normalizes_messages(agenerator, alist): {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"}, {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"}, ] + + +@pytest.mark.asyncio +async def test_process_stream_with_image_content(): + """Test that image content blocks are properly processed in streaming responses.""" + test_image_data = { + "format": "png", + "source": { + "bytes": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==" + }, + } + + chunks = [ + cast(MessageStartEvent, {"messageStart": {"role": "assistant"}}), + cast(ContentBlockStartEvent, {"contentBlockStart": {"start": {}}}), + cast(ContentBlockDeltaEvent, {"contentBlockDelta": {"delta": {"image": test_image_data}}}), + {"contentBlockStop": {}}, + cast(MessageStopEvent, {"messageStop": {"stopReason": "end_turn"}}), + {"metadata": {"usage": {"inputTokens": 10, "outputTokens": 5, "totalTokens": 15}}}, + ] + + async def async_chunks(): + for chunk in chunks: + yield chunk + + events = [] + async for event in strands.event_loop.streaming.process_stream(async_chunks()): + events.append(event) + + # Verify we got a stop event with the image in the message + stop_event = cast(ModelStopReason, events[-1]) + assert "stop" in stop_event + + stop_reason, message, usage, metrics = stop_event["stop"] + assert stop_reason == "end_turn" + assert message["role"] == "assistant" + assert len(message["content"]) == 1 + assert "image" in message["content"][0] + assert message["content"][0]["image"] == test_image_data