Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ synchronized void handleEvent(ClientEvent clientEvent, AgentCard unused) {
emitter.onNext(event);
});

// For non-streaming communication, complete the flow; for streaming, wait until the client
// marks the completion.
if (isCompleted(clientEvent) || !streaming) {
// Wait until the client receives a status payload marking the completion of the task
// regardless of the underlying streaming or non-streaming protocol configuration.
if (isCompleted(clientEvent)) {
// Only complete the flow once.
if (!done) {
emitAggregatedEventAndClearBuffer(clientEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ private static class EventProcessor {
private final String runArtifactId;
private final AgentExecutorConfig.OutputMode outputMode;
private final Map<String, String> lastAgentPartialArtifact = new ConcurrentHashMap<>();
private boolean isFirstEventForRun = true;

// All artifacts related to the invocation should have the same artifact id.
private EventProcessor(AgentExecutorConfig.OutputMode outputMode) {
Expand Down Expand Up @@ -329,8 +330,9 @@ private Maybe<TaskArtifactUpdateEvent> process(
}
}

Boolean append = true;
Boolean lastChunk = false;
boolean append = !isFirstEventForRun;
isFirstEventForRun = false;
boolean lastChunk = !event.partial().orElse(false);
String artifactId = runArtifactId;

if (outputMode == AgentExecutorConfig.OutputMode.ARTIFACT_PER_EVENT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,72 @@ public void process_statefulAggregation_tracksArtifactIdAndAppendForAuthor() {
assertThat(ev3.getArtifact().artifactId()).isEqualTo(firstArtifactId);
}

@Test
public void execute_withDefaultArtifactPerRun_emitsMessageAndLastChunk() {
Event partialEvent =
Event.builder()
.partial(true)
.author("agent")
.content(
Content.builder()
.parts(ImmutableList.of(Part.builder().text("chunk1").build()))
.build())
.build();
Event finalEvent =
Event.builder()
.partial(false)
.author("agent")
.content(
Content.builder()
.parts(ImmutableList.of(Part.builder().text("chunk1chunk2").build()))
.build())
.build();

testAgent.setEventsToEmit(Flowable.just(partialEvent, finalEvent));
AgentExecutor executor =
new AgentExecutor.Builder()
.app(App.builder().name("test_app").rootAgent(testAgent).build())
.sessionService(new InMemorySessionService())
.artifactService(new InMemoryArtifactService())
.agentExecutorConfig(
AgentExecutorConfig.builder()
.outputMode(AgentExecutorConfig.OutputMode.ARTIFACT_PER_RUN)
.build())
.build();

RequestContext requestContext = createRequestContext();
executor.execute(requestContext, eventQueue);

// Verify events were correctly formed.
ImmutableList<TaskArtifactUpdateEvent> artifactEvents =
enqueuedEvents.stream()
.filter(e -> e instanceof TaskArtifactUpdateEvent)
.map(e -> (TaskArtifactUpdateEvent) e)
.collect(toImmutableList());

assertThat(artifactEvents).hasSize(2);
// Partial event has lastChunk = false
assertThat(artifactEvents.get(0).isLastChunk()).isFalse();
// Final event has lastChunk = true
assertThat(artifactEvents.get(1).isLastChunk()).isTrue();

// First chunk appends=false, subsequent chunks append=true
assertThat(artifactEvents.get(0).isAppend()).isFalse();
assertThat(artifactEvents.get(1).isAppend()).isTrue();

// Now verify the final TaskStatusUpdateEvent has a null message as expected
Optional<TaskStatusUpdateEvent> statusEvent =
enqueuedEvents.stream()
.filter(e -> e instanceof TaskStatusUpdateEvent)
.map(e -> (TaskStatusUpdateEvent) e)
.filter(TaskStatusUpdateEvent::isFinal)
.findFirst();

assertThat(statusEvent).isPresent();
Message finalMessage = statusEvent.get().getStatus().message();
assertThat(finalMessage).isNull();
}

private static final class TestAgent extends BaseAgent {
private Flowable<Event> eventsToEmit;

Expand Down