Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions python/packages/a2a/agent_framework_a2a/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ async def _map_a2a_stream(
contents=contents,
role="assistant" if item.role == A2ARole.agent else "user",
response_id=str(getattr(item, "message_id", uuid.uuid4())),
additional_properties={"a2a_metadata": item.metadata} if item.metadata else None,
raw_representation=item,
)
all_updates.append(update)
Expand Down Expand Up @@ -452,13 +453,24 @@ def _updates_from_task(
role=message.role,
response_id=task.id,
message_id=getattr(message.raw_representation, "artifact_id", None),
additional_properties={"a2a_metadata": merged}
if (merged := {**message.additional_properties, **(task.metadata or {})})
else None,
raw_representation=task,
)
for message in task_messages
]
if task.artifacts is not None:
return []
return [AgentResponseUpdate(contents=[], role="assistant", response_id=task.id, raw_representation=task)]
return [
AgentResponseUpdate(
contents=[],
role="assistant",
response_id=task.id,
additional_properties={"a2a_metadata": task.metadata} if task.metadata else None,
raw_representation=task,
)
]

if background and status.state in IN_PROGRESS_TASK_STATES:
token = self._build_continuation_token(task)
Expand All @@ -468,6 +480,7 @@ def _updates_from_task(
role="assistant",
response_id=task.id,
continuation_token=token,
additional_properties={"a2a_metadata": task.metadata} if task.metadata else None,
raw_representation=task,
)
]
Expand All @@ -488,6 +501,7 @@ def _updates_from_task(
contents=contents,
role="assistant" if status.message.role == A2ARole.agent else "user",
response_id=task.id,
additional_properties={"a2a_metadata": task.metadata} if task.metadata else None,
raw_representation=task,
)
]
Expand All @@ -502,12 +516,17 @@ def _updates_from_task_update_event(
contents = self._parse_contents_from_a2a(update_event.artifact.parts)
if not contents:
return []
merged_metadata = {
**(update_event.artifact.metadata or {}),
**(update_event.metadata or {}),
} or None
return [
AgentResponseUpdate(
contents=contents,
role="assistant",
response_id=update_event.task_id,
message_id=update_event.artifact.artifact_id,
additional_properties={"a2a_metadata": merged_metadata} if merged_metadata else None,
raw_representation=update_event,
)
]
Expand All @@ -523,11 +542,16 @@ def _updates_from_task_update_event(
if not contents:
return []

merged_metadata = {
**(message.metadata or {}),
**(update_event.metadata or {}),
} or None
return [
AgentResponseUpdate(
contents=contents,
role="assistant" if message.role == A2ARole.agent else "user",
response_id=update_event.task_id,
additional_properties={"a2a_metadata": merged_metadata} if merged_metadata else None,
raw_representation=update_event,
)
]
Expand Down Expand Up @@ -642,9 +666,7 @@ def _prepare_message_for_a2a(self, message: Message) -> A2AMessage:
case _:
raise ValueError(f"Unknown content type: {content.type}")

# Exclude framework-internal keys (e.g. attribution) from wire metadata
internal_keys = {"_attribution", "context_id"}
metadata = {k: v for k, v in message.additional_properties.items() if k not in internal_keys} or None
metadata = message.additional_properties.get("a2a_metadata")

return A2AMessage(
role=A2ARole("user"),
Expand Down Expand Up @@ -718,6 +740,7 @@ def _parse_messages_from_task(self, task: Task) -> list[Message]:
Message(
role="assistant" if history_item.role == A2ARole.agent else "user",
contents=contents,
additional_properties=history_item.metadata,
raw_representation=history_item,
)
)
Expand All @@ -730,5 +753,6 @@ def _parse_message_from_artifact(self, artifact: Artifact) -> Message:
return Message(
role="assistant",
contents=contents,
additional_properties=artifact.metadata,
raw_representation=artifact,
)
209 changes: 208 additions & 1 deletion python/packages/a2a/tests/test_a2a_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ def test_prepare_message_for_a2a_forwards_context_id() -> None:
message = Message(
role="user",
contents=[Content.from_text(text="Continue the task")],
additional_properties={"context_id": "ctx-123", "trace_id": "trace-456"},
additional_properties={"context_id": "ctx-123", "a2a_metadata": {"trace_id": "trace-456"}},
)

result = agent._prepare_message_for_a2a(message)
Expand Down Expand Up @@ -1385,3 +1385,210 @@ async def test_streaming_terminal_task_only_emits_unstreamed_artifacts(


# endregion

# region Metadata propagation tests


async def test_message_metadata_propagated(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""A2AMessage.metadata should appear on response.additional_properties."""
msg = A2AMessage(
message_id="msg-meta",
role=A2ARole.agent,
parts=[Part(root=TextPart(text="hi"))],
metadata={"source": "server", "trace_id": "abc"},
)
mock_a2a_client.responses.append(msg)

response = await a2a_agent.run("hello")
assert response.additional_properties["a2a_metadata"]["source"] == "server"
assert response.additional_properties["a2a_metadata"]["trace_id"] == "abc"


async def test_artifact_metadata_propagated(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""Artifact.metadata should appear on response.additional_properties."""
task = Task(
id="task-art-meta",
context_id="ctx",
status=TaskStatus(state=TaskState.completed),
artifacts=[
Artifact(
artifact_id="a1",
parts=[Part(root=TextPart(text="result"))],
metadata={"artifact_key": "artifact_value"},
),
],
)
mock_a2a_client.responses.append((task, None))

response = await a2a_agent.run("go")
assert response.additional_properties["a2a_metadata"]["artifact_key"] == "artifact_value"


async def test_task_metadata_propagated_to_response(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""Task.metadata should appear on response.additional_properties for terminal tasks."""
task = Task(
id="task-meta",
context_id="ctx",
status=TaskStatus(state=TaskState.completed),
artifacts=[
Artifact(artifact_id="a1", parts=[Part(root=TextPart(text="done"))]),
],
metadata={"task_key": "task_value"},
)
mock_a2a_client.responses.append((task, None))

response = await a2a_agent.run("go")
assert response.additional_properties["a2a_metadata"]["task_key"] == "task_value"


async def test_task_artifact_update_event_metadata_merged(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""TaskArtifactUpdateEvent and Artifact metadata should both appear on the streaming update."""
artifact_event = TaskArtifactUpdateEvent(
task_id="task-ae",
context_id="ctx",
artifact=Artifact(
artifact_id="a1",
parts=[Part(root=TextPart(text="chunk"))],
metadata={"from_artifact": True},
),
metadata={"from_event": True},
)
working_task = Task(
id="task-ae",
context_id="ctx",
status=TaskStatus(state=TaskState.working),
)
terminal_task = Task(
id="task-ae",
context_id="ctx",
status=TaskStatus(state=TaskState.completed),
artifacts=[
Artifact(artifact_id="a1", parts=[Part(root=TextPart(text="chunk"))]),
],
)
terminal_event = TaskStatusUpdateEvent(
task_id="task-ae",
context_id="ctx",
status=TaskStatus(state=TaskState.completed),
final=True,
)
mock_a2a_client.responses.extend([
(working_task, artifact_event),
(terminal_task, terminal_event),
])

stream = a2a_agent.run("hello", stream=True)
updates: list[AgentResponseUpdate] = []
async for update in stream:
updates.append(update)

artifact_update = updates[0]
assert artifact_update.additional_properties["a2a_metadata"]["from_artifact"] is True
assert artifact_update.additional_properties["a2a_metadata"]["from_event"] is True


async def test_task_status_update_event_metadata_merged(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""TaskStatusUpdateEvent and its message metadata should both appear on the streaming update."""
status_event = TaskStatusUpdateEvent(
task_id="task-se",
context_id="ctx",
status=TaskStatus(
state=TaskState.working,
message=A2AMessage(
message_id="m1",
role=A2ARole.agent,
parts=[Part(root=TextPart(text="working..."))],
metadata={"msg_key": "msg_val"},
),
),
final=False,
metadata={"event_key": "event_val"},
)
working_task = Task(
id="task-se",
context_id="ctx",
status=TaskStatus(state=TaskState.working),
)
terminal_task = Task(
id="task-se",
context_id="ctx",
status=TaskStatus(state=TaskState.completed),
artifacts=[
Artifact(artifact_id="a1", parts=[Part(root=TextPart(text="done"))]),
],
)
terminal_event = TaskStatusUpdateEvent(
task_id="task-se",
context_id="ctx",
status=TaskStatus(state=TaskState.completed),
final=True,
)
mock_a2a_client.responses.extend([
(working_task, status_event),
(terminal_task, terminal_event),
])

stream = a2a_agent.run("hello", stream=True)
updates: list[AgentResponseUpdate] = []
async for update in stream:
updates.append(update)

status_update = updates[0]
assert status_update.additional_properties["a2a_metadata"]["msg_key"] == "msg_val"
assert status_update.additional_properties["a2a_metadata"]["event_key"] == "event_val"


async def test_history_message_metadata_propagated(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""Metadata on a history Message should appear on response.additional_properties."""
task = Task(
id="task-hist",
context_id="ctx",
status=TaskStatus(state=TaskState.completed),
history=[
A2AMessage(
message_id="h1",
role=A2ARole.agent,
parts=[Part(root=TextPart(text="reply"))],
metadata={"history_key": "history_value"},
),
],
)
mock_a2a_client.responses.append((task, None))

response = await a2a_agent.run("go")
assert response.additional_properties["a2a_metadata"]["history_key"] == "history_value"


async def test_continuation_token_update_carries_task_metadata(
a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient
) -> None:
"""In-progress tasks with background=True should propagate task metadata."""
task = Task(
id="task-cont",
context_id="ctx",
status=TaskStatus(state=TaskState.working),
metadata={"bg_key": "bg_value"},
)
mock_a2a_client.responses.append((task, None))

response = await a2a_agent.run("go", background=True)
assert response.continuation_token is not None
assert response.additional_properties["a2a_metadata"]["bg_key"] == "bg_value"


async def test_none_metadata_leaves_additional_properties_empty(
a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient
) -> None:
"""When A2A types have no metadata, additional_properties should remain empty/default."""
msg = A2AMessage(
message_id="msg-none",
role=A2ARole.agent,
parts=[Part(root=TextPart(text="no meta"))],
)
mock_a2a_client.responses.append(msg)

response = await a2a_agent.run("hello")
assert not response.additional_properties


# endregion