Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 95 additions & 5 deletions src/google/adk/a2a/converters/to_adk_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,17 @@ def _create_event(
actions: Optional[EventActions] = None,
long_running_function_ids: Optional[set[str]] = None,
partial: bool = False,
event_metadata: Optional[dict[str, Any]] = None,
) -> Optional[Event]:
"""Creates an ADK event from parts and metadata."""
event_actions = actions or EventActions()
if not output_parts and not event_actions.model_dump(
exclude_none=True, exclude_defaults=True
):
has_actions = bool(
event_actions.model_dump(exclude_none=True, exclude_defaults=True)
)
if not output_parts and not has_actions and not event_metadata:
return None

event = Event(
event_kwargs: dict[str, Any] = dict(
invocation_id=(
invocation_context.invocation_id
if invocation_context
Expand All @@ -208,7 +210,10 @@ def _create_event(
partial=partial,
)

return event
if event_metadata:
event_kwargs.update(event_metadata)

return Event(**event_kwargs)


def _parse_adk_metadata_value(value: Any) -> Any:
Expand Down Expand Up @@ -248,6 +253,74 @@ def _extract_event_actions(
return EventActions()


def _extract_event_metadata(
metadata: Optional[dict[str, Any]],
) -> dict[str, Any]:
"""Extracts ADK event metadata fields from A2A metadata.

Restores fields like custom_metadata, usage_metadata, error_code, and
grounding_metadata that were serialized into A2A metadata by the outbound
converter.

Args:
metadata: The A2A metadata dictionary.

Returns:
A dictionary of event keyword arguments to apply to the Event.
"""
if not metadata:
return {}

event_kwargs: dict[str, Any] = {}

# Simple fields that can be restored directly
simple_fields = ["error_code"]
for field_name in simple_fields:
raw_value = metadata.get(_get_adk_metadata_key(field_name))
if raw_value is not None:
event_kwargs[field_name] = _parse_adk_metadata_value(raw_value)

# Dict fields that need parsing
dict_fields = ["custom_metadata"]
for field_name in dict_fields:
raw_value = metadata.get(_get_adk_metadata_key(field_name))
if raw_value is not None:
parsed = _parse_adk_metadata_value(raw_value)
if isinstance(parsed, dict):
event_kwargs[field_name] = parsed
else:
logger.warning(
"Ignoring invalid ADK %s metadata of type %s",
field_name,
type(parsed).__name__,
)

# Pydantic model fields that need validation
model_fields = {
"usage_metadata": genai_types.GenerateContentResponseUsageMetadata,
"grounding_metadata": genai_types.GroundingMetadata,
}
for field_name, model_class in model_fields.items():
raw_value = metadata.get(_get_adk_metadata_key(field_name))
if raw_value is not None:
parsed = _parse_adk_metadata_value(raw_value)
if isinstance(parsed, dict):
try:
event_kwargs[field_name] = model_class(**parsed)
except (ValidationError, TypeError) as error:
logger.warning(
"Ignoring invalid ADK %s metadata: %s", field_name, error
)
else:
logger.warning(
"Ignoring invalid ADK %s metadata of type %s",
field_name,
type(parsed).__name__,
)

return event_kwargs


def _merge_top_level_dicts(
base: dict[str, Any], new_values: dict[str, Any]
) -> dict[str, Any]:
Expand Down Expand Up @@ -304,6 +377,7 @@ def convert_a2a_task_to_event(

try:
event_actions = EventActions()
event_metadata: dict[str, Any] = {}
output_parts = []
long_running_function_ids = set()
if a2a_task.artifacts:
Expand All @@ -314,6 +388,7 @@ def convert_a2a_task_to_event(
event_actions = _merge_event_actions(
event_actions, _extract_event_actions(artifact.metadata)
)
event_metadata.update(_extract_event_metadata(artifact.metadata))
output_parts, _ = _convert_a2a_parts_to_adk_parts(
artifact_parts, part_converter
)
Expand All @@ -325,6 +400,9 @@ def convert_a2a_task_to_event(
event_actions,
_extract_event_actions(a2a_task.status.message.metadata),
)
event_metadata.update(
_extract_event_metadata(a2a_task.status.message.metadata)
)
parts, ids = _convert_a2a_parts_to_adk_parts(
a2a_task.status.message.parts, part_converter
)
Expand All @@ -337,6 +415,7 @@ def convert_a2a_task_to_event(
author,
event_actions,
long_running_function_ids,
event_metadata=event_metadata or None,
)

except Exception as e:
Expand Down Expand Up @@ -380,6 +459,7 @@ def convert_a2a_message_to_event(
invocation_context,
author,
_extract_event_actions(a2a_message.metadata),
event_metadata=_extract_event_metadata(a2a_message.metadata),
)

except Exception as e:
Expand Down Expand Up @@ -412,10 +492,15 @@ def convert_a2a_status_update_to_event(
output_parts = []
long_running_function_ids = set()
event_actions = EventActions()
event_metadata: Optional[dict[str, Any]] = None
if a2a_status_update.status.message:
event_actions = _extract_event_actions(
a2a_status_update.status.message.metadata
)
event_metadata = (
_extract_event_metadata(a2a_status_update.status.message.metadata)
or None
)
parts, ids = _convert_a2a_parts_to_adk_parts(
a2a_status_update.status.message.parts, part_converter
)
Expand All @@ -428,6 +513,7 @@ def convert_a2a_status_update_to_event(
author,
event_actions,
long_running_function_ids,
event_metadata=event_metadata,
)
except Exception as e:
logger.error("Failed to convert A2A status update to event: %s", e)
Expand Down Expand Up @@ -466,6 +552,10 @@ def convert_a2a_artifact_update_to_event(
author,
_extract_event_actions(a2a_artifact_update.artifact.metadata),
partial=not a2a_artifact_update.last_chunk,
event_metadata=_extract_event_metadata(
a2a_artifact_update.artifact.metadata
)
or None,
)
except Exception as e:
logger.error("Failed to convert A2A artifact update to event: %s", e)
Expand Down
154 changes: 154 additions & 0 deletions tests/unittests/a2a/converters/test_to_adk.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,157 @@ def test_convert_a2a_artifact_update_to_event_none(self):
"""Test convert_a2a_artifact_update_to_event with None."""
with pytest.raises(ValueError, match="A2A artifact update cannot be None"):
convert_a2a_artifact_update_to_event(None)

def test_convert_a2a_message_to_event_restores_custom_metadata(self):
"""Test A2A message conversion restores custom_metadata."""
a2a_part = Mock(spec=A2APart)
a2a_part.root = Mock(spec=TextPart)
a2a_part.root.metadata = {}
message = Message(
message_id="msg-1",
role="user",
parts=[a2a_part],
metadata={
_get_adk_metadata_key("custom_metadata"): {
"my_key": "my_value",
}
},
)

mock_genai_part = genai_types.Part.from_text(text="hello")
mock_part_converter = Mock(return_value=[mock_genai_part])

event = convert_a2a_message_to_event(
message,
author="test-author",
invocation_context=self.mock_context,
part_converter=mock_part_converter,
)

assert event is not None
assert event.custom_metadata == {"my_key": "my_value"}

def test_convert_a2a_message_to_event_restores_error_code(self):
"""Test A2A message conversion restores error_code."""
a2a_part = Mock(spec=A2APart)
a2a_part.root = Mock(spec=TextPart)
a2a_part.root.metadata = {}
message = Message(
message_id="msg-1",
role="user",
parts=[a2a_part],
metadata={
_get_adk_metadata_key("error_code"): "RESOURCE_EXHAUSTED",
},
)

mock_genai_part = genai_types.Part.from_text(text="error")
mock_part_converter = Mock(return_value=[mock_genai_part])

event = convert_a2a_message_to_event(
message,
author="test-author",
invocation_context=self.mock_context,
part_converter=mock_part_converter,
)

assert event is not None
assert event.error_code == "RESOURCE_EXHAUSTED"

def test_convert_a2a_message_to_event_metadata_only_returns_event(self):
"""Test A2A message with only metadata (no parts/actions) returns event."""
message = Message(
message_id="msg-1",
role="user",
parts=[],
metadata={
_get_adk_metadata_key("custom_metadata"): {
"reason": "metadata-only",
}
},
)

event = convert_a2a_message_to_event(
message,
author="test-author",
invocation_context=self.mock_context,
part_converter=Mock(),
)

assert event is not None
assert event.custom_metadata == {"reason": "metadata-only"}
assert event.content is None

def test_convert_a2a_status_update_restores_custom_metadata(self):
"""Test A2A status update conversion restores custom_metadata."""
a2a_part = Mock(spec=A2APart)
a2a_part.root = Mock(spec=TextPart)
a2a_part.root.metadata = {}
update = TaskStatusUpdateEvent(
task_id="task-1",
status=TaskStatus(
state=TaskState.input_required,
timestamp="now",
message=Message(
message_id="m1",
role="agent",
parts=[a2a_part],
metadata={
_get_adk_metadata_key("custom_metadata"): {
"trace_id": "abc-123",
}
},
),
),
context_id="context-1",
final=False,
)

mock_genai_part = genai_types.Part.from_text(text="status text")
mock_part_converter = Mock(return_value=[mock_genai_part])

event = convert_a2a_status_update_to_event(
update,
author="test-author",
invocation_context=self.mock_context,
part_converter=mock_part_converter,
)

assert event is not None
assert event.custom_metadata == {"trace_id": "abc-123"}

def test_convert_a2a_task_restores_custom_metadata_from_artifact(self):
"""Test A2A task conversion restores custom_metadata from artifact."""
task = Task(
id="task-1",
status=TaskStatus(
state=TaskState.submitted, timestamp="2024-01-01T00:00:00Z"
),
context_id="context-1",
artifacts=[
Artifact(
artifact_id="art-1",
artifact_type="message",
parts=[],
metadata={
_get_adk_metadata_key("custom_metadata"): {
"task_key": "task_value",
},
_get_adk_metadata_key("actions"): {
"stateDelta": {"saved_key": "saved-value"}
},
},
)
],
)

event = convert_a2a_task_to_event(
task,
author="test-author",
invocation_context=self.mock_context,
part_converter=Mock(),
)

assert event is not None
assert event.custom_metadata == {"task_key": "task_value"}
assert event.actions.state_delta == {"saved_key": "saved-value"}
Loading