From c95f669bb6fbadbf07d62a8ff8a3e533e17032f4 Mon Sep 17 00:00:00 2001 From: Guillaume Laforge Date: Wed, 8 Apr 2026 11:48:53 +0200 Subject: [PATCH] fix: Fix A2A protocol chunk streaming and task completion states This PR fixes issues where A2A endpoints would prematurely truncate streams or drop events during conversational handoffs. * Stream Persistence: RemoteA2AAgent no longer aborts streams prematurely on HTTP endpoints. It now properly waits for the formal TaskState.COMPLETED status update. * Artifact Chunking: Fixed flag assignments in AgentExecutor so the first chunk reliably sends append=false and lastChunk cleanly mimics the generated partial state. * Protocol Alignment: Removed the Message payload in the final success event to perfectly reflect the Go A2A reference implementation. --- .../google/adk/a2a/agent/RemoteA2AAgent.java | 6 +- .../adk/a2a/executor/AgentExecutor.java | 6 +- .../adk/a2a/executor/AgentExecutorTest.java | 66 +++++++++++++++++++ 3 files changed, 73 insertions(+), 5 deletions(-) diff --git a/a2a/src/main/java/com/google/adk/a2a/agent/RemoteA2AAgent.java b/a2a/src/main/java/com/google/adk/a2a/agent/RemoteA2AAgent.java index 1366f10b2..d4e094710 100644 --- a/a2a/src/main/java/com/google/adk/a2a/agent/RemoteA2AAgent.java +++ b/a2a/src/main/java/com/google/adk/a2a/agent/RemoteA2AAgent.java @@ -328,9 +328,9 @@ synchronized void handleEvent(ClientEvent clientEvent, AgentCard unused) { emitter.onNext(event); }); - // For non-streaming communication, complete the flow; for streaming, wait until the client - // marks the completion. - if (isCompleted(clientEvent) || !streaming) { + // Wait until the client receives a status payload marking the completion of the task + // regardless of the underlying streaming or non-streaming protocol configuration. + if (isCompleted(clientEvent)) { // Only complete the flow once. if (!done) { emitAggregatedEventAndClearBuffer(clientEvent); diff --git a/a2a/src/main/java/com/google/adk/a2a/executor/AgentExecutor.java b/a2a/src/main/java/com/google/adk/a2a/executor/AgentExecutor.java index 7252cdec1..77a0d62d7 100644 --- a/a2a/src/main/java/com/google/adk/a2a/executor/AgentExecutor.java +++ b/a2a/src/main/java/com/google/adk/a2a/executor/AgentExecutor.java @@ -299,6 +299,7 @@ private static class EventProcessor { private final String runArtifactId; private final AgentExecutorConfig.OutputMode outputMode; private final Map lastAgentPartialArtifact = new ConcurrentHashMap<>(); + private boolean isFirstEventForRun = true; // All artifacts related to the invocation should have the same artifact id. private EventProcessor(AgentExecutorConfig.OutputMode outputMode) { @@ -329,8 +330,9 @@ private Maybe process( } } - Boolean append = true; - Boolean lastChunk = false; + boolean append = !isFirstEventForRun; + isFirstEventForRun = false; + boolean lastChunk = !event.partial().orElse(false); String artifactId = runArtifactId; if (outputMode == AgentExecutorConfig.OutputMode.ARTIFACT_PER_EVENT) { diff --git a/a2a/src/test/java/com/google/adk/a2a/executor/AgentExecutorTest.java b/a2a/src/test/java/com/google/adk/a2a/executor/AgentExecutorTest.java index 647aaf21f..02998063d 100644 --- a/a2a/src/test/java/com/google/adk/a2a/executor/AgentExecutorTest.java +++ b/a2a/src/test/java/com/google/adk/a2a/executor/AgentExecutorTest.java @@ -439,6 +439,72 @@ public void process_statefulAggregation_tracksArtifactIdAndAppendForAuthor() { assertThat(ev3.getArtifact().artifactId()).isEqualTo(firstArtifactId); } + @Test + public void execute_withDefaultArtifactPerRun_emitsMessageAndLastChunk() { + Event partialEvent = + Event.builder() + .partial(true) + .author("agent") + .content( + Content.builder() + .parts(ImmutableList.of(Part.builder().text("chunk1").build())) + .build()) + .build(); + Event finalEvent = + Event.builder() + .partial(false) + .author("agent") + .content( + Content.builder() + .parts(ImmutableList.of(Part.builder().text("chunk1chunk2").build())) + .build()) + .build(); + + testAgent.setEventsToEmit(Flowable.just(partialEvent, finalEvent)); + AgentExecutor executor = + new AgentExecutor.Builder() + .app(App.builder().name("test_app").rootAgent(testAgent).build()) + .sessionService(new InMemorySessionService()) + .artifactService(new InMemoryArtifactService()) + .agentExecutorConfig( + AgentExecutorConfig.builder() + .outputMode(AgentExecutorConfig.OutputMode.ARTIFACT_PER_RUN) + .build()) + .build(); + + RequestContext requestContext = createRequestContext(); + executor.execute(requestContext, eventQueue); + + // Verify events were correctly formed. + ImmutableList artifactEvents = + enqueuedEvents.stream() + .filter(e -> e instanceof TaskArtifactUpdateEvent) + .map(e -> (TaskArtifactUpdateEvent) e) + .collect(toImmutableList()); + + assertThat(artifactEvents).hasSize(2); + // Partial event has lastChunk = false + assertThat(artifactEvents.get(0).isLastChunk()).isFalse(); + // Final event has lastChunk = true + assertThat(artifactEvents.get(1).isLastChunk()).isTrue(); + + // First chunk appends=false, subsequent chunks append=true + assertThat(artifactEvents.get(0).isAppend()).isFalse(); + assertThat(artifactEvents.get(1).isAppend()).isTrue(); + + // Now verify the final TaskStatusUpdateEvent has a null message as expected + Optional statusEvent = + enqueuedEvents.stream() + .filter(e -> e instanceof TaskStatusUpdateEvent) + .map(e -> (TaskStatusUpdateEvent) e) + .filter(TaskStatusUpdateEvent::isFinal) + .findFirst(); + + assertThat(statusEvent).isPresent(); + Message finalMessage = statusEvent.get().getStatus().message(); + assertThat(finalMessage).isNull(); + } + private static final class TestAgent extends BaseAgent { private Flowable eventsToEmit;