Skip to content

Commit f69b572

Browse files
committed
fix: legacy A2A executor finalizes metadata-only responses as completed (#5188)
Three linked bugs in the legacy A2A executor finalization path: 1. Metadata-only/action-only final responses (working state, message with no parts) emitted final=True with status=working instead of completed. The else-branch now resolves working → completed. 2. Delta-style streamed text replaced the accumulated message on each event instead of concatenating TextPart content. TaskResultAggregator._accumulate_message now appends text chunks and merges metadata across successive working events. 3. The synthesized final artifact dropped response metadata from the accumulated message. Artifact now carries message.metadata.
1 parent b0715d7 commit f69b572

File tree

4 files changed

+355
-16
lines changed

4 files changed

+355
-16
lines changed

src/google/adk/a2a/executor/a2a_agent_executor.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,11 @@ async def _handle_request(
273273
await event_queue.enqueue_event(a2a_event)
274274

275275
# publish the task result event - this is final
276+
agg_message = task_result_aggregator.task_status_message
276277
if (
277278
task_result_aggregator.task_state == TaskState.working
278-
and task_result_aggregator.task_status_message is not None
279-
and task_result_aggregator.task_status_message.parts
279+
and agg_message is not None
280+
and agg_message.parts
280281
):
281282
# if task is still working properly, publish the artifact update event as
282283
# the final result according to a2a protocol.
@@ -287,7 +288,8 @@ async def _handle_request(
287288
context_id=context.context_id,
288289
artifact=Artifact(
289290
artifact_id=platform_uuid.new_uuid(),
290-
parts=task_result_aggregator.task_status_message.parts,
291+
parts=agg_message.parts,
292+
metadata=agg_message.metadata or None,
291293
),
292294
)
293295
)
@@ -304,14 +306,22 @@ async def _handle_request(
304306
final=True,
305307
)
306308
else:
309+
# Resolve terminal state: working → completed (agent finished
310+
# without error); other states (failed, auth_required, etc.)
311+
# are preserved as-is.
312+
final_state = (
313+
TaskState.completed
314+
if task_result_aggregator.task_state == TaskState.working
315+
else task_result_aggregator.task_state
316+
)
307317
final_event = TaskStatusUpdateEvent(
308318
task_id=context.task_id,
309319
status=TaskStatus(
310-
state=task_result_aggregator.task_state,
320+
state=final_state,
311321
timestamp=datetime.fromtimestamp(
312322
platform_time.get_time(), tz=timezone.utc
313323
).isoformat(),
314-
message=task_result_aggregator.task_status_message,
324+
message=agg_message,
315325
),
316326
context_id=context.context_id,
317327
final=True,

src/google/adk/a2a/executor/task_result_aggregator.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from a2a.types import Message
1919
from a2a.types import TaskState
2020
from a2a.types import TaskStatusUpdateEvent
21+
from a2a.types import TextPart
2122

2223
from ..experimental import a2a_experimental
2324

@@ -59,9 +60,50 @@ def process_event(self, event: Event):
5960
# always working because other state may terminate the event aggregation
6061
# in a2a request handler
6162
elif self._task_state == TaskState.working:
62-
self._task_status_message = event.status.message
63+
self._accumulate_message(event.status.message)
6364
event.status.state = TaskState.working
6465

66+
def _accumulate_message(self, new_message: Message | None):
67+
"""Accumulate content from a new message into the running result.
68+
69+
For delta-style streaming, successive TextPart texts are concatenated
70+
rather than replaced. Metadata dicts are merged (later values win).
71+
"""
72+
if new_message is None:
73+
return
74+
75+
if self._task_status_message is None:
76+
self._task_status_message = new_message
77+
return
78+
79+
# Accumulate parts
80+
if new_message.parts:
81+
if not self._task_status_message.parts:
82+
self._task_status_message.parts = list(new_message.parts)
83+
else:
84+
for new_part in new_message.parts:
85+
new_root = getattr(new_part, 'root', new_part)
86+
if isinstance(new_root, TextPart):
87+
# Concatenate into the last existing TextPart if one exists
88+
appended = False
89+
for existing_part in reversed(self._task_status_message.parts):
90+
existing_root = getattr(existing_part, 'root', existing_part)
91+
if isinstance(existing_root, TextPart):
92+
existing_root.text += new_root.text
93+
appended = True
94+
break
95+
if not appended:
96+
self._task_status_message.parts.append(new_part)
97+
else:
98+
self._task_status_message.parts.append(new_part)
99+
100+
# Merge metadata
101+
if new_message.metadata:
102+
if self._task_status_message.metadata is None:
103+
self._task_status_message.metadata = dict(new_message.metadata)
104+
else:
105+
self._task_status_message.metadata.update(new_message.metadata)
106+
65107
@property
66108
def task_state(self) -> TaskState:
67109
return self._task_state

tests/unittests/a2a/executor/test_a2a_agent_executor.py

Lines changed: 210 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from a2a.types import Part
2424
from a2a.types import Role
2525
from a2a.types import TaskState
26+
from a2a.types import TaskStatus
27+
from a2a.types import TaskStatusUpdateEvent
2628
from a2a.types import TextPart
2729
from google.adk.a2a.converters.request_converter import AgentRunRequest
2830
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor
@@ -143,9 +145,9 @@ async def mock_run_async(**kwargs):
143145
final_event = self.mock_event_queue.enqueue_event.call_args_list[-1][0][0]
144146
assert final_event.final == True
145147
# The TaskResultAggregator is created with default state (working), and since no messages
146-
# are processed, it will publish a status event with the current state
148+
# are processed, the agent completed normally so the terminal state is completed
147149
assert hasattr(final_event.status, "message")
148-
assert final_event.status.state == TaskState.working
150+
assert final_event.status.state == TaskState.completed
149151

150152
@pytest.mark.asyncio
151153
async def test_execute_no_message_error(self):
@@ -218,9 +220,9 @@ async def mock_run_async(**kwargs):
218220
final_event = self.mock_event_queue.enqueue_event.call_args_list[-1][0][0]
219221
assert final_event.final == True
220222
# The TaskResultAggregator is created with default state (working), and since no messages
221-
# are processed, it will publish a status event with the current state
223+
# are processed, the agent completed normally so the terminal state is completed
222224
assert hasattr(final_event.status, "message")
223-
assert final_event.status.state == TaskState.working
225+
assert final_event.status.state == TaskState.completed
224226

225227
@pytest.mark.asyncio
226228
async def test_prepare_session_new_session(self):
@@ -443,9 +445,9 @@ async def mock_run_async(**kwargs):
443445
final_event = self.mock_event_queue.enqueue_event.call_args_list[-1][0][0]
444446
assert final_event.final == True
445447
# The TaskResultAggregator is created with default state (working), and since no messages
446-
# are processed, it will publish a status event with the current state
448+
# are processed, the agent completed normally so the terminal state is completed
447449
assert hasattr(final_event.status, "message")
448-
assert final_event.status.state == TaskState.working
450+
assert final_event.status.state == TaskState.completed
449451

450452
@pytest.mark.asyncio
451453
async def test_execute_with_async_callable_runner(self):
@@ -502,9 +504,9 @@ async def mock_run_async(**kwargs):
502504
final_event = self.mock_event_queue.enqueue_event.call_args_list[-1][0][0]
503505
assert final_event.final == True
504506
# The TaskResultAggregator is created with default state (working), and since no messages
505-
# are processed, it will publish a status event with the current state
507+
# are processed, the agent completed normally so the terminal state is completed
506508
assert hasattr(final_event.status, "message")
507-
assert final_event.status.state == TaskState.working
509+
assert final_event.status.state == TaskState.completed
508510

509511
@pytest.mark.asyncio
510512
async def test_handle_request_integration(self):
@@ -580,8 +582,8 @@ async def mock_run_async(**kwargs):
580582
assert len(final_events) >= 1
581583
final_event = final_events[-1] # Get the last final event
582584
assert final_event.status.message == mock_aggregator.task_status_message
583-
# When aggregator state is working but no message, final event should be working
584-
assert final_event.status.state == TaskState.working
585+
# When aggregator state is working but no message, final event should be completed
586+
assert final_event.status.state == TaskState.completed
585587

586588
@pytest.mark.asyncio
587589
async def test_cancel_with_task_id(self):
@@ -803,6 +805,7 @@ async def test_handle_request_with_working_state_publishes_artifact_and_complete
803805
test_message.message_id = "test-message-id"
804806
test_message.role = Role.agent
805807
test_message.parts = [Part(root=TextPart(text="test content"))]
808+
test_message.metadata = None
806809

807810
# Setup detailed mocks
808811
self.mock_request_converter.return_value = AgentRunRequest(
@@ -1072,3 +1075,200 @@ async def mock_run_async(**kwargs):
10721075
assert (
10731076
modified_a2a_event in enqueued_events
10741077
), "The modified event should have been enqueued"
1078+
1079+
# ── Regression tests for issue #5188 ──────────────────────────────────
1080+
1081+
@pytest.mark.asyncio
1082+
async def test_metadata_only_response_completes(self):
1083+
"""A response with metadata but no text parts should finalize as completed, not working."""
1084+
from a2a.types import TaskArtifactUpdateEvent
1085+
1086+
self.mock_context.task_id = "task-meta"
1087+
self.mock_context.context_id = "ctx-meta"
1088+
self.mock_context.current_task = Mock()
1089+
1090+
self.mock_request_converter.return_value = AgentRunRequest(
1091+
user_id="u",
1092+
session_id="s",
1093+
new_message=Mock(spec=Content),
1094+
run_config=Mock(spec=RunConfig),
1095+
)
1096+
1097+
mock_session = Mock()
1098+
mock_session.id = "s"
1099+
self.mock_runner.session_service.get_session = AsyncMock(
1100+
return_value=mock_session
1101+
)
1102+
self.mock_runner._new_invocation_context.return_value = Mock()
1103+
1104+
# The agent yields one event whose converted A2A event has a message
1105+
# with metadata but NO parts (metadata-only response).
1106+
meta_message = Message(
1107+
message_id="m1",
1108+
role=Role.agent,
1109+
parts=[],
1110+
metadata={"intent": "greeting"},
1111+
)
1112+
a2a_status_event = TaskStatusUpdateEvent(
1113+
task_id="task-meta",
1114+
context_id="ctx-meta",
1115+
status=TaskStatus(state=TaskState.working, message=meta_message),
1116+
final=False,
1117+
)
1118+
self.mock_event_converter.return_value = [a2a_status_event]
1119+
1120+
adk_event = Mock(spec=Event)
1121+
1122+
async def mock_run_async(**kwargs):
1123+
yield adk_event
1124+
1125+
self.mock_runner.run_async = mock_run_async
1126+
1127+
await self.executor.execute(self.mock_context, self.mock_event_queue)
1128+
1129+
enqueued = [
1130+
c[0][0] for c in self.mock_event_queue.enqueue_event.call_args_list
1131+
]
1132+
final_events = [
1133+
e for e in enqueued
1134+
if isinstance(e, TaskStatusUpdateEvent) and e.final
1135+
]
1136+
assert len(final_events) == 1
1137+
assert final_events[0].status.state == TaskState.completed
1138+
1139+
# No artifact event should be emitted (no parts to wrap).
1140+
artifact_events = [
1141+
e for e in enqueued if isinstance(e, TaskArtifactUpdateEvent)
1142+
]
1143+
assert len(artifact_events) == 0
1144+
1145+
@pytest.mark.asyncio
1146+
async def test_streamed_text_accumulated_in_final_artifact(self):
1147+
"""Delta text chunks should be concatenated in the synthesized final artifact."""
1148+
from a2a.types import TaskArtifactUpdateEvent
1149+
1150+
self.mock_context.task_id = "task-stream"
1151+
self.mock_context.context_id = "ctx-stream"
1152+
self.mock_context.current_task = Mock()
1153+
1154+
self.mock_request_converter.return_value = AgentRunRequest(
1155+
user_id="u",
1156+
session_id="s",
1157+
new_message=Mock(spec=Content),
1158+
run_config=Mock(spec=RunConfig),
1159+
)
1160+
mock_session = Mock()
1161+
mock_session.id = "s"
1162+
self.mock_runner.session_service.get_session = AsyncMock(
1163+
return_value=mock_session
1164+
)
1165+
self.mock_runner._new_invocation_context.return_value = Mock()
1166+
1167+
chunks = ["Hello", " world", "!"]
1168+
adk_events = [Mock(spec=Event) for _ in chunks]
1169+
1170+
# Each ADK event converts to a status update with one text chunk.
1171+
call_index = 0
1172+
1173+
def event_converter(adk_ev, inv_ctx, task_id, ctx_id, converter):
1174+
nonlocal call_index
1175+
text = chunks[call_index]
1176+
call_index += 1
1177+
return [
1178+
TaskStatusUpdateEvent(
1179+
task_id=task_id,
1180+
context_id=ctx_id,
1181+
status=TaskStatus(
1182+
state=TaskState.working,
1183+
message=Message(
1184+
message_id="m",
1185+
role=Role.agent,
1186+
parts=[Part(root=TextPart(text=text))],
1187+
),
1188+
),
1189+
final=False,
1190+
)
1191+
]
1192+
1193+
self.mock_event_converter.side_effect = event_converter
1194+
1195+
async def mock_run_async(**kwargs):
1196+
for ev in adk_events:
1197+
yield ev
1198+
1199+
self.mock_runner.run_async = mock_run_async
1200+
1201+
await self.executor.execute(self.mock_context, self.mock_event_queue)
1202+
1203+
enqueued = [
1204+
c[0][0] for c in self.mock_event_queue.enqueue_event.call_args_list
1205+
]
1206+
artifacts = [
1207+
e for e in enqueued if isinstance(e, TaskArtifactUpdateEvent)
1208+
]
1209+
assert len(artifacts) == 1
1210+
assert artifacts[0].artifact.parts[0].root.text == "Hello world!"
1211+
1212+
finals = [
1213+
e for e in enqueued
1214+
if isinstance(e, TaskStatusUpdateEvent) and e.final
1215+
]
1216+
assert len(finals) == 1
1217+
assert finals[0].status.state == TaskState.completed
1218+
1219+
@pytest.mark.asyncio
1220+
async def test_metadata_propagated_to_synthesized_artifact(self):
1221+
"""Message metadata should be carried into the synthesized Artifact."""
1222+
from a2a.types import TaskArtifactUpdateEvent
1223+
1224+
self.mock_context.task_id = "task-mp"
1225+
self.mock_context.context_id = "ctx-mp"
1226+
self.mock_context.current_task = Mock()
1227+
1228+
self.mock_request_converter.return_value = AgentRunRequest(
1229+
user_id="u",
1230+
session_id="s",
1231+
new_message=Mock(spec=Content),
1232+
run_config=Mock(spec=RunConfig),
1233+
)
1234+
mock_session = Mock()
1235+
mock_session.id = "s"
1236+
self.mock_runner.session_service.get_session = AsyncMock(
1237+
return_value=mock_session
1238+
)
1239+
self.mock_runner._new_invocation_context.return_value = Mock()
1240+
1241+
msg_with_meta = Message(
1242+
message_id="m1",
1243+
role=Role.agent,
1244+
parts=[Part(root=TextPart(text="result"))],
1245+
metadata={"source": "agent-x", "confidence": "0.95"},
1246+
)
1247+
a2a_event = TaskStatusUpdateEvent(
1248+
task_id="task-mp",
1249+
context_id="ctx-mp",
1250+
status=TaskStatus(state=TaskState.working, message=msg_with_meta),
1251+
final=False,
1252+
)
1253+
self.mock_event_converter.return_value = [a2a_event]
1254+
1255+
adk_event = Mock(spec=Event)
1256+
1257+
async def mock_run_async(**kwargs):
1258+
yield adk_event
1259+
1260+
self.mock_runner.run_async = mock_run_async
1261+
1262+
await self.executor.execute(self.mock_context, self.mock_event_queue)
1263+
1264+
enqueued = [
1265+
c[0][0] for c in self.mock_event_queue.enqueue_event.call_args_list
1266+
]
1267+
artifacts = [
1268+
e for e in enqueued if isinstance(e, TaskArtifactUpdateEvent)
1269+
]
1270+
assert len(artifacts) == 1
1271+
assert artifacts[0].artifact.metadata == {
1272+
"source": "agent-x",
1273+
"confidence": "0.95",
1274+
}

0 commit comments

Comments
 (0)