From c6a610bf06dfd3dcf8e167bffd4184d800495246 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Tue, 5 May 2026 22:39:26 +0800 Subject: [PATCH 1/6] refactor(agui): redesign event dispatching with strategy pattern and deferred queue --- .../agentscope/core/message/ContentBlock.java | 7 +- .../agentscope/core/message/CustomBlock.java | 116 +++++ .../core/agui/adapter/AguiAgentAdapter.java | 468 +++--------------- .../core/agui/adapter/StreamContext.java | 191 +++++++ .../adapter/strategy/BlockEventConverter.java | 45 ++ .../strategy/CustomBlockConverter.java | 43 ++ .../adapter/strategy/TextBlockConverter.java | 56 +++ .../strategy/ThinkingBlockConverter.java | 61 +++ .../strategy/ToolResultBlockConverter.java | 92 ++++ .../strategy/ToolUseBlockConverter.java | 61 +++ .../agui/adapter/AguiAgentAdapterTest.java | 30 +- 11 files changed, 769 insertions(+), 401 deletions(-) create mode 100644 agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java create mode 100644 agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java create mode 100644 agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/BlockEventConverter.java create mode 100644 agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java create mode 100644 agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java create mode 100644 agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java create mode 100644 agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java create mode 100644 agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java diff --git a/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java b/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java index 55afe0393..d9b9ac10a 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java +++ b/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java @@ -35,6 +35,7 @@ *
  • {@link VideoBlock} - Video content (URL or Base64) *
  • {@link ToolUseBlock} - Tool execution requests *
  • {@link ToolResultBlock} - Tool execution results + *
  • {@link CustomBlock} - Custom extension block to trigger AG-UI Custom events * * *

    Uses Jackson annotations for polymorphic JSON serialization with the "type" discriminator @@ -49,7 +50,8 @@ @JsonSubTypes.Type(value = AudioBlock.class, name = "audio"), @JsonSubTypes.Type(value = VideoBlock.class, name = "video"), @JsonSubTypes.Type(value = ToolUseBlock.class, name = "tool_use"), - @JsonSubTypes.Type(value = ToolResultBlock.class, name = "tool_result") + @JsonSubTypes.Type(value = ToolResultBlock.class, name = "tool_result"), + @JsonSubTypes.Type(value = CustomBlock.class, name = "custom") }) public sealed class ContentBlock implements State permits TextBlock, @@ -58,4 +60,5 @@ public sealed class ContentBlock implements State VideoBlock, ThinkingBlock, ToolUseBlock, - ToolResultBlock {} + ToolResultBlock, + CustomBlock {} diff --git a/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java new file mode 100644 index 000000000..c0658e5d8 --- /dev/null +++ b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java @@ -0,0 +1,116 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.message; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Represents a custom extension block specifically designed to trigger AG-UI Custom events. + * + *

    This block allows for the transmission of arbitrary JSON-serializable payloads + * to the frontend. It is commonly used for real-time progress pushes, workflow + * state changes, and any other customized interactive behaviors within the AG-UI protocol. + * + *

    The name identifies the custom event type, and the value can be a Map, String, + * Number, or any JSON-serializable object containing the event payload. + */ +public final class CustomBlock extends ContentBlock { + + private final String name; + private final Object value; + + /** + * Creates a new custom block for JSON deserialization. + * + * @param name The name of the custom event + * @param value The arbitrary payload value associated with the event + */ + @JsonCreator + public CustomBlock( + @JsonProperty("name") String name, + @JsonProperty("value") Object value) { + this.name = name; + this.value = value; + } + + /** + * Gets the name of this custom block. + * + * @return The event name + */ + public String getName() { + return name; + } + + /** + * Gets the payload value of this custom block. + * + * @return The payload object + */ + public Object getValue() { + return value; + } + + /** + * Creates a new builder for constructing CustomBlock instances. + * + * @return A new builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for constructing CustomBlock instances. + */ + public static class Builder { + + private String name; + private Object value; + + /** + * Sets the name for the custom block event. + * + * @param name The custom event name + * @return This builder for chaining + */ + public Builder name(String name) { + this.name = name; + return this; + } + + /** + * Sets the arbitrary payload value for the block. + * + * @param value payload object + * @return This builder for chaining + */ + public Builder value(Object value) { + this.value = value; + return this; + } + + /** + * Builds a new CustomBlock with the configured properties. + * + * @return A new CustomBlock instance + */ + public CustomBlock build() { + return new CustomBlock(name, value); + } + } +} \ No newline at end of file diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java index 2314e72b9..f0e88b033 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java @@ -20,24 +20,28 @@ import io.agentscope.core.agent.EventType; import io.agentscope.core.agent.StreamOptions; import io.agentscope.core.agui.converter.AguiMessageConverter; +import io.agentscope.core.agui.adapter.strategy.BlockEventConverter; +import io.agentscope.core.agui.adapter.strategy.CustomBlockConverter; +import io.agentscope.core.agui.adapter.strategy.TextBlockConverter; +import io.agentscope.core.agui.adapter.strategy.ThinkingBlockConverter; +import io.agentscope.core.agui.adapter.strategy.ToolResultBlockConverter; +import io.agentscope.core.agui.adapter.strategy.ToolUseBlockConverter; import io.agentscope.core.agui.event.AguiEvent; import io.agentscope.core.agui.model.RunAgentInput; import io.agentscope.core.message.ContentBlock; +import io.agentscope.core.message.CustomBlock; import io.agentscope.core.message.Msg; import io.agentscope.core.message.TextBlock; import io.agentscope.core.message.ThinkingBlock; import io.agentscope.core.message.ToolResultBlock; import io.agentscope.core.message.ToolUseBlock; -import io.agentscope.core.util.JsonException; -import io.agentscope.core.util.JsonUtils; +import reactor.core.publisher.Flux; + import java.util.ArrayList; -import java.util.LinkedHashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import reactor.core.publisher.Flux; /** * Adapter that bridges AgentScope agents to the AG-UI protocol. @@ -48,15 +52,15 @@ *

    Event Mapping: *

    * *

    Reasoning Support: *

    @@ -66,17 +70,25 @@ public class AguiAgentAdapter { private final Agent agent; private final AguiAdapterConfig config; private final AguiMessageConverter messageConverter; + private final Map, BlockEventConverter> converters = new HashMap<>(); /** - * Creates a new AguiAgentAdapter. + * Creates a new AguiAgentAdapter and registers all block conversion strategies. * - * @param agent The agent to adapt + * @param agent The agent to adapt * @param config The adapter configuration */ public AguiAgentAdapter(Agent agent, AguiAdapterConfig config) { - this.agent = Objects.requireNonNull(agent, "agent cannot be null"); - this.config = Objects.requireNonNull(config, "config cannot be null"); + this.agent = Objects.requireNonNull(agent); + this.config = Objects.requireNonNull(config); this.messageConverter = new AguiMessageConverter(); + + // Register block conversion strategies + converters.put(TextBlock.class, new TextBlockConverter()); + converters.put(ThinkingBlock.class, new ThinkingBlockConverter()); + converters.put(ToolUseBlock.class, new ToolUseBlockConverter()); + converters.put(ToolResultBlock.class, new ToolResultBlockConverter()); + converters.put(CustomBlock.class, new CustomBlockConverter()); } /** @@ -89,399 +101,73 @@ public AguiAgentAdapter(Agent agent, AguiAdapterConfig config) { * @return A Flux of AG-UI events */ public Flux run(RunAgentInput input) { - return Flux.defer( - () -> { - String threadId = input.getThreadId(); - String runId = input.getRunId(); - - // Convert AG-UI messages to AgentScope messages - List msgs = messageConverter.toMsgList(input.getMessages()); - - // Create stream options - use incremental mode for true streaming - StreamOptions options = - StreamOptions.builder() - .eventTypes(EventType.ALL) - .incremental(true) - .build(); - - // Track state for event conversion - EventConversionState state = new EventConversionState(threadId, runId); - - return Flux.concat( - // Emit RUN_STARTED - Flux.just(new AguiEvent.RunStarted(threadId, runId)), - // Stream agent events and convert to AG-UI events - // Use concatMapIterable to preserve strict event ordering - agent.stream(msgs, options) - .concatMapIterable(event -> convertEvent(event, state)), - // Emit any pending end events and RUN_FINISHED - Flux.defer(() -> finishRun(state))) - .onErrorResume( - error -> { - // On error, emit RawEvent with error info followed by - // RunFinished - String errorMessage = - error.getMessage() != null - ? error.getMessage() - : error.getClass().getSimpleName(); - return Flux.just( - new AguiEvent.Raw( - threadId, - runId, - Map.of("error", errorMessage)), - new AguiEvent.RunFinished(threadId, runId)); - }); - }); + String threadId = input.getThreadId(); + String runId = input.getRunId(); + List msgs = messageConverter.toMsgList(input.getMessages()); + // Create stream options - use incremental mode for true streaming + StreamOptions options = StreamOptions.builder().eventTypes(EventType.ALL).incremental(true).build(); + + return Flux.defer(() -> { + StreamContext ctx = new StreamContext(threadId, runId, config); + + return Flux.concat( + // Emit RUN_STARTED + Flux.just(new AguiEvent.RunStarted(threadId, runId)), + + // Stream agent events and convert to AG-UI events + // Use concatMapIterable to preserve strict event ordering + agent.stream(msgs, options) + .concatMapIterable(event -> processEvent(event, ctx)), + + // Emit any pending end events + Flux.defer(() -> Flux.fromIterable(ctx.flushAllRemainingDeferred())), + + // Emit RUN_FINISHED + Flux.just(new AguiEvent.RunFinished(threadId, runId)) + ).onErrorResume(error -> handleError(threadId, runId, ctx, error)); + }); } /** - * Convert an AgentScope event to AG-UI events. + * Dispatches the incoming event to the appropriate converter strategies based on block types. * - * @param event The AgentScope event - * @param state The conversion state - * @return List of AG-UI events + * @param event The incoming agent event + * @param ctx The current stream context + * @return A list of AG-UI events generated during this processing cycle */ - private List convertEvent(Event event, EventConversionState state) { - List events = new ArrayList<>(); - Msg msg = event.getMessage(); - EventType type = event.getType(); - - if (type == EventType.REASONING || type == EventType.SUMMARY) { - // Handle reasoning/summary events - convert to text messages and tool calls - for (ContentBlock block : msg.getContent()) { - if (block instanceof TextBlock textBlock) { - String text = textBlock.getText(); - if (text != null && !text.isEmpty()) { - String messageId = msg.getId(); - - // Start message if not started - if (!state.hasStartedMessage(messageId)) { - events.add( - new AguiEvent.TextMessageStart( - state.threadId, state.runId, messageId, "assistant")); - state.startMessage(messageId); - } - - if (!event.isLast()) { - // In incremental mode, text is already the delta - events.add( - new AguiEvent.TextMessageContent( - state.threadId, state.runId, messageId, text)); - } else { - // End message if this is the last event - if (!state.hasEndedMessage(messageId)) { - events.add( - new AguiEvent.TextMessageEnd( - state.threadId, state.runId, messageId)); - state.endMessage(messageId); - } - } - } - } else if (block instanceof ThinkingBlock thinkingBlock) { - // Handle thinking blocks - convert to REASONING_* events (only if enabled) - // According to AG-UI Reasoning draft: https://docs.ag-ui.com/drafts/reasoning - if (config.isEnableReasoning()) { - String thinking = thinkingBlock.getThinking(); - if (thinking != null && !thinking.isEmpty()) { - String messageId = msg.getId(); - - // Start reasoning message if not started - if (!state.hasStartedReasoningMessage(messageId)) { - events.add( - new AguiEvent.ReasoningMessageStart( - state.threadId, - state.runId, - messageId, - "reasoning")); - state.startReasoningMessage(messageId); - } - - if (!event.isLast()) { - // In incremental mode, thinking is already the delta - events.add( - new AguiEvent.ReasoningMessageContent( - state.threadId, state.runId, messageId, thinking)); - } else { - // End reasoning message if this is the last event - events.add( - new AguiEvent.ReasoningMessageEnd( - state.threadId, state.runId, messageId)); - state.endReasoningMessage(messageId); - } - } - } - // If reasoning is disabled, ThinkingBlock content is ignored (backward - // compatibility) - } else if (block instanceof ToolUseBlock toolUse) { - // End any active text message before starting tool call - if (state.hasActiveTextMessage()) { - String activeMessageId = state.getCurrentTextMessageId(); - events.add( - new AguiEvent.TextMessageEnd( - state.threadId, state.runId, activeMessageId)); - state.endMessage(activeMessageId); - } - - // End any active reasoning message before starting tool call - if (state.hasActiveReasoningMessage()) { - String activeReasoningMessageId = state.getCurrentReasoningMessageId(); - events.add( - new AguiEvent.ReasoningMessageEnd( - state.threadId, state.runId, activeReasoningMessageId)); - state.endReasoningMessage(activeReasoningMessageId); - } - - // Emit tool call start - String toolCallId = toolUse.getId(); - if (toolCallId == null) { - toolCallId = UUID.randomUUID().toString(); - } - - if (!state.hasStartedToolCall(toolCallId)) { - events.add( - new AguiEvent.ToolCallStart( - state.threadId, - state.runId, - toolCallId, - toolUse.getName())); - state.startToolCall(toolCallId); - } - - // Emit tool call args if enabled - if (config.isEmitToolCallArgs() && !event.isLast()) { - String args = toolUse.getContent(); - if (args != null && !args.isEmpty()) { - events.add( - new AguiEvent.ToolCallArgs( - state.threadId, state.runId, toolCallId, args)); - } - } - } - } - } else if (type == EventType.TOOL_RESULT && event.isLast()) { - // Handle tool results - for (ContentBlock block : msg.getContent()) { - if (block instanceof ToolResultBlock toolResult) { - String toolCallId = toolResult.getId(); - if (toolCallId == null) { - toolCallId = UUID.randomUUID().toString(); - } - - String result = extractToolResultText(toolResult); - - boolean hasStarted = state.hasStartedToolCall(toolCallId); - if (!hasStarted) { - String toolName = toolResult.getName(); - if (toolName == null || toolName.isBlank()) { - toolName = "unknown"; - } - events.add( - new AguiEvent.ToolCallStart( - state.threadId, state.runId, toolCallId, toolName)); - state.startToolCall(toolCallId); - } - - // Ensure ToolCallEnd is emitted to close arguments phase - events.add(new AguiEvent.ToolCallEnd(state.threadId, state.runId, toolCallId)); - - events.add( - new AguiEvent.ToolCallResult( - state.threadId, - state.runId, - toolCallId, - result, - "tool", - msg.getId())); - state.endToolCall(toolCallId); - } + @SuppressWarnings("unchecked") + private List processEvent(Event event, StreamContext ctx) { + // Dispatch each content block to its corresponding converter + for (ContentBlock block : event.getMessage().getContent()) { + BlockEventConverter converter = + (BlockEventConverter) converters.get(block.getClass()); + + if (converter != null && converter.isApplicable(event)) { + converter.convert(block, event, ctx); } } - return events; + return ctx.getAndClearEmittedEvents(); } /** - * Finish the run by emitting any pending end events and RUN_FINISHED. + * Handles errors that occur during the stream pipeline. + * Guarantees that all deferred end events are flushed before the error event is emitted. * - * @param state The conversion state - * @return Flux of final events + * @param threadId The thread ID + * @param runId The run ID + * @param ctx The current stream context + * @param error The thrown exception + * @return A Flux containing the fallback closure events */ - private Flux finishRun(EventConversionState state) { + private Flux handleError(String threadId, String runId, StreamContext ctx, Throwable error) { List events = new ArrayList<>(); + events.addAll(ctx.flushAllRemainingDeferred()); - // End any messages that weren't properly ended - for (String messageId : state.getStartedMessages()) { - if (!state.hasEndedMessage(messageId)) { - events.add(new AguiEvent.TextMessageEnd(state.threadId, state.runId, messageId)); - } - } - - // End any tool calls that weren't properly ended - for (String toolCallId : state.getStartedToolCalls()) { - if (!state.hasEndedToolCall(toolCallId)) { - events.add(new AguiEvent.ToolCallEnd(state.threadId, state.runId, toolCallId)); - } - } - - // End any reasoning messages that weren't properly ended - for (String messageId : state.getStartedReasoningMessages()) { - if (!state.hasEndedReasoningMessage(messageId)) { - events.add( - new AguiEvent.ReasoningMessageEnd(state.threadId, state.runId, messageId)); - } - } - - // Emit RUN_FINISHED - events.add(new AguiEvent.RunFinished(state.threadId, state.runId)); + String msg = error.getMessage() != null ? error.getMessage() : error.getClass().getSimpleName(); + events.add(new AguiEvent.Raw(threadId, runId, Map.of("error", msg))); + events.add(new AguiEvent.RunFinished(threadId, runId)); return Flux.fromIterable(events); } - - /** - * Extract text content from a tool result block. - * - * @param toolResult The tool result block - * @return The text content, or null if not present - */ - private String extractToolResultText(ToolResultBlock toolResult) { - if (toolResult.getOutput() == null || toolResult.getOutput().isEmpty()) { - return null; - } - - StringBuilder sb = new StringBuilder(); - for (ContentBlock output : toolResult.getOutput()) { - if (output instanceof TextBlock textBlock) { - if (!sb.isEmpty()) { - sb.append("\n"); - } - sb.append(textBlock.getText()); - } - } - - return !sb.isEmpty() ? sb.toString() : null; - } - - /** - * Serialize tool arguments to JSON string. - * - * @param input The tool input map - * @return JSON string representation - */ - private String serializeToolArgs(Map input) { - if (input == null || input.isEmpty()) { - return "{}"; - } - try { - return JsonUtils.getJsonCodec().toJson(input); - } catch (JsonException e) { - return "{}"; - } - } - - /** - * State tracker for event conversion. - * Uses LinkedHashSet to preserve insertion order for proper event sequencing. - */ - private static class EventConversionState { - final String threadId; - final String runId; - private final Set startedMessages = new LinkedHashSet<>(); - private final Set endedMessages = new LinkedHashSet<>(); - private final Set startedToolCalls = new LinkedHashSet<>(); - private final Set endedToolCalls = new LinkedHashSet<>(); - private final Set startedReasoningMessages = new LinkedHashSet<>(); - private final Set endedReasoningMessages = new LinkedHashSet<>(); - private String currentTextMessageId = null; - private String currentReasoningMessageId = null; - - EventConversionState(String threadId, String runId) { - this.threadId = threadId; - this.runId = runId; - } - - boolean hasStartedMessage(String messageId) { - return startedMessages.contains(messageId); - } - - void startMessage(String messageId) { - startedMessages.add(messageId); - currentTextMessageId = messageId; - } - - void endMessage(String messageId) { - endedMessages.add(messageId); - if (Objects.equals(messageId, currentTextMessageId)) { - currentTextMessageId = null; - } - } - - boolean hasEndedMessage(String messageId) { - return endedMessages.contains(messageId); - } - - String getCurrentTextMessageId() { - return currentTextMessageId; - } - - boolean hasActiveTextMessage() { - return currentTextMessageId != null && !hasEndedMessage(currentTextMessageId); - } - - Set getStartedMessages() { - return startedMessages; - } - - boolean hasStartedToolCall(String toolCallId) { - return startedToolCalls.contains(toolCallId); - } - - void startToolCall(String toolCallId) { - startedToolCalls.add(toolCallId); - } - - void endToolCall(String toolCallId) { - endedToolCalls.add(toolCallId); - } - - boolean hasEndedToolCall(String toolCallId) { - return endedToolCalls.contains(toolCallId); - } - - Set getStartedToolCalls() { - return startedToolCalls; - } - - boolean hasStartedReasoningMessage(String messageId) { - return startedReasoningMessages.contains(messageId); - } - - void startReasoningMessage(String messageId) { - startedReasoningMessages.add(messageId); - currentReasoningMessageId = messageId; - } - - void endReasoningMessage(String messageId) { - endedReasoningMessages.add(messageId); - if (Objects.equals(messageId, currentReasoningMessageId)) { - currentReasoningMessageId = null; - } - } - - boolean hasEndedReasoningMessage(String messageId) { - return endedReasoningMessages.contains(messageId); - } - - String getCurrentReasoningMessageId() { - return currentReasoningMessageId; - } - - boolean hasActiveReasoningMessage() { - return currentReasoningMessageId != null - && !hasEndedReasoningMessage(currentReasoningMessageId); - } - - Set getStartedReasoningMessages() { - return startedReasoningMessages; - } - } -} +} \ No newline at end of file diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java new file mode 100644 index 000000000..7974886c8 --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java @@ -0,0 +1,191 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agui.adapter; + +import io.agentscope.core.agui.event.AguiEvent; +import java.util.*; + +/** + * Context holder for the AG-UI stream pipeline. + * Manages the emission of immediate events and the queuing of deferred end events + * to ensure strict adherence to the AG-UI lifecycle protocol. + */ +public class StreamContext { + // Prefixes used to prevent key collisions in the deferred events map, + // as different blocks (e.g., Text and Reasoning) may share the same message ID. + public static final String PREFIX_TEXT = "text:"; + public static final String PREFIX_REASONING = "reasoning:"; + public static final String PREFIX_TOOL = "tool:"; + + private final String threadId; + private final String runId; + private final AguiAdapterConfig config; + + // Events ready to be emitted in the current processing cycle + private final List emittedEvents = new ArrayList<>(); + + // Deferred event queue for storing pending end events (Key: prefix + id) + private final Map deferredEndEvents = new LinkedHashMap<>(); + + private final Set activeTextIds = new LinkedHashSet<>(); + private final Set activeReasoningIds = new LinkedHashSet<>(); + private final Set activeToolIds = new LinkedHashSet<>(); + + // Fallback ID for tool results that might lack an explicit ID + private String lastActiveToolId = null; + + /** + * Initializes a new StreamContext. + * + * @param threadId The thread ID + * @param runId The run ID + * @param config The adapter configuration + */ + public StreamContext(String threadId, String runId, AguiAdapterConfig config) { + this.threadId = threadId; + this.runId = runId; + this.config = config; + } + + public String getThreadId() { return threadId; } + public String getRunId() { return runId; } + public AguiAdapterConfig getConfig() { return config; } + + // --- Event Emission and Deferred Management API --- + + /** + * Adds an event to the emission queue for the current processing cycle. + * + * @param event The AG-UI event to emit + */ + public void emit(AguiEvent event) { + this.emittedEvents.add(event); + } + + /** + * Registers an end event to be emitted later. + * This decouples the start logic from the termination logic. + * + * @param id The prefixed identifier for the component + * @param endEvent The end event to defer + */ + public void deferEndEvent(String id, AguiEvent endEvent) { + this.deferredEndEvents.put(id, endEvent); + } + + /** + * Retrieves and clears all events accumulated in the current cycle. + * + * @return A list of events to be dispatched downstream + */ + public List getAndClearEmittedEvents() { + List result = new ArrayList<>(emittedEvents); + emittedEvents.clear(); + return result; + } + + /** + * Flushes a specific deferred end event into the emission queue. + * + * @param id The prefixed identifier of the event to flush + */ + public void flushEndEvent(String id) { + AguiEvent endEvent = deferredEndEvents.remove(id); + if (endEvent != null) { + emit(endEvent); + } + } + + /** + * Flushes all remaining deferred end events. + * Typically called during stream termination or error recovery to ensure all UI components are closed. + * + * @return A list of all remaining deferred events + */ + public List flushAllRemainingDeferred() { + List remaining = new ArrayList<>(deferredEndEvents.values()); + + deferredEndEvents.clear(); + activeTextIds.clear(); + activeReasoningIds.clear(); + activeToolIds.clear(); + + return remaining; + } + + /** + * Flushes all active text end events. + * Commonly used when an interruption occurs (e.g., a tool call starts). + */ + public void flushAllActiveTexts() { + for (String id : new ArrayList<>(activeTextIds)) { + flushEndEvent(PREFIX_TEXT + id); + removeActiveText(id); + } + } + + /** + * Flushes all active reasoning end events. + * Commonly used when an interruption occurs (e.g., a tool call starts). + */ + public void flushAllActiveReasonings() { + for (String id : new ArrayList<>(activeReasoningIds)) { + flushEndEvent(PREFIX_REASONING + id); + removeActiveReasoning(id); + } + } + + // --- Text State Management --- + + public boolean isTextActive(String id) { return activeTextIds.contains(id); } + public void addActiveText(String id) { activeTextIds.add(id); } + public void removeActiveText(String id) { activeTextIds.remove(id); } + + // --- Reasoning State Management --- + + public boolean isReasoningActive(String id) { return activeReasoningIds.contains(id); } + public void addActiveReasoning(String id) { activeReasoningIds.add(id); } + public void removeActiveReasoning(String id) { activeReasoningIds.remove(id); } + + // --- Tool State Management --- + + public boolean isToolActive(String id) { + return activeToolIds.contains(id); + } + + public void addActiveTool(String id) { + this.activeToolIds.add(id); + this.lastActiveToolId = id; // Update the fallback ID + } + + public void removeActiveTool(String id) { + this.activeToolIds.remove(id); + // If the removed ID matches the last recorded fallback ID, reset or step back the pointer + if (Objects.equals(this.lastActiveToolId, id)) { + if (activeToolIds.isEmpty()) { + this.lastActiveToolId = null; + } else { + // Retrieve the last inserted element from the LinkedHashSet + String[] array = activeToolIds.toArray(new String[0]); + this.lastActiveToolId = array[array.length - 1]; + } + } + } + + public String getLastActiveToolId() { + return this.lastActiveToolId; + } +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/BlockEventConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/BlockEventConverter.java new file mode 100644 index 000000000..67ed91d1b --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/BlockEventConverter.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agui.adapter.strategy; + +import io.agentscope.core.agent.Event; +import io.agentscope.core.agui.adapter.StreamContext; +import io.agentscope.core.message.ContentBlock; + +/** + * Strategy interface for converting different types of ContentBlock into AG-UI events. + * + * @param The specific type of ContentBlock this converter handles. + */ +public interface BlockEventConverter { + + /** + * Determines whether the current Event is applicable for this converter. + * + * @param event The agent stream event to evaluate. + * @return true if the event can be processed by this converter, false otherwise. + */ + boolean isApplicable(Event event); + + /** + * Executes the conversion logic, generating and appending AG-UI events to the stream context. + * + * @param block The content block to convert. + * @param event The original agent event. + * @param context The stream context holding state and emitted events. + */ + void convert(T block, Event event, StreamContext context); +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java new file mode 100644 index 000000000..803e5318f --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agui.adapter.strategy; + +import io.agentscope.core.agent.Event; +import io.agentscope.core.agui.adapter.StreamContext; +import io.agentscope.core.agui.event.AguiEvent; +import io.agentscope.core.message.CustomBlock; + +/** + * Converter for handling CustomBlock events, transforming them into AG-UI Custom events. + */ +public class CustomBlockConverter implements BlockEventConverter { + + @Override + public boolean isApplicable(Event event) { + // Custom events can occur at any stage, so always return true. + return true; + } + + @Override + public void convert(CustomBlock block, Event event, StreamContext ctx) { + ctx.emit(new AguiEvent.Custom( + ctx.getThreadId(), + ctx.getRunId(), + block.getName(), + block.getValue() + )); + } +} \ No newline at end of file diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java new file mode 100644 index 000000000..d6920efa2 --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agui.adapter.strategy; + +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.EventType; +import io.agentscope.core.agui.adapter.StreamContext; +import io.agentscope.core.agui.event.AguiEvent; +import io.agentscope.core.message.TextBlock; + +/** + * Converter for handling TextBlock events, transforming them into AG-UI TextMessage events. + */ +public class TextBlockConverter implements BlockEventConverter { + + @Override + public boolean isApplicable(Event event) { + return event.getType() == EventType.REASONING || event.getType() == EventType.SUMMARY; + } + + @Override + public void convert(TextBlock block, Event event, StreamContext ctx) { + String text = block.getText(); + String msgId = event.getMessage().getId(); + + if (text != null && !text.isEmpty()) { + if (!ctx.isTextActive(msgId)) { + ctx.flushAllActiveTexts(); + + ctx.emit(new AguiEvent.TextMessageStart(ctx.getThreadId(), ctx.getRunId(), msgId, "assistant")); + ctx.deferEndEvent(StreamContext.PREFIX_TEXT + msgId, new AguiEvent.TextMessageEnd(ctx.getThreadId(), ctx.getRunId(), msgId)); + ctx.addActiveText(msgId); + } + + ctx.emit(new AguiEvent.TextMessageContent(ctx.getThreadId(), ctx.getRunId(), msgId, text)); + } + + if (event.isLast() && ctx.isTextActive(msgId)) { + ctx.flushEndEvent(StreamContext.PREFIX_TEXT + msgId); + ctx.removeActiveText(msgId); + } + } +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java new file mode 100644 index 000000000..1770be741 --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agui.adapter.strategy; + +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.EventType; +import io.agentscope.core.agui.adapter.StreamContext; +import io.agentscope.core.agui.event.AguiEvent; +import io.agentscope.core.message.ThinkingBlock; + +/** + * Converter for handling ThinkingBlock events, transforming them into AG-UI ReasoningMessage events. + */ +public class ThinkingBlockConverter implements BlockEventConverter { + + @Override + public boolean isApplicable(Event event) { + return event.getType() == EventType.REASONING || event.getType() == EventType.SUMMARY; + } + + @Override + public void convert(ThinkingBlock block, Event event, StreamContext ctx) { + // ignore if reasoning output is disabled + if (!ctx.getConfig().isEnableReasoning()) { + return; + } + + String thinking = block.getThinking(); + String msgId = event.getMessage().getId(); + + if (thinking != null && !thinking.isEmpty()) { + if (!ctx.isReasoningActive(msgId)) { + ctx.flushAllActiveReasonings(); + + ctx.emit(new AguiEvent.ReasoningMessageStart(ctx.getThreadId(), ctx.getRunId(), msgId, "reasoning")); + ctx.deferEndEvent(StreamContext.PREFIX_REASONING + msgId, new AguiEvent.ReasoningMessageEnd(ctx.getThreadId(), ctx.getRunId(), msgId)); + ctx.addActiveReasoning(msgId); + } + + ctx.emit(new AguiEvent.ReasoningMessageContent(ctx.getThreadId(), ctx.getRunId(), msgId, thinking)); + } + + if (event.isLast() && ctx.isReasoningActive(msgId)) { + ctx.flushEndEvent(StreamContext.PREFIX_REASONING + msgId); + ctx.removeActiveReasoning(msgId); + } + } +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java new file mode 100644 index 000000000..627653914 --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java @@ -0,0 +1,92 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agui.adapter.strategy; + +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.EventType; +import io.agentscope.core.agui.adapter.StreamContext; +import io.agentscope.core.agui.event.AguiEvent; +import io.agentscope.core.message.ContentBlock; +import io.agentscope.core.message.TextBlock; +import io.agentscope.core.message.ToolResultBlock; + +import java.util.UUID; + +/** + * Converter for handling ToolResultBlock events, transforming them into AG-UI ToolCallResult events. + */ +public class ToolResultBlockConverter implements BlockEventConverter { + + @Override + public boolean isApplicable(Event event) { + return event.getType() == EventType.TOOL_RESULT && event.isLast(); + } + + @Override + public void convert(ToolResultBlock block, Event event, StreamContext ctx) { + String toolCallId = block.getId() != null ? block.getId() : + (ctx.getLastActiveToolId() != null ? ctx.getLastActiveToolId() : UUID.randomUUID().toString()); + String result = extractToolResultText(block); + + // Closing Start/End Phase + if (ctx.isToolActive(toolCallId)) { + ctx.flushEndEvent(StreamContext.PREFIX_TOOL + toolCallId); + } else { + // Fall-back: The previous process did not proceed to Start for some reason + // (e.g., recovery directly from the context) + String toolName = block.getName() != null && !block.getName().isBlank() ? block.getName() : "unknown"; + ctx.emit(new AguiEvent.ToolCallStart(ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName)); + ctx.emit(new AguiEvent.ToolCallEnd(ctx.getThreadId(), ctx.getRunId(), toolCallId)); + } + + ctx.emit(new AguiEvent.ToolCallResult( + ctx.getThreadId(), + ctx.getRunId(), + toolCallId, + result != null ? result : "", + "tool", + event.getMessage().getId() + )); + + if (ctx.isToolActive(toolCallId)) { + ctx.removeActiveTool(toolCallId); + } + } + + /** + * Extract text content from a tool result block. + * + * @param toolResult The tool result block + * @return The text content, or null if not present + */ + private String extractToolResultText(ToolResultBlock toolResult) { + if (toolResult.getOutput() == null || toolResult.getOutput().isEmpty()) { + return null; + } + + StringBuilder sb = new StringBuilder(); + for (ContentBlock output : toolResult.getOutput()) { + if (output instanceof TextBlock textBlock) { + if (!sb.isEmpty()) { + sb.append("\n"); + } + sb.append(textBlock.getText()); + } + } + + return !sb.isEmpty() ? sb.toString() : null; + } +} \ No newline at end of file diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java new file mode 100644 index 000000000..76a024989 --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agui.adapter.strategy; + +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.EventType; +import io.agentscope.core.agui.adapter.StreamContext; +import io.agentscope.core.agui.event.AguiEvent; +import io.agentscope.core.message.ToolUseBlock; + +import java.util.UUID; + +/** + * Converter for handling ToolUseBlock events, transforming them into AG-UI ToolCallStart and ToolCallArgs events. + */ +public class ToolUseBlockConverter implements BlockEventConverter { + + @Override + public boolean isApplicable(Event event) { + return event.getType() == EventType.REASONING || event.getType() == EventType.SUMMARY; + } + + @Override + public void convert(ToolUseBlock block, Event event, StreamContext ctx) { + String toolCallId = block.getId() != null ? block.getId() : + (ctx.getLastActiveToolId() != null ? ctx.getLastActiveToolId() : UUID.randomUUID().toString()); + + if (!ctx.isToolActive(toolCallId)) { + // End any active Text/Reasoning message before starting tool call + ctx.flushAllActiveTexts(); + ctx.flushAllActiveReasonings(); + + String toolName = block.getName() != null ? block.getName() : "unknown"; + ctx.emit(new AguiEvent.ToolCallStart(ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName)); + ctx.deferEndEvent(StreamContext.PREFIX_TOOL + toolCallId, + new AguiEvent.ToolCallEnd(ctx.getThreadId(), ctx.getRunId(), toolCallId)); + ctx.addActiveTool(toolCallId); + } + + // Emit tool call args if enabled + if (ctx.getConfig().isEmitToolCallArgs() && !event.isLast()) { + String args = block.getContent(); + if (args != null && !args.isEmpty()) { + ctx.emit(new AguiEvent.ToolCallArgs(ctx.getThreadId(), ctx.getRunId(), toolCallId, args)); + } + } + } +} \ No newline at end of file diff --git a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java index e1ac649a6..0fc708270 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java @@ -269,7 +269,7 @@ void testRunWithStreamingSummaryEvents() { long contentCount = events.stream().filter(e -> e instanceof AguiEvent.TextMessageContent).count(); - assertEquals(2, contentCount, "Should stream summary chunks as text deltas"); + assertEquals(3, contentCount, "Should stream summary chunks as text deltas"); long startCount = events.stream().filter(e -> e instanceof AguiEvent.TextMessageStart).count(); @@ -654,9 +654,10 @@ void testToolUseBlockWithNullId() { } @Test - void testTextMessageEndNotDuplicatedWhenLastEventAfterToolCall() { - // Test that when a text message is interrupted by a tool call and then the last event - // contains text blocks with the same message ID, only one TextMessageEnd is emitted + void testTextMessageLifecycleWhenInterruptedByToolCall() { + // Test that when a text message is interrupted by a tool call, the active text message + // is closed immediately. When subsequent text blocks with the same message ID arrive, + // a new text message lifecycle (Start -> Content -> End) is initiated. String msgId = "msg-text"; Msg firstMsg = Msg.builder() @@ -700,7 +701,7 @@ void testTextMessageEndNotDuplicatedWhenLastEventAfterToolCall() { assertNotNull(events); - // Should have exactly one TextMessageEnd for the same message ID + // Verify that TextMessageEnd is emitted twice due to the tool call interruption long textEndCount = events.stream() .filter(e -> e instanceof AguiEvent.TextMessageEnd) @@ -710,7 +711,19 @@ void testTextMessageEndNotDuplicatedWhenLastEventAfterToolCall() { return msgId.equals(end.messageId()); }) .count(); - assertEquals(1, textEndCount, "Should have exactly 1 TextMessageEnd per message ID"); + assertEquals(2, textEndCount, "Should emit exactly 2 TextMessageEnds: one for the interrupt, one for the final chunk"); + + // Optional: You can also verify that it was started twice for the same reason + long textStartCount = + events.stream() + .filter(e -> e instanceof AguiEvent.TextMessageStart) + .filter( + e -> { + AguiEvent.TextMessageStart start = (AguiEvent.TextMessageStart) e; + return msgId.equals(start.messageId()); + }) + .count(); + assertEquals(2, textStartCount, "Should emit exactly 2 TextMessageStarts due to the tool call interruption"); } @Test @@ -799,8 +812,9 @@ void testTextMessageEndWithLastEventDirectly() { // Verify the event sequence assertInstanceOf(AguiEvent.RunStarted.class, events.get(0)); assertInstanceOf(AguiEvent.TextMessageStart.class, events.get(1)); - assertInstanceOf(AguiEvent.TextMessageEnd.class, events.get(2)); - assertInstanceOf(AguiEvent.RunFinished.class, events.get(3)); + assertInstanceOf(AguiEvent.TextMessageContent.class, events.get(2)); + assertInstanceOf(AguiEvent.TextMessageEnd.class, events.get(3)); + assertInstanceOf(AguiEvent.RunFinished.class, events.get(4)); } @Test From c8ece673723fc5aa64e05aa91a46050067456736 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Wed, 6 May 2026 11:26:00 +0800 Subject: [PATCH 2/6] fix: format code style --- .../agentscope/core/message/CustomBlock.java | 6 +- .../core/agui/adapter/AguiAgentAdapter.java | 60 ++++++++++--------- .../core/agui/adapter/StreamContext.java | 50 ++++++++++++---- .../strategy/CustomBlockConverter.java | 11 ++-- .../adapter/strategy/TextBlockConverter.java | 12 +++- .../strategy/ThinkingBlockConverter.java | 13 +++- .../strategy/ToolResultBlockConverter.java | 36 ++++++----- .../strategy/ToolUseBlockConverter.java | 22 ++++--- .../agui/adapter/AguiAgentAdapterTest.java | 14 ++++- 9 files changed, 146 insertions(+), 78 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java index c0658e5d8..6dfc5ccf8 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java +++ b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java @@ -40,9 +40,7 @@ public final class CustomBlock extends ContentBlock { * @param value The arbitrary payload value associated with the event */ @JsonCreator - public CustomBlock( - @JsonProperty("name") String name, - @JsonProperty("value") Object value) { + public CustomBlock(@JsonProperty("name") String name, @JsonProperty("value") Object value) { this.name = name; this.value = value; } @@ -113,4 +111,4 @@ public CustomBlock build() { return new CustomBlock(name, value); } } -} \ No newline at end of file +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java index f0e88b033..b7d18f588 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java @@ -19,13 +19,13 @@ import io.agentscope.core.agent.Event; import io.agentscope.core.agent.EventType; import io.agentscope.core.agent.StreamOptions; -import io.agentscope.core.agui.converter.AguiMessageConverter; import io.agentscope.core.agui.adapter.strategy.BlockEventConverter; import io.agentscope.core.agui.adapter.strategy.CustomBlockConverter; import io.agentscope.core.agui.adapter.strategy.TextBlockConverter; import io.agentscope.core.agui.adapter.strategy.ThinkingBlockConverter; import io.agentscope.core.agui.adapter.strategy.ToolResultBlockConverter; import io.agentscope.core.agui.adapter.strategy.ToolUseBlockConverter; +import io.agentscope.core.agui.converter.AguiMessageConverter; import io.agentscope.core.agui.event.AguiEvent; import io.agentscope.core.agui.model.RunAgentInput; import io.agentscope.core.message.ContentBlock; @@ -35,13 +35,12 @@ import io.agentscope.core.message.ThinkingBlock; import io.agentscope.core.message.ToolResultBlock; import io.agentscope.core.message.ToolUseBlock; -import reactor.core.publisher.Flux; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import reactor.core.publisher.Flux; /** * Adapter that bridges AgentScope agents to the AG-UI protocol. @@ -105,27 +104,32 @@ public Flux run(RunAgentInput input) { String runId = input.getRunId(); List msgs = messageConverter.toMsgList(input.getMessages()); // Create stream options - use incremental mode for true streaming - StreamOptions options = StreamOptions.builder().eventTypes(EventType.ALL).incremental(true).build(); - - return Flux.defer(() -> { - StreamContext ctx = new StreamContext(threadId, runId, config); - - return Flux.concat( - // Emit RUN_STARTED - Flux.just(new AguiEvent.RunStarted(threadId, runId)), - - // Stream agent events and convert to AG-UI events - // Use concatMapIterable to preserve strict event ordering - agent.stream(msgs, options) - .concatMapIterable(event -> processEvent(event, ctx)), - - // Emit any pending end events - Flux.defer(() -> Flux.fromIterable(ctx.flushAllRemainingDeferred())), - - // Emit RUN_FINISHED - Flux.just(new AguiEvent.RunFinished(threadId, runId)) - ).onErrorResume(error -> handleError(threadId, runId, ctx, error)); - }); + StreamOptions options = + StreamOptions.builder().eventTypes(EventType.ALL).incremental(true).build(); + + return Flux.defer( + () -> { + StreamContext ctx = new StreamContext(threadId, runId, config); + + return Flux.concat( + // Emit RUN_STARTED + Flux.just(new AguiEvent.RunStarted(threadId, runId)), + + // Stream agent events and convert to AG-UI events + // Use concatMapIterable to preserve strict event ordering + agent.stream(msgs, options) + .concatMapIterable(event -> processEvent(event, ctx)), + + // Emit any pending end events + Flux.defer( + () -> + Flux.fromIterable( + ctx.flushAllRemainingDeferred())), + + // Emit RUN_FINISHED + Flux.just(new AguiEvent.RunFinished(threadId, runId))) + .onErrorResume(error -> handleError(threadId, runId, ctx, error)); + }); } /** @@ -160,14 +164,16 @@ private List processEvent(Event event, StreamContext ctx) { * @param error The thrown exception * @return A Flux containing the fallback closure events */ - private Flux handleError(String threadId, String runId, StreamContext ctx, Throwable error) { + private Flux handleError( + String threadId, String runId, StreamContext ctx, Throwable error) { List events = new ArrayList<>(); events.addAll(ctx.flushAllRemainingDeferred()); - String msg = error.getMessage() != null ? error.getMessage() : error.getClass().getSimpleName(); + String msg = + error.getMessage() != null ? error.getMessage() : error.getClass().getSimpleName(); events.add(new AguiEvent.Raw(threadId, runId, Map.of("error", msg))); events.add(new AguiEvent.RunFinished(threadId, runId)); return Flux.fromIterable(events); } -} \ No newline at end of file +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java index 7974886c8..482586a5c 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java @@ -16,7 +16,13 @@ package io.agentscope.core.agui.adapter; import io.agentscope.core.agui.event.AguiEvent; -import java.util.*; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; /** * Context holder for the AG-UI stream pipeline. @@ -60,9 +66,17 @@ public StreamContext(String threadId, String runId, AguiAdapterConfig config) { this.config = config; } - public String getThreadId() { return threadId; } - public String getRunId() { return runId; } - public AguiAdapterConfig getConfig() { return config; } + public String getThreadId() { + return threadId; + } + + public String getRunId() { + return runId; + } + + public AguiAdapterConfig getConfig() { + return config; + } // --- Event Emission and Deferred Management API --- @@ -150,15 +164,31 @@ public void flushAllActiveReasonings() { // --- Text State Management --- - public boolean isTextActive(String id) { return activeTextIds.contains(id); } - public void addActiveText(String id) { activeTextIds.add(id); } - public void removeActiveText(String id) { activeTextIds.remove(id); } + public boolean isTextActive(String id) { + return activeTextIds.contains(id); + } + + public void addActiveText(String id) { + activeTextIds.add(id); + } + + public void removeActiveText(String id) { + activeTextIds.remove(id); + } // --- Reasoning State Management --- - public boolean isReasoningActive(String id) { return activeReasoningIds.contains(id); } - public void addActiveReasoning(String id) { activeReasoningIds.add(id); } - public void removeActiveReasoning(String id) { activeReasoningIds.remove(id); } + public boolean isReasoningActive(String id) { + return activeReasoningIds.contains(id); + } + + public void addActiveReasoning(String id) { + activeReasoningIds.add(id); + } + + public void removeActiveReasoning(String id) { + activeReasoningIds.remove(id); + } // --- Tool State Management --- diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java index 803e5318f..27e784476 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java @@ -33,11 +33,8 @@ public boolean isApplicable(Event event) { @Override public void convert(CustomBlock block, Event event, StreamContext ctx) { - ctx.emit(new AguiEvent.Custom( - ctx.getThreadId(), - ctx.getRunId(), - block.getName(), - block.getValue() - )); + ctx.emit( + new AguiEvent.Custom( + ctx.getThreadId(), ctx.getRunId(), block.getName(), block.getValue())); } -} \ No newline at end of file +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java index d6920efa2..707803de5 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java @@ -40,12 +40,18 @@ public void convert(TextBlock block, Event event, StreamContext ctx) { if (!ctx.isTextActive(msgId)) { ctx.flushAllActiveTexts(); - ctx.emit(new AguiEvent.TextMessageStart(ctx.getThreadId(), ctx.getRunId(), msgId, "assistant")); - ctx.deferEndEvent(StreamContext.PREFIX_TEXT + msgId, new AguiEvent.TextMessageEnd(ctx.getThreadId(), ctx.getRunId(), msgId)); + ctx.emit( + new AguiEvent.TextMessageStart( + ctx.getThreadId(), ctx.getRunId(), msgId, "assistant")); + ctx.deferEndEvent( + StreamContext.PREFIX_TEXT + msgId, + new AguiEvent.TextMessageEnd(ctx.getThreadId(), ctx.getRunId(), msgId)); ctx.addActiveText(msgId); } - ctx.emit(new AguiEvent.TextMessageContent(ctx.getThreadId(), ctx.getRunId(), msgId, text)); + ctx.emit( + new AguiEvent.TextMessageContent( + ctx.getThreadId(), ctx.getRunId(), msgId, text)); } if (event.isLast() && ctx.isTextActive(msgId)) { diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java index 1770be741..6fa61b9d7 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java @@ -45,12 +45,19 @@ public void convert(ThinkingBlock block, Event event, StreamContext ctx) { if (!ctx.isReasoningActive(msgId)) { ctx.flushAllActiveReasonings(); - ctx.emit(new AguiEvent.ReasoningMessageStart(ctx.getThreadId(), ctx.getRunId(), msgId, "reasoning")); - ctx.deferEndEvent(StreamContext.PREFIX_REASONING + msgId, new AguiEvent.ReasoningMessageEnd(ctx.getThreadId(), ctx.getRunId(), msgId)); + ctx.emit( + new AguiEvent.ReasoningMessageStart( + ctx.getThreadId(), ctx.getRunId(), msgId, "reasoning")); + ctx.deferEndEvent( + StreamContext.PREFIX_REASONING + msgId, + new AguiEvent.ReasoningMessageEnd( + ctx.getThreadId(), ctx.getRunId(), msgId)); ctx.addActiveReasoning(msgId); } - ctx.emit(new AguiEvent.ReasoningMessageContent(ctx.getThreadId(), ctx.getRunId(), msgId, thinking)); + ctx.emit( + new AguiEvent.ReasoningMessageContent( + ctx.getThreadId(), ctx.getRunId(), msgId, thinking)); } if (event.isLast() && ctx.isReasoningActive(msgId)) { diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java index 627653914..221aa1497 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java @@ -22,7 +22,6 @@ import io.agentscope.core.message.ContentBlock; import io.agentscope.core.message.TextBlock; import io.agentscope.core.message.ToolResultBlock; - import java.util.UUID; /** @@ -37,8 +36,12 @@ public boolean isApplicable(Event event) { @Override public void convert(ToolResultBlock block, Event event, StreamContext ctx) { - String toolCallId = block.getId() != null ? block.getId() : - (ctx.getLastActiveToolId() != null ? ctx.getLastActiveToolId() : UUID.randomUUID().toString()); + String toolCallId = + block.getId() != null + ? block.getId() + : (ctx.getLastActiveToolId() != null + ? ctx.getLastActiveToolId() + : UUID.randomUUID().toString()); String result = extractToolResultText(block); // Closing Start/End Phase @@ -47,19 +50,24 @@ public void convert(ToolResultBlock block, Event event, StreamContext ctx) { } else { // Fall-back: The previous process did not proceed to Start for some reason // (e.g., recovery directly from the context) - String toolName = block.getName() != null && !block.getName().isBlank() ? block.getName() : "unknown"; - ctx.emit(new AguiEvent.ToolCallStart(ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName)); + String toolName = + block.getName() != null && !block.getName().isBlank() + ? block.getName() + : "unknown"; + ctx.emit( + new AguiEvent.ToolCallStart( + ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName)); ctx.emit(new AguiEvent.ToolCallEnd(ctx.getThreadId(), ctx.getRunId(), toolCallId)); } - ctx.emit(new AguiEvent.ToolCallResult( - ctx.getThreadId(), - ctx.getRunId(), - toolCallId, - result != null ? result : "", - "tool", - event.getMessage().getId() - )); + ctx.emit( + new AguiEvent.ToolCallResult( + ctx.getThreadId(), + ctx.getRunId(), + toolCallId, + result != null ? result : "", + "tool", + event.getMessage().getId())); if (ctx.isToolActive(toolCallId)) { ctx.removeActiveTool(toolCallId); @@ -89,4 +97,4 @@ private String extractToolResultText(ToolResultBlock toolResult) { return !sb.isEmpty() ? sb.toString() : null; } -} \ No newline at end of file +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java index 76a024989..0407106d0 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java @@ -20,7 +20,6 @@ import io.agentscope.core.agui.adapter.StreamContext; import io.agentscope.core.agui.event.AguiEvent; import io.agentscope.core.message.ToolUseBlock; - import java.util.UUID; /** @@ -35,8 +34,12 @@ public boolean isApplicable(Event event) { @Override public void convert(ToolUseBlock block, Event event, StreamContext ctx) { - String toolCallId = block.getId() != null ? block.getId() : - (ctx.getLastActiveToolId() != null ? ctx.getLastActiveToolId() : UUID.randomUUID().toString()); + String toolCallId = + block.getId() != null + ? block.getId() + : (ctx.getLastActiveToolId() != null + ? ctx.getLastActiveToolId() + : UUID.randomUUID().toString()); if (!ctx.isToolActive(toolCallId)) { // End any active Text/Reasoning message before starting tool call @@ -44,8 +47,11 @@ public void convert(ToolUseBlock block, Event event, StreamContext ctx) { ctx.flushAllActiveReasonings(); String toolName = block.getName() != null ? block.getName() : "unknown"; - ctx.emit(new AguiEvent.ToolCallStart(ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName)); - ctx.deferEndEvent(StreamContext.PREFIX_TOOL + toolCallId, + ctx.emit( + new AguiEvent.ToolCallStart( + ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName)); + ctx.deferEndEvent( + StreamContext.PREFIX_TOOL + toolCallId, new AguiEvent.ToolCallEnd(ctx.getThreadId(), ctx.getRunId(), toolCallId)); ctx.addActiveTool(toolCallId); } @@ -54,8 +60,10 @@ public void convert(ToolUseBlock block, Event event, StreamContext ctx) { if (ctx.getConfig().isEmitToolCallArgs() && !event.isLast()) { String args = block.getContent(); if (args != null && !args.isEmpty()) { - ctx.emit(new AguiEvent.ToolCallArgs(ctx.getThreadId(), ctx.getRunId(), toolCallId, args)); + ctx.emit( + new AguiEvent.ToolCallArgs( + ctx.getThreadId(), ctx.getRunId(), toolCallId, args)); } } } -} \ No newline at end of file +} diff --git a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java index 0fc708270..79f2ad17e 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java @@ -711,7 +711,11 @@ void testTextMessageLifecycleWhenInterruptedByToolCall() { return msgId.equals(end.messageId()); }) .count(); - assertEquals(2, textEndCount, "Should emit exactly 2 TextMessageEnds: one for the interrupt, one for the final chunk"); + assertEquals( + 2, + textEndCount, + "Should emit exactly 2 TextMessageEnds: one for the interrupt, one for the final" + + " chunk"); // Optional: You can also verify that it was started twice for the same reason long textStartCount = @@ -719,11 +723,15 @@ void testTextMessageLifecycleWhenInterruptedByToolCall() { .filter(e -> e instanceof AguiEvent.TextMessageStart) .filter( e -> { - AguiEvent.TextMessageStart start = (AguiEvent.TextMessageStart) e; + AguiEvent.TextMessageStart start = + (AguiEvent.TextMessageStart) e; return msgId.equals(start.messageId()); }) .count(); - assertEquals(2, textStartCount, "Should emit exactly 2 TextMessageStarts due to the tool call interruption"); + assertEquals( + 2, + textStartCount, + "Should emit exactly 2 TextMessageStarts due to the tool call interruption"); } @Test From cce154d1faed47f5d4629d85a89c3516a3127a56 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Wed, 6 May 2026 13:47:38 +0800 Subject: [PATCH 3/6] test: add ag-ui CUSTOM event testcase --- .../agentscope/core/message/CustomBlock.java | 8 +- .../core/agui/adapter/StreamContext.java | 15 +++ .../adapter/strategy/TextBlockConverter.java | 2 +- .../strategy/ThinkingBlockConverter.java | 2 +- .../strategy/ToolUseBlockConverter.java | 7 +- .../agui/adapter/AguiAgentAdapterTest.java | 93 +++++++++++++++++++ 6 files changed, 119 insertions(+), 8 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java index 6dfc5ccf8..515a360ae 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java +++ b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java @@ -36,12 +36,12 @@ public final class CustomBlock extends ContentBlock { /** * Creates a new custom block for JSON deserialization. * - * @param name The name of the custom event + * @param name The name of the custom event (null will be converted to empty string) * @param value The arbitrary payload value associated with the event */ @JsonCreator public CustomBlock(@JsonProperty("name") String name, @JsonProperty("value") Object value) { - this.name = name; + this.name = name != null ? name : ""; this.value = value; } @@ -105,10 +105,10 @@ public Builder value(Object value) { /** * Builds a new CustomBlock with the configured properties. * - * @return A new CustomBlock instance + * @return A new CustomBlock instance (null name will be converted to empty string) */ public CustomBlock build() { - return new CustomBlock(name, value); + return new CustomBlock(name != null ? name : "", value); } } } diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java index 482586a5c..629b85abe 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java @@ -66,14 +66,29 @@ public StreamContext(String threadId, String runId, AguiAdapterConfig config) { this.config = config; } + /** + * Returns the thread identifier associated with this stream context. + * + * @return the thread ID + */ public String getThreadId() { return threadId; } + /** + * Returns the run identifier associated with this stream context. + * + * @return the run ID + */ public String getRunId() { return runId; } + /** + * Returns the adapter configuration used by this stream context. + * + * @return the adapter configuration + */ public AguiAdapterConfig getConfig() { return config; } diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java index 707803de5..41278f431 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java @@ -36,7 +36,7 @@ public void convert(TextBlock block, Event event, StreamContext ctx) { String text = block.getText(); String msgId = event.getMessage().getId(); - if (text != null && !text.isEmpty()) { + if (text != null && !text.isBlank()) { if (!ctx.isTextActive(msgId)) { ctx.flushAllActiveTexts(); diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java index 6fa61b9d7..6ea32d561 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java @@ -41,7 +41,7 @@ public void convert(ThinkingBlock block, Event event, StreamContext ctx) { String thinking = block.getThinking(); String msgId = event.getMessage().getId(); - if (thinking != null && !thinking.isEmpty()) { + if (thinking != null && !thinking.isBlank()) { if (!ctx.isReasoningActive(msgId)) { ctx.flushAllActiveReasonings(); diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java index 0407106d0..4c917fab0 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java @@ -46,7 +46,10 @@ public void convert(ToolUseBlock block, Event event, StreamContext ctx) { ctx.flushAllActiveTexts(); ctx.flushAllActiveReasonings(); - String toolName = block.getName() != null ? block.getName() : "unknown"; + String toolName = + block.getName() != null && !block.getName().isBlank() + ? block.getName() + : "unknown"; ctx.emit( new AguiEvent.ToolCallStart( ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName)); @@ -59,7 +62,7 @@ public void convert(ToolUseBlock block, Event event, StreamContext ctx) { // Emit tool call args if enabled if (ctx.getConfig().isEmitToolCallArgs() && !event.isLast()) { String args = block.getContent(); - if (args != null && !args.isEmpty()) { + if (args != null && !args.isBlank()) { ctx.emit( new AguiEvent.ToolCallArgs( ctx.getThreadId(), ctx.getRunId(), toolCallId, args)); diff --git a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java index 79f2ad17e..b3dcae766 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java @@ -31,6 +31,7 @@ import io.agentscope.core.agui.event.AguiEvent; import io.agentscope.core.agui.model.AguiMessage; import io.agentscope.core.agui.model.RunAgentInput; +import io.agentscope.core.message.CustomBlock; import io.agentscope.core.message.Msg; import io.agentscope.core.message.MsgRole; import io.agentscope.core.message.TextBlock; @@ -1718,4 +1719,96 @@ void testRunWithNullThinkingBlock() { !hasReasoningMessageStart, "Should NOT have ReasoningMessageStart for null thinking"); } + + @Test + void testRunWithCustomEventForToolProgress() { + // Simulate intermediate tool progress using CustomBlock (50% downloaded) + Msg progressMsg1 = + Msg.builder() + .id("msg-tr1") + .role(MsgRole.TOOL) + .content( + CustomBlock.builder() + .name("tool_progress") + .value(Map.of("progress", "50%")) + .build()) + .build(); + Event progressEvent1 = new Event(EventType.TOOL_RESULT, progressMsg1, false); + + // Simulate intermediate tool progress (100% downloaded) + Msg progressMsg2 = + Msg.builder() + .id("msg-tr1") + .role(MsgRole.TOOL) + .content( + CustomBlock.builder() + .name("tool_progress") + .value(Map.of("progress", "100%")) + .build()) + .build(); + Event progressEvent2 = new Event(EventType.TOOL_RESULT, progressMsg2, false); + + // Simulate final tool result (isLast = true) + Msg finalResultMsg = + Msg.builder() + .id("msg-tr1") + .role(MsgRole.TOOL) + .content( + ToolResultBlock.builder() + .id("tc-1") + .output( + TextBlock.builder() + .text("Download complete") + .build()) + .build()) + .build(); + Event finalResultEvent = new Event(EventType.TOOL_RESULT, finalResultMsg, true); + + when(mockAgent.stream(anyList(), any(StreamOptions.class))) + .thenReturn(Flux.just(progressEvent1, progressEvent2, finalResultEvent)); + + RunAgentInput input = + RunAgentInput.builder() + .threadId("thread-1") + .runId("run-1") + .messages(List.of(AguiMessage.userMessage("msg-1", "Download file"))) + .build(); + + List events = adapter.run(input).collectList().block(); + + assertNotNull(events); + + // Verify that exactly 2 Custom events were emitted for the progress + long customEventCount = events.stream().filter(e -> e instanceof AguiEvent.Custom).count(); + assertEquals(2, customEventCount, "Should have 2 Custom events for tool progress"); + + // Verify the content of the Custom events + List customEvents = + events.stream() + .filter(e -> e instanceof AguiEvent.Custom) + .map(e -> (AguiEvent.Custom) e) + .toList(); + + assertEquals("tool_progress", customEvents.get(0).name()); + Map value1 = (Map) customEvents.get(0).value(); + assertEquals("50%", value1.get("progress")); + + assertEquals("tool_progress", customEvents.get(1).name()); + Map value2 = (Map) customEvents.get(1).value(); + assertEquals("100%", value2.get("progress")); + + // Verify that the final ToolCallResult was emitted correctly + long toolResultCount = + events.stream().filter(e -> e instanceof AguiEvent.ToolCallResult).count(); + assertEquals( + 1, toolResultCount, "Should have exactly 1 ToolCallResult for the final event"); + + AguiEvent.ToolCallResult finalResult = + (AguiEvent.ToolCallResult) + events.stream() + .filter(e -> e instanceof AguiEvent.ToolCallResult) + .findFirst() + .orElseThrow(); + assertEquals("Download complete", finalResult.content()); + } } From e9d8cb9ab47c3e8f56c21bb44d58a428d771c0da Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Wed, 13 May 2026 13:30:11 +0800 Subject: [PATCH 4/6] fix: duplicate text/reasoning content --- .../core/agui/adapter/strategy/TextBlockConverter.java | 8 +++++--- .../agui/adapter/strategy/ThinkingBlockConverter.java | 8 +++++--- .../core/agui/adapter/AguiAgentAdapterTest.java | 7 +++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java index 41278f431..d13607ab3 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java @@ -49,9 +49,11 @@ public void convert(TextBlock block, Event event, StreamContext ctx) { ctx.addActiveText(msgId); } - ctx.emit( - new AguiEvent.TextMessageContent( - ctx.getThreadId(), ctx.getRunId(), msgId, text)); + if (!event.isLast()) { + ctx.emit( + new AguiEvent.TextMessageContent( + ctx.getThreadId(), ctx.getRunId(), msgId, text)); + } } if (event.isLast() && ctx.isTextActive(msgId)) { diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java index 6ea32d561..6b07ada66 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java @@ -55,9 +55,11 @@ public void convert(ThinkingBlock block, Event event, StreamContext ctx) { ctx.addActiveReasoning(msgId); } - ctx.emit( - new AguiEvent.ReasoningMessageContent( - ctx.getThreadId(), ctx.getRunId(), msgId, thinking)); + if (!event.isLast()) { + ctx.emit( + new AguiEvent.ReasoningMessageContent( + ctx.getThreadId(), ctx.getRunId(), msgId, thinking)); + } } if (event.isLast() && ctx.isReasoningActive(msgId)) { diff --git a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java index b3dcae766..1bafd9efc 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java @@ -270,7 +270,7 @@ void testRunWithStreamingSummaryEvents() { long contentCount = events.stream().filter(e -> e instanceof AguiEvent.TextMessageContent).count(); - assertEquals(3, contentCount, "Should stream summary chunks as text deltas"); + assertEquals(2, contentCount, "Should stream summary chunks as text deltas"); long startCount = events.stream().filter(e -> e instanceof AguiEvent.TextMessageStart).count(); @@ -821,9 +821,8 @@ void testTextMessageEndWithLastEventDirectly() { // Verify the event sequence assertInstanceOf(AguiEvent.RunStarted.class, events.get(0)); assertInstanceOf(AguiEvent.TextMessageStart.class, events.get(1)); - assertInstanceOf(AguiEvent.TextMessageContent.class, events.get(2)); - assertInstanceOf(AguiEvent.TextMessageEnd.class, events.get(3)); - assertInstanceOf(AguiEvent.RunFinished.class, events.get(4)); + assertInstanceOf(AguiEvent.TextMessageEnd.class, events.get(2)); + assertInstanceOf(AguiEvent.RunFinished.class, events.get(3)); } @Test From b6605951b840abeea8af4e76cbfce5dd87a889e8 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Wed, 13 May 2026 14:50:50 +0800 Subject: [PATCH 5/6] fix: duplicate reasoning start/end agui event --- .../core/agui/adapter/StreamContext.java | 20 +++++++++ .../adapter/strategy/TextBlockConverter.java | 5 ++- .../strategy/ThinkingBlockConverter.java | 5 ++- .../agui/adapter/AguiAgentAdapterTest.java | 43 +++++++++---------- 4 files changed, 49 insertions(+), 24 deletions(-) diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java index 629b85abe..51f697aae 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java @@ -17,6 +17,7 @@ import io.agentscope.core.agui.event.AguiEvent; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -50,6 +51,9 @@ public class StreamContext { private final Set activeReasoningIds = new LinkedHashSet<>(); private final Set activeToolIds = new LinkedHashSet<>(); + private final Set finishedTextIds = new HashSet<>(); + private final Set finishedReasoningIds = new HashSet<>(); + // Fallback ID for tool results that might lack an explicit ID private String lastActiveToolId = null; @@ -148,10 +152,16 @@ public List flushAllRemainingDeferred() { List remaining = new ArrayList<>(deferredEndEvents.values()); deferredEndEvents.clear(); + activeTextIds.clear(); activeReasoningIds.clear(); activeToolIds.clear(); + finishedTextIds.clear(); + finishedReasoningIds.clear(); + + lastActiveToolId = null; + return remaining; } @@ -189,6 +199,11 @@ public void addActiveText(String id) { public void removeActiveText(String id) { activeTextIds.remove(id); + finishedTextIds.add(id); + } + + public boolean isTextFinished(String id) { + return finishedTextIds.contains(id); } // --- Reasoning State Management --- @@ -203,6 +218,11 @@ public void addActiveReasoning(String id) { public void removeActiveReasoning(String id) { activeReasoningIds.remove(id); + finishedReasoningIds.add(id); + } + + public boolean isReasoningFinished(String id) { + return finishedReasoningIds.contains(id); } // --- Tool State Management --- diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java index d13607ab3..37f0bfeb1 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java @@ -33,9 +33,12 @@ public boolean isApplicable(Event event) { @Override public void convert(TextBlock block, Event event, StreamContext ctx) { - String text = block.getText(); String msgId = event.getMessage().getId(); + if (ctx.isTextFinished(msgId)) { + return; + } + String text = block.getText(); if (text != null && !text.isBlank()) { if (!ctx.isTextActive(msgId)) { ctx.flushAllActiveTexts(); diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java index 6b07ada66..515219578 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java @@ -38,9 +38,12 @@ public void convert(ThinkingBlock block, Event event, StreamContext ctx) { return; } - String thinking = block.getThinking(); String msgId = event.getMessage().getId(); + if (ctx.isReasoningFinished(msgId)) { + return; + } + String thinking = block.getThinking(); if (thinking != null && !thinking.isBlank()) { if (!ctx.isReasoningActive(msgId)) { ctx.flushAllActiveReasonings(); diff --git a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java index 1bafd9efc..631fbd589 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java @@ -655,10 +655,10 @@ void testToolUseBlockWithNullId() { } @Test - void testTextMessageLifecycleWhenInterruptedByToolCall() { - // Test that when a text message is interrupted by a tool call, the active text message - // is closed immediately. When subsequent text blocks with the same message ID arrive, - // a new text message lifecycle (Start -> Content -> End) is initiated. + void testTextMessageIgnoredAfterToolCallInterruption() { + // Test that when a text message is interrupted by a tool call, + // the active text message is closed immediately and marked as finished. + // Subsequent text blocks with the same message ID should be ignored. String msgId = "msg-text"; Msg firstMsg = Msg.builder() @@ -688,6 +688,7 @@ void testTextMessageLifecycleWhenInterruptedByToolCall() { Event firstEvent = new Event(EventType.REASONING, firstMsg, false); Event toolCallEvent = new Event(EventType.REASONING, toolCall1, false); Event lastEvent = new Event(EventType.REASONING, lastMsg, true); + when(mockAgent.stream(anyList(), any(StreamOptions.class))) .thenReturn(Flux.just(firstEvent, toolCallEvent, lastEvent)); @@ -702,37 +703,35 @@ void testTextMessageLifecycleWhenInterruptedByToolCall() { assertNotNull(events); - // Verify that TextMessageEnd is emitted twice due to the tool call interruption - long textEndCount = + long textStartCount = events.stream() - .filter(e -> e instanceof AguiEvent.TextMessageEnd) + .filter(e -> e instanceof AguiEvent.TextMessageStart) .filter( e -> { - AguiEvent.TextMessageEnd end = (AguiEvent.TextMessageEnd) e; - return msgId.equals(end.messageId()); + AguiEvent.TextMessageStart start = + (AguiEvent.TextMessageStart) e; + return msgId.equals(start.messageId()); }) .count(); assertEquals( - 2, - textEndCount, - "Should emit exactly 2 TextMessageEnds: one for the interrupt, one for the final" - + " chunk"); + 1, + textStartCount, + "Should emit exactly 1 TextMessageStart. The subsequent block should be ignored"); - // Optional: You can also verify that it was started twice for the same reason - long textStartCount = + long textEndCount = events.stream() - .filter(e -> e instanceof AguiEvent.TextMessageStart) + .filter(e -> e instanceof AguiEvent.TextMessageEnd) .filter( e -> { - AguiEvent.TextMessageStart start = - (AguiEvent.TextMessageStart) e; - return msgId.equals(start.messageId()); + AguiEvent.TextMessageEnd end = (AguiEvent.TextMessageEnd) e; + return msgId.equals(end.messageId()); }) .count(); assertEquals( - 2, - textStartCount, - "Should emit exactly 2 TextMessageStarts due to the tool call interruption"); + 1, + textEndCount, + "Should emit exactly 1 TextMessageEnd: the one triggered by the tool" + + " interruption."); } @Test From 68d83f1ef6842f3681ae2a98414be8226c9695a4 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Wed, 13 May 2026 21:02:53 +0800 Subject: [PATCH 6/6] fix: force end the reasoning event before start the text event --- .../core/agui/adapter/strategy/TextBlockConverter.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java index 37f0bfeb1..c3546fa0f 100644 --- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java +++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java @@ -42,6 +42,8 @@ public void convert(TextBlock block, Event event, StreamContext ctx) { if (text != null && !text.isBlank()) { if (!ctx.isTextActive(msgId)) { ctx.flushAllActiveTexts(); + // When the text is output, it signifies that the "reasoning" has end + ctx.flushAllActiveReasonings(); ctx.emit( new AguiEvent.TextMessageStart(