diff --git a/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java b/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java
index 55afe0393..d9b9ac10a 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/message/ContentBlock.java
@@ -35,6 +35,7 @@
*
{@link CustomBlock} - Custom extension block to trigger AG-UI Custom events
*
*
* Uses Jackson annotations for polymorphic JSON serialization with the "type" discriminator
@@ -49,7 +50,8 @@
@JsonSubTypes.Type(value = AudioBlock.class, name = "audio"),
@JsonSubTypes.Type(value = VideoBlock.class, name = "video"),
@JsonSubTypes.Type(value = ToolUseBlock.class, name = "tool_use"),
- @JsonSubTypes.Type(value = ToolResultBlock.class, name = "tool_result")
+ @JsonSubTypes.Type(value = ToolResultBlock.class, name = "tool_result"),
+ @JsonSubTypes.Type(value = CustomBlock.class, name = "custom")
})
public sealed class ContentBlock implements State
permits TextBlock,
@@ -58,4 +60,5 @@ public sealed class ContentBlock implements State
VideoBlock,
ThinkingBlock,
ToolUseBlock,
- ToolResultBlock {}
+ ToolResultBlock,
+ CustomBlock {}
diff --git a/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java
new file mode 100644
index 000000000..515a360ae
--- /dev/null
+++ b/agentscope-core/src/main/java/io/agentscope/core/message/CustomBlock.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.message;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents a custom extension block specifically designed to trigger AG-UI Custom events.
+ *
+ *
This block allows for the transmission of arbitrary JSON-serializable payloads
+ * to the frontend. It is commonly used for real-time progress pushes, workflow
+ * state changes, and any other customized interactive behaviors within the AG-UI protocol.
+ *
+ *
The name identifies the custom event type, and the value can be a Map, String,
+ * Number, or any JSON-serializable object containing the event payload.
+ */
+public final class CustomBlock extends ContentBlock {
+
+ private final String name;
+ private final Object value;
+
+ /**
+ * Creates a new custom block for JSON deserialization.
+ *
+ * @param name The name of the custom event (null will be converted to empty string)
+ * @param value The arbitrary payload value associated with the event
+ */
+ @JsonCreator
+ public CustomBlock(@JsonProperty("name") String name, @JsonProperty("value") Object value) {
+ this.name = name != null ? name : "";
+ this.value = value;
+ }
+
+ /**
+ * Gets the name of this custom block.
+ *
+ * @return The event name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Gets the payload value of this custom block.
+ *
+ * @return The payload object
+ */
+ public Object getValue() {
+ return value;
+ }
+
+ /**
+ * Creates a new builder for constructing CustomBlock instances.
+ *
+ * @return A new builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for constructing CustomBlock instances.
+ */
+ public static class Builder {
+
+ private String name;
+ private Object value;
+
+ /**
+ * Sets the name for the custom block event.
+ *
+ * @param name The custom event name
+ * @return This builder for chaining
+ */
+ public Builder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Sets the arbitrary payload value for the block.
+ *
+ * @param value payload object
+ * @return This builder for chaining
+ */
+ public Builder value(Object value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Builds a new CustomBlock with the configured properties.
+ *
+ * @return A new CustomBlock instance (null name will be converted to empty string)
+ */
+ public CustomBlock build() {
+ return new CustomBlock(name != null ? name : "", value);
+ }
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java
index 2314e72b9..b7d18f588 100644
--- a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/AguiAgentAdapter.java
@@ -19,24 +19,27 @@
import io.agentscope.core.agent.Event;
import io.agentscope.core.agent.EventType;
import io.agentscope.core.agent.StreamOptions;
+import io.agentscope.core.agui.adapter.strategy.BlockEventConverter;
+import io.agentscope.core.agui.adapter.strategy.CustomBlockConverter;
+import io.agentscope.core.agui.adapter.strategy.TextBlockConverter;
+import io.agentscope.core.agui.adapter.strategy.ThinkingBlockConverter;
+import io.agentscope.core.agui.adapter.strategy.ToolResultBlockConverter;
+import io.agentscope.core.agui.adapter.strategy.ToolUseBlockConverter;
import io.agentscope.core.agui.converter.AguiMessageConverter;
import io.agentscope.core.agui.event.AguiEvent;
import io.agentscope.core.agui.model.RunAgentInput;
import io.agentscope.core.message.ContentBlock;
+import io.agentscope.core.message.CustomBlock;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.message.ThinkingBlock;
import io.agentscope.core.message.ToolResultBlock;
import io.agentscope.core.message.ToolUseBlock;
-import io.agentscope.core.util.JsonException;
-import io.agentscope.core.util.JsonUtils;
import java.util.ArrayList;
-import java.util.LinkedHashSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
import reactor.core.publisher.Flux;
/**
@@ -48,15 +51,15 @@
*
Event Mapping:
*
* - AgentScope REASONING/SUMMARY events → AG-UI TEXT_MESSAGE_* events (for TextBlock)
- * - AgentScope REASONING/SUMMARY events → AG-UI REASONING_* events (for
- * ThinkingBlock, when enabled)
+ * - AgentScope REASONING/SUMMARY events → AG-UI REASONING_* events (for ThinkingBlock)
* - AgentScope TOOL_RESULT events → AG-UI TOOL_CALL_END events
* - ToolUseBlock content → AG-UI TOOL_CALL_START events
+ * - CustomBlock content → AG-UI CUSTOM events
*
*
* Reasoning Support:
*
- * - ThinkingBlock content is converted to REASONING_* events according to AG-UI Reasoning draft
+ * - ThinkingBlock content is converted to REASONING_* events
* - Reasoning output is disabled by default (enableReasoning=false) for backward compatibility
* - Set enableReasoning=true in AguiAdapterConfig to enable reasoning events
*
@@ -66,17 +69,25 @@ public class AguiAgentAdapter {
private final Agent agent;
private final AguiAdapterConfig config;
private final AguiMessageConverter messageConverter;
+ private final Map, BlockEventConverter>> converters = new HashMap<>();
/**
- * Creates a new AguiAgentAdapter.
+ * Creates a new AguiAgentAdapter and registers all block conversion strategies.
*
- * @param agent The agent to adapt
+ * @param agent The agent to adapt
* @param config The adapter configuration
*/
public AguiAgentAdapter(Agent agent, AguiAdapterConfig config) {
- this.agent = Objects.requireNonNull(agent, "agent cannot be null");
- this.config = Objects.requireNonNull(config, "config cannot be null");
+ this.agent = Objects.requireNonNull(agent);
+ this.config = Objects.requireNonNull(config);
this.messageConverter = new AguiMessageConverter();
+
+ // Register block conversion strategies
+ converters.put(TextBlock.class, new TextBlockConverter());
+ converters.put(ThinkingBlock.class, new ThinkingBlockConverter());
+ converters.put(ToolUseBlock.class, new ToolUseBlockConverter());
+ converters.put(ToolResultBlock.class, new ToolResultBlockConverter());
+ converters.put(CustomBlock.class, new CustomBlockConverter());
}
/**
@@ -89,399 +100,80 @@ public AguiAgentAdapter(Agent agent, AguiAdapterConfig config) {
* @return A Flux of AG-UI events
*/
public Flux run(RunAgentInput input) {
+ String threadId = input.getThreadId();
+ String runId = input.getRunId();
+ List msgs = messageConverter.toMsgList(input.getMessages());
+ // Create stream options - use incremental mode for true streaming
+ StreamOptions options =
+ StreamOptions.builder().eventTypes(EventType.ALL).incremental(true).build();
+
return Flux.defer(
() -> {
- String threadId = input.getThreadId();
- String runId = input.getRunId();
-
- // Convert AG-UI messages to AgentScope messages
- List msgs = messageConverter.toMsgList(input.getMessages());
-
- // Create stream options - use incremental mode for true streaming
- StreamOptions options =
- StreamOptions.builder()
- .eventTypes(EventType.ALL)
- .incremental(true)
- .build();
-
- // Track state for event conversion
- EventConversionState state = new EventConversionState(threadId, runId);
+ StreamContext ctx = new StreamContext(threadId, runId, config);
return Flux.concat(
// Emit RUN_STARTED
Flux.just(new AguiEvent.RunStarted(threadId, runId)),
+
// Stream agent events and convert to AG-UI events
// Use concatMapIterable to preserve strict event ordering
agent.stream(msgs, options)
- .concatMapIterable(event -> convertEvent(event, state)),
- // Emit any pending end events and RUN_FINISHED
- Flux.defer(() -> finishRun(state)))
- .onErrorResume(
- error -> {
- // On error, emit RawEvent with error info followed by
- // RunFinished
- String errorMessage =
- error.getMessage() != null
- ? error.getMessage()
- : error.getClass().getSimpleName();
- return Flux.just(
- new AguiEvent.Raw(
- threadId,
- runId,
- Map.of("error", errorMessage)),
- new AguiEvent.RunFinished(threadId, runId));
- });
+ .concatMapIterable(event -> processEvent(event, ctx)),
+
+ // Emit any pending end events
+ Flux.defer(
+ () ->
+ Flux.fromIterable(
+ ctx.flushAllRemainingDeferred())),
+
+ // Emit RUN_FINISHED
+ Flux.just(new AguiEvent.RunFinished(threadId, runId)))
+ .onErrorResume(error -> handleError(threadId, runId, ctx, error));
});
}
/**
- * Convert an AgentScope event to AG-UI events.
+ * Dispatches the incoming event to the appropriate converter strategies based on block types.
*
- * @param event The AgentScope event
- * @param state The conversion state
- * @return List of AG-UI events
+ * @param event The incoming agent event
+ * @param ctx The current stream context
+ * @return A list of AG-UI events generated during this processing cycle
*/
- private List convertEvent(Event event, EventConversionState state) {
- List events = new ArrayList<>();
- Msg msg = event.getMessage();
- EventType type = event.getType();
-
- if (type == EventType.REASONING || type == EventType.SUMMARY) {
- // Handle reasoning/summary events - convert to text messages and tool calls
- for (ContentBlock block : msg.getContent()) {
- if (block instanceof TextBlock textBlock) {
- String text = textBlock.getText();
- if (text != null && !text.isEmpty()) {
- String messageId = msg.getId();
-
- // Start message if not started
- if (!state.hasStartedMessage(messageId)) {
- events.add(
- new AguiEvent.TextMessageStart(
- state.threadId, state.runId, messageId, "assistant"));
- state.startMessage(messageId);
- }
-
- if (!event.isLast()) {
- // In incremental mode, text is already the delta
- events.add(
- new AguiEvent.TextMessageContent(
- state.threadId, state.runId, messageId, text));
- } else {
- // End message if this is the last event
- if (!state.hasEndedMessage(messageId)) {
- events.add(
- new AguiEvent.TextMessageEnd(
- state.threadId, state.runId, messageId));
- state.endMessage(messageId);
- }
- }
- }
- } else if (block instanceof ThinkingBlock thinkingBlock) {
- // Handle thinking blocks - convert to REASONING_* events (only if enabled)
- // According to AG-UI Reasoning draft: https://docs.ag-ui.com/drafts/reasoning
- if (config.isEnableReasoning()) {
- String thinking = thinkingBlock.getThinking();
- if (thinking != null && !thinking.isEmpty()) {
- String messageId = msg.getId();
-
- // Start reasoning message if not started
- if (!state.hasStartedReasoningMessage(messageId)) {
- events.add(
- new AguiEvent.ReasoningMessageStart(
- state.threadId,
- state.runId,
- messageId,
- "reasoning"));
- state.startReasoningMessage(messageId);
- }
-
- if (!event.isLast()) {
- // In incremental mode, thinking is already the delta
- events.add(
- new AguiEvent.ReasoningMessageContent(
- state.threadId, state.runId, messageId, thinking));
- } else {
- // End reasoning message if this is the last event
- events.add(
- new AguiEvent.ReasoningMessageEnd(
- state.threadId, state.runId, messageId));
- state.endReasoningMessage(messageId);
- }
- }
- }
- // If reasoning is disabled, ThinkingBlock content is ignored (backward
- // compatibility)
- } else if (block instanceof ToolUseBlock toolUse) {
- // End any active text message before starting tool call
- if (state.hasActiveTextMessage()) {
- String activeMessageId = state.getCurrentTextMessageId();
- events.add(
- new AguiEvent.TextMessageEnd(
- state.threadId, state.runId, activeMessageId));
- state.endMessage(activeMessageId);
- }
-
- // End any active reasoning message before starting tool call
- if (state.hasActiveReasoningMessage()) {
- String activeReasoningMessageId = state.getCurrentReasoningMessageId();
- events.add(
- new AguiEvent.ReasoningMessageEnd(
- state.threadId, state.runId, activeReasoningMessageId));
- state.endReasoningMessage(activeReasoningMessageId);
- }
-
- // Emit tool call start
- String toolCallId = toolUse.getId();
- if (toolCallId == null) {
- toolCallId = UUID.randomUUID().toString();
- }
-
- if (!state.hasStartedToolCall(toolCallId)) {
- events.add(
- new AguiEvent.ToolCallStart(
- state.threadId,
- state.runId,
- toolCallId,
- toolUse.getName()));
- state.startToolCall(toolCallId);
- }
-
- // Emit tool call args if enabled
- if (config.isEmitToolCallArgs() && !event.isLast()) {
- String args = toolUse.getContent();
- if (args != null && !args.isEmpty()) {
- events.add(
- new AguiEvent.ToolCallArgs(
- state.threadId, state.runId, toolCallId, args));
- }
- }
- }
- }
- } else if (type == EventType.TOOL_RESULT && event.isLast()) {
- // Handle tool results
- for (ContentBlock block : msg.getContent()) {
- if (block instanceof ToolResultBlock toolResult) {
- String toolCallId = toolResult.getId();
- if (toolCallId == null) {
- toolCallId = UUID.randomUUID().toString();
- }
-
- String result = extractToolResultText(toolResult);
-
- boolean hasStarted = state.hasStartedToolCall(toolCallId);
- if (!hasStarted) {
- String toolName = toolResult.getName();
- if (toolName == null || toolName.isBlank()) {
- toolName = "unknown";
- }
- events.add(
- new AguiEvent.ToolCallStart(
- state.threadId, state.runId, toolCallId, toolName));
- state.startToolCall(toolCallId);
- }
-
- // Ensure ToolCallEnd is emitted to close arguments phase
- events.add(new AguiEvent.ToolCallEnd(state.threadId, state.runId, toolCallId));
-
- events.add(
- new AguiEvent.ToolCallResult(
- state.threadId,
- state.runId,
- toolCallId,
- result,
- "tool",
- msg.getId()));
- state.endToolCall(toolCallId);
- }
+ @SuppressWarnings("unchecked")
+ private List processEvent(Event event, StreamContext ctx) {
+ // Dispatch each content block to its corresponding converter
+ for (ContentBlock block : event.getMessage().getContent()) {
+ BlockEventConverter converter =
+ (BlockEventConverter) converters.get(block.getClass());
+
+ if (converter != null && converter.isApplicable(event)) {
+ converter.convert(block, event, ctx);
}
}
- return events;
+ return ctx.getAndClearEmittedEvents();
}
/**
- * Finish the run by emitting any pending end events and RUN_FINISHED.
+ * Handles errors that occur during the stream pipeline.
+ * Guarantees that all deferred end events are flushed before the error event is emitted.
*
- * @param state The conversion state
- * @return Flux of final events
+ * @param threadId The thread ID
+ * @param runId The run ID
+ * @param ctx The current stream context
+ * @param error The thrown exception
+ * @return A Flux containing the fallback closure events
*/
- private Flux finishRun(EventConversionState state) {
+ private Flux handleError(
+ String threadId, String runId, StreamContext ctx, Throwable error) {
List events = new ArrayList<>();
+ events.addAll(ctx.flushAllRemainingDeferred());
- // End any messages that weren't properly ended
- for (String messageId : state.getStartedMessages()) {
- if (!state.hasEndedMessage(messageId)) {
- events.add(new AguiEvent.TextMessageEnd(state.threadId, state.runId, messageId));
- }
- }
-
- // End any tool calls that weren't properly ended
- for (String toolCallId : state.getStartedToolCalls()) {
- if (!state.hasEndedToolCall(toolCallId)) {
- events.add(new AguiEvent.ToolCallEnd(state.threadId, state.runId, toolCallId));
- }
- }
-
- // End any reasoning messages that weren't properly ended
- for (String messageId : state.getStartedReasoningMessages()) {
- if (!state.hasEndedReasoningMessage(messageId)) {
- events.add(
- new AguiEvent.ReasoningMessageEnd(state.threadId, state.runId, messageId));
- }
- }
-
- // Emit RUN_FINISHED
- events.add(new AguiEvent.RunFinished(state.threadId, state.runId));
+ String msg =
+ error.getMessage() != null ? error.getMessage() : error.getClass().getSimpleName();
+ events.add(new AguiEvent.Raw(threadId, runId, Map.of("error", msg)));
+ events.add(new AguiEvent.RunFinished(threadId, runId));
return Flux.fromIterable(events);
}
-
- /**
- * Extract text content from a tool result block.
- *
- * @param toolResult The tool result block
- * @return The text content, or null if not present
- */
- private String extractToolResultText(ToolResultBlock toolResult) {
- if (toolResult.getOutput() == null || toolResult.getOutput().isEmpty()) {
- return null;
- }
-
- StringBuilder sb = new StringBuilder();
- for (ContentBlock output : toolResult.getOutput()) {
- if (output instanceof TextBlock textBlock) {
- if (!sb.isEmpty()) {
- sb.append("\n");
- }
- sb.append(textBlock.getText());
- }
- }
-
- return !sb.isEmpty() ? sb.toString() : null;
- }
-
- /**
- * Serialize tool arguments to JSON string.
- *
- * @param input The tool input map
- * @return JSON string representation
- */
- private String serializeToolArgs(Map input) {
- if (input == null || input.isEmpty()) {
- return "{}";
- }
- try {
- return JsonUtils.getJsonCodec().toJson(input);
- } catch (JsonException e) {
- return "{}";
- }
- }
-
- /**
- * State tracker for event conversion.
- * Uses LinkedHashSet to preserve insertion order for proper event sequencing.
- */
- private static class EventConversionState {
- final String threadId;
- final String runId;
- private final Set startedMessages = new LinkedHashSet<>();
- private final Set endedMessages = new LinkedHashSet<>();
- private final Set startedToolCalls = new LinkedHashSet<>();
- private final Set endedToolCalls = new LinkedHashSet<>();
- private final Set startedReasoningMessages = new LinkedHashSet<>();
- private final Set endedReasoningMessages = new LinkedHashSet<>();
- private String currentTextMessageId = null;
- private String currentReasoningMessageId = null;
-
- EventConversionState(String threadId, String runId) {
- this.threadId = threadId;
- this.runId = runId;
- }
-
- boolean hasStartedMessage(String messageId) {
- return startedMessages.contains(messageId);
- }
-
- void startMessage(String messageId) {
- startedMessages.add(messageId);
- currentTextMessageId = messageId;
- }
-
- void endMessage(String messageId) {
- endedMessages.add(messageId);
- if (Objects.equals(messageId, currentTextMessageId)) {
- currentTextMessageId = null;
- }
- }
-
- boolean hasEndedMessage(String messageId) {
- return endedMessages.contains(messageId);
- }
-
- String getCurrentTextMessageId() {
- return currentTextMessageId;
- }
-
- boolean hasActiveTextMessage() {
- return currentTextMessageId != null && !hasEndedMessage(currentTextMessageId);
- }
-
- Set getStartedMessages() {
- return startedMessages;
- }
-
- boolean hasStartedToolCall(String toolCallId) {
- return startedToolCalls.contains(toolCallId);
- }
-
- void startToolCall(String toolCallId) {
- startedToolCalls.add(toolCallId);
- }
-
- void endToolCall(String toolCallId) {
- endedToolCalls.add(toolCallId);
- }
-
- boolean hasEndedToolCall(String toolCallId) {
- return endedToolCalls.contains(toolCallId);
- }
-
- Set getStartedToolCalls() {
- return startedToolCalls;
- }
-
- boolean hasStartedReasoningMessage(String messageId) {
- return startedReasoningMessages.contains(messageId);
- }
-
- void startReasoningMessage(String messageId) {
- startedReasoningMessages.add(messageId);
- currentReasoningMessageId = messageId;
- }
-
- void endReasoningMessage(String messageId) {
- endedReasoningMessages.add(messageId);
- if (Objects.equals(messageId, currentReasoningMessageId)) {
- currentReasoningMessageId = null;
- }
- }
-
- boolean hasEndedReasoningMessage(String messageId) {
- return endedReasoningMessages.contains(messageId);
- }
-
- String getCurrentReasoningMessageId() {
- return currentReasoningMessageId;
- }
-
- boolean hasActiveReasoningMessage() {
- return currentReasoningMessageId != null
- && !hasEndedReasoningMessage(currentReasoningMessageId);
- }
-
- Set getStartedReasoningMessages() {
- return startedReasoningMessages;
- }
- }
}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java
new file mode 100644
index 000000000..51f697aae
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/StreamContext.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.agui.adapter;
+
+import io.agentscope.core.agui.event.AguiEvent;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Context holder for the AG-UI stream pipeline.
+ * Manages the emission of immediate events and the queuing of deferred end events
+ * to ensure strict adherence to the AG-UI lifecycle protocol.
+ */
+public class StreamContext {
+ // Prefixes used to prevent key collisions in the deferred events map,
+ // as different blocks (e.g., Text and Reasoning) may share the same message ID.
+ public static final String PREFIX_TEXT = "text:";
+ public static final String PREFIX_REASONING = "reasoning:";
+ public static final String PREFIX_TOOL = "tool:";
+
+ private final String threadId;
+ private final String runId;
+ private final AguiAdapterConfig config;
+
+ // Events ready to be emitted in the current processing cycle
+ private final List emittedEvents = new ArrayList<>();
+
+ // Deferred event queue for storing pending end events (Key: prefix + id)
+ private final Map deferredEndEvents = new LinkedHashMap<>();
+
+ private final Set activeTextIds = new LinkedHashSet<>();
+ private final Set activeReasoningIds = new LinkedHashSet<>();
+ private final Set activeToolIds = new LinkedHashSet<>();
+
+ private final Set finishedTextIds = new HashSet<>();
+ private final Set finishedReasoningIds = new HashSet<>();
+
+ // Fallback ID for tool results that might lack an explicit ID
+ private String lastActiveToolId = null;
+
+ /**
+ * Initializes a new StreamContext.
+ *
+ * @param threadId The thread ID
+ * @param runId The run ID
+ * @param config The adapter configuration
+ */
+ public StreamContext(String threadId, String runId, AguiAdapterConfig config) {
+ this.threadId = threadId;
+ this.runId = runId;
+ this.config = config;
+ }
+
+ /**
+ * Returns the thread identifier associated with this stream context.
+ *
+ * @return the thread ID
+ */
+ public String getThreadId() {
+ return threadId;
+ }
+
+ /**
+ * Returns the run identifier associated with this stream context.
+ *
+ * @return the run ID
+ */
+ public String getRunId() {
+ return runId;
+ }
+
+ /**
+ * Returns the adapter configuration used by this stream context.
+ *
+ * @return the adapter configuration
+ */
+ public AguiAdapterConfig getConfig() {
+ return config;
+ }
+
+ // --- Event Emission and Deferred Management API ---
+
+ /**
+ * Adds an event to the emission queue for the current processing cycle.
+ *
+ * @param event The AG-UI event to emit
+ */
+ public void emit(AguiEvent event) {
+ this.emittedEvents.add(event);
+ }
+
+ /**
+ * Registers an end event to be emitted later.
+ * This decouples the start logic from the termination logic.
+ *
+ * @param id The prefixed identifier for the component
+ * @param endEvent The end event to defer
+ */
+ public void deferEndEvent(String id, AguiEvent endEvent) {
+ this.deferredEndEvents.put(id, endEvent);
+ }
+
+ /**
+ * Retrieves and clears all events accumulated in the current cycle.
+ *
+ * @return A list of events to be dispatched downstream
+ */
+ public List getAndClearEmittedEvents() {
+ List result = new ArrayList<>(emittedEvents);
+ emittedEvents.clear();
+ return result;
+ }
+
+ /**
+ * Flushes a specific deferred end event into the emission queue.
+ *
+ * @param id The prefixed identifier of the event to flush
+ */
+ public void flushEndEvent(String id) {
+ AguiEvent endEvent = deferredEndEvents.remove(id);
+ if (endEvent != null) {
+ emit(endEvent);
+ }
+ }
+
+ /**
+ * Flushes all remaining deferred end events.
+ * Typically called during stream termination or error recovery to ensure all UI components are closed.
+ *
+ * @return A list of all remaining deferred events
+ */
+ public List flushAllRemainingDeferred() {
+ List remaining = new ArrayList<>(deferredEndEvents.values());
+
+ deferredEndEvents.clear();
+
+ activeTextIds.clear();
+ activeReasoningIds.clear();
+ activeToolIds.clear();
+
+ finishedTextIds.clear();
+ finishedReasoningIds.clear();
+
+ lastActiveToolId = null;
+
+ return remaining;
+ }
+
+ /**
+ * Flushes all active text end events.
+ * Commonly used when an interruption occurs (e.g., a tool call starts).
+ */
+ public void flushAllActiveTexts() {
+ for (String id : new ArrayList<>(activeTextIds)) {
+ flushEndEvent(PREFIX_TEXT + id);
+ removeActiveText(id);
+ }
+ }
+
+ /**
+ * Flushes all active reasoning end events.
+ * Commonly used when an interruption occurs (e.g., a tool call starts).
+ */
+ public void flushAllActiveReasonings() {
+ for (String id : new ArrayList<>(activeReasoningIds)) {
+ flushEndEvent(PREFIX_REASONING + id);
+ removeActiveReasoning(id);
+ }
+ }
+
+ // --- Text State Management ---
+
+ public boolean isTextActive(String id) {
+ return activeTextIds.contains(id);
+ }
+
+ public void addActiveText(String id) {
+ activeTextIds.add(id);
+ }
+
+ public void removeActiveText(String id) {
+ activeTextIds.remove(id);
+ finishedTextIds.add(id);
+ }
+
+ public boolean isTextFinished(String id) {
+ return finishedTextIds.contains(id);
+ }
+
+ // --- Reasoning State Management ---
+
+ public boolean isReasoningActive(String id) {
+ return activeReasoningIds.contains(id);
+ }
+
+ public void addActiveReasoning(String id) {
+ activeReasoningIds.add(id);
+ }
+
+ public void removeActiveReasoning(String id) {
+ activeReasoningIds.remove(id);
+ finishedReasoningIds.add(id);
+ }
+
+ public boolean isReasoningFinished(String id) {
+ return finishedReasoningIds.contains(id);
+ }
+
+ // --- Tool State Management ---
+
+ public boolean isToolActive(String id) {
+ return activeToolIds.contains(id);
+ }
+
+ public void addActiveTool(String id) {
+ this.activeToolIds.add(id);
+ this.lastActiveToolId = id; // Update the fallback ID
+ }
+
+ public void removeActiveTool(String id) {
+ this.activeToolIds.remove(id);
+ // If the removed ID matches the last recorded fallback ID, reset or step back the pointer
+ if (Objects.equals(this.lastActiveToolId, id)) {
+ if (activeToolIds.isEmpty()) {
+ this.lastActiveToolId = null;
+ } else {
+ // Retrieve the last inserted element from the LinkedHashSet
+ String[] array = activeToolIds.toArray(new String[0]);
+ this.lastActiveToolId = array[array.length - 1];
+ }
+ }
+ }
+
+ public String getLastActiveToolId() {
+ return this.lastActiveToolId;
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/BlockEventConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/BlockEventConverter.java
new file mode 100644
index 000000000..67ed91d1b
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/BlockEventConverter.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.agui.adapter.strategy;
+
+import io.agentscope.core.agent.Event;
+import io.agentscope.core.agui.adapter.StreamContext;
+import io.agentscope.core.message.ContentBlock;
+
+/**
+ * Strategy interface for converting different types of ContentBlock into AG-UI events.
+ *
+ * @param The specific type of ContentBlock this converter handles.
+ */
+public interface BlockEventConverter {
+
+ /**
+ * Determines whether the current Event is applicable for this converter.
+ *
+ * @param event The agent stream event to evaluate.
+ * @return true if the event can be processed by this converter, false otherwise.
+ */
+ boolean isApplicable(Event event);
+
+ /**
+ * Executes the conversion logic, generating and appending AG-UI events to the stream context.
+ *
+ * @param block The content block to convert.
+ * @param event The original agent event.
+ * @param context The stream context holding state and emitted events.
+ */
+ void convert(T block, Event event, StreamContext context);
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java
new file mode 100644
index 000000000..27e784476
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/CustomBlockConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.agui.adapter.strategy;
+
+import io.agentscope.core.agent.Event;
+import io.agentscope.core.agui.adapter.StreamContext;
+import io.agentscope.core.agui.event.AguiEvent;
+import io.agentscope.core.message.CustomBlock;
+
+/**
+ * Converter for handling CustomBlock events, transforming them into AG-UI Custom events.
+ */
+public class CustomBlockConverter implements BlockEventConverter {
+
+ @Override
+ public boolean isApplicable(Event event) {
+ // Custom events can occur at any stage, so always return true.
+ return true;
+ }
+
+ @Override
+ public void convert(CustomBlock block, Event event, StreamContext ctx) {
+ ctx.emit(
+ new AguiEvent.Custom(
+ ctx.getThreadId(), ctx.getRunId(), block.getName(), block.getValue()));
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java
new file mode 100644
index 000000000..c3546fa0f
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/TextBlockConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.agui.adapter.strategy;
+
+import io.agentscope.core.agent.Event;
+import io.agentscope.core.agent.EventType;
+import io.agentscope.core.agui.adapter.StreamContext;
+import io.agentscope.core.agui.event.AguiEvent;
+import io.agentscope.core.message.TextBlock;
+
+/**
+ * Converter for handling TextBlock events, transforming them into AG-UI TextMessage events.
+ */
+public class TextBlockConverter implements BlockEventConverter {
+
+ @Override
+ public boolean isApplicable(Event event) {
+ return event.getType() == EventType.REASONING || event.getType() == EventType.SUMMARY;
+ }
+
+ @Override
+ public void convert(TextBlock block, Event event, StreamContext ctx) {
+ String msgId = event.getMessage().getId();
+ if (ctx.isTextFinished(msgId)) {
+ return;
+ }
+
+ String text = block.getText();
+ if (text != null && !text.isBlank()) {
+ if (!ctx.isTextActive(msgId)) {
+ ctx.flushAllActiveTexts();
+ // When the text is output, it signifies that the "reasoning" has end
+ ctx.flushAllActiveReasonings();
+
+ ctx.emit(
+ new AguiEvent.TextMessageStart(
+ ctx.getThreadId(), ctx.getRunId(), msgId, "assistant"));
+ ctx.deferEndEvent(
+ StreamContext.PREFIX_TEXT + msgId,
+ new AguiEvent.TextMessageEnd(ctx.getThreadId(), ctx.getRunId(), msgId));
+ ctx.addActiveText(msgId);
+ }
+
+ if (!event.isLast()) {
+ ctx.emit(
+ new AguiEvent.TextMessageContent(
+ ctx.getThreadId(), ctx.getRunId(), msgId, text));
+ }
+ }
+
+ if (event.isLast() && ctx.isTextActive(msgId)) {
+ ctx.flushEndEvent(StreamContext.PREFIX_TEXT + msgId);
+ ctx.removeActiveText(msgId);
+ }
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java
new file mode 100644
index 000000000..515219578
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ThinkingBlockConverter.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.agui.adapter.strategy;
+
+import io.agentscope.core.agent.Event;
+import io.agentscope.core.agent.EventType;
+import io.agentscope.core.agui.adapter.StreamContext;
+import io.agentscope.core.agui.event.AguiEvent;
+import io.agentscope.core.message.ThinkingBlock;
+
+/**
+ * Converter for handling ThinkingBlock events, transforming them into AG-UI ReasoningMessage events.
+ */
+public class ThinkingBlockConverter implements BlockEventConverter {
+
+ @Override
+ public boolean isApplicable(Event event) {
+ return event.getType() == EventType.REASONING || event.getType() == EventType.SUMMARY;
+ }
+
+ @Override
+ public void convert(ThinkingBlock block, Event event, StreamContext ctx) {
+ // ignore if reasoning output is disabled
+ if (!ctx.getConfig().isEnableReasoning()) {
+ return;
+ }
+
+ String msgId = event.getMessage().getId();
+ if (ctx.isReasoningFinished(msgId)) {
+ return;
+ }
+
+ String thinking = block.getThinking();
+ if (thinking != null && !thinking.isBlank()) {
+ if (!ctx.isReasoningActive(msgId)) {
+ ctx.flushAllActiveReasonings();
+
+ ctx.emit(
+ new AguiEvent.ReasoningMessageStart(
+ ctx.getThreadId(), ctx.getRunId(), msgId, "reasoning"));
+ ctx.deferEndEvent(
+ StreamContext.PREFIX_REASONING + msgId,
+ new AguiEvent.ReasoningMessageEnd(
+ ctx.getThreadId(), ctx.getRunId(), msgId));
+ ctx.addActiveReasoning(msgId);
+ }
+
+ if (!event.isLast()) {
+ ctx.emit(
+ new AguiEvent.ReasoningMessageContent(
+ ctx.getThreadId(), ctx.getRunId(), msgId, thinking));
+ }
+ }
+
+ if (event.isLast() && ctx.isReasoningActive(msgId)) {
+ ctx.flushEndEvent(StreamContext.PREFIX_REASONING + msgId);
+ ctx.removeActiveReasoning(msgId);
+ }
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java
new file mode 100644
index 000000000..221aa1497
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolResultBlockConverter.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.agui.adapter.strategy;
+
+import io.agentscope.core.agent.Event;
+import io.agentscope.core.agent.EventType;
+import io.agentscope.core.agui.adapter.StreamContext;
+import io.agentscope.core.agui.event.AguiEvent;
+import io.agentscope.core.message.ContentBlock;
+import io.agentscope.core.message.TextBlock;
+import io.agentscope.core.message.ToolResultBlock;
+import java.util.UUID;
+
+/**
+ * Converter for handling ToolResultBlock events, transforming them into AG-UI ToolCallResult events.
+ */
+public class ToolResultBlockConverter implements BlockEventConverter {
+
+ @Override
+ public boolean isApplicable(Event event) {
+ return event.getType() == EventType.TOOL_RESULT && event.isLast();
+ }
+
+ @Override
+ public void convert(ToolResultBlock block, Event event, StreamContext ctx) {
+ String toolCallId =
+ block.getId() != null
+ ? block.getId()
+ : (ctx.getLastActiveToolId() != null
+ ? ctx.getLastActiveToolId()
+ : UUID.randomUUID().toString());
+ String result = extractToolResultText(block);
+
+ // Closing Start/End Phase
+ if (ctx.isToolActive(toolCallId)) {
+ ctx.flushEndEvent(StreamContext.PREFIX_TOOL + toolCallId);
+ } else {
+ // Fall-back: The previous process did not proceed to Start for some reason
+ // (e.g., recovery directly from the context)
+ String toolName =
+ block.getName() != null && !block.getName().isBlank()
+ ? block.getName()
+ : "unknown";
+ ctx.emit(
+ new AguiEvent.ToolCallStart(
+ ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName));
+ ctx.emit(new AguiEvent.ToolCallEnd(ctx.getThreadId(), ctx.getRunId(), toolCallId));
+ }
+
+ ctx.emit(
+ new AguiEvent.ToolCallResult(
+ ctx.getThreadId(),
+ ctx.getRunId(),
+ toolCallId,
+ result != null ? result : "",
+ "tool",
+ event.getMessage().getId()));
+
+ if (ctx.isToolActive(toolCallId)) {
+ ctx.removeActiveTool(toolCallId);
+ }
+ }
+
+ /**
+ * Extract text content from a tool result block.
+ *
+ * @param toolResult The tool result block
+ * @return The text content, or null if not present
+ */
+ private String extractToolResultText(ToolResultBlock toolResult) {
+ if (toolResult.getOutput() == null || toolResult.getOutput().isEmpty()) {
+ return null;
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (ContentBlock output : toolResult.getOutput()) {
+ if (output instanceof TextBlock textBlock) {
+ if (!sb.isEmpty()) {
+ sb.append("\n");
+ }
+ sb.append(textBlock.getText());
+ }
+ }
+
+ return !sb.isEmpty() ? sb.toString() : null;
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java
new file mode 100644
index 000000000..4c917fab0
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-agui/src/main/java/io/agentscope/core/agui/adapter/strategy/ToolUseBlockConverter.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.agui.adapter.strategy;
+
+import io.agentscope.core.agent.Event;
+import io.agentscope.core.agent.EventType;
+import io.agentscope.core.agui.adapter.StreamContext;
+import io.agentscope.core.agui.event.AguiEvent;
+import io.agentscope.core.message.ToolUseBlock;
+import java.util.UUID;
+
+/**
+ * Converter for handling ToolUseBlock events, transforming them into AG-UI ToolCallStart and ToolCallArgs events.
+ */
+public class ToolUseBlockConverter implements BlockEventConverter {
+
+ @Override
+ public boolean isApplicable(Event event) {
+ return event.getType() == EventType.REASONING || event.getType() == EventType.SUMMARY;
+ }
+
+ @Override
+ public void convert(ToolUseBlock block, Event event, StreamContext ctx) {
+ String toolCallId =
+ block.getId() != null
+ ? block.getId()
+ : (ctx.getLastActiveToolId() != null
+ ? ctx.getLastActiveToolId()
+ : UUID.randomUUID().toString());
+
+ if (!ctx.isToolActive(toolCallId)) {
+ // End any active Text/Reasoning message before starting tool call
+ ctx.flushAllActiveTexts();
+ ctx.flushAllActiveReasonings();
+
+ String toolName =
+ block.getName() != null && !block.getName().isBlank()
+ ? block.getName()
+ : "unknown";
+ ctx.emit(
+ new AguiEvent.ToolCallStart(
+ ctx.getThreadId(), ctx.getRunId(), toolCallId, toolName));
+ ctx.deferEndEvent(
+ StreamContext.PREFIX_TOOL + toolCallId,
+ new AguiEvent.ToolCallEnd(ctx.getThreadId(), ctx.getRunId(), toolCallId));
+ ctx.addActiveTool(toolCallId);
+ }
+
+ // Emit tool call args if enabled
+ if (ctx.getConfig().isEmitToolCallArgs() && !event.isLast()) {
+ String args = block.getContent();
+ if (args != null && !args.isBlank()) {
+ ctx.emit(
+ new AguiEvent.ToolCallArgs(
+ ctx.getThreadId(), ctx.getRunId(), toolCallId, args));
+ }
+ }
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java
index e1ac649a6..631fbd589 100644
--- a/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java
+++ b/agentscope-extensions/agentscope-extensions-agui/src/test/java/io/agentscope/core/agui/adapter/AguiAgentAdapterTest.java
@@ -31,6 +31,7 @@
import io.agentscope.core.agui.event.AguiEvent;
import io.agentscope.core.agui.model.AguiMessage;
import io.agentscope.core.agui.model.RunAgentInput;
+import io.agentscope.core.message.CustomBlock;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
@@ -654,9 +655,10 @@ void testToolUseBlockWithNullId() {
}
@Test
- void testTextMessageEndNotDuplicatedWhenLastEventAfterToolCall() {
- // Test that when a text message is interrupted by a tool call and then the last event
- // contains text blocks with the same message ID, only one TextMessageEnd is emitted
+ void testTextMessageIgnoredAfterToolCallInterruption() {
+ // Test that when a text message is interrupted by a tool call,
+ // the active text message is closed immediately and marked as finished.
+ // Subsequent text blocks with the same message ID should be ignored.
String msgId = "msg-text";
Msg firstMsg =
Msg.builder()
@@ -686,6 +688,7 @@ void testTextMessageEndNotDuplicatedWhenLastEventAfterToolCall() {
Event firstEvent = new Event(EventType.REASONING, firstMsg, false);
Event toolCallEvent = new Event(EventType.REASONING, toolCall1, false);
Event lastEvent = new Event(EventType.REASONING, lastMsg, true);
+
when(mockAgent.stream(anyList(), any(StreamOptions.class)))
.thenReturn(Flux.just(firstEvent, toolCallEvent, lastEvent));
@@ -700,7 +703,21 @@ void testTextMessageEndNotDuplicatedWhenLastEventAfterToolCall() {
assertNotNull(events);
- // Should have exactly one TextMessageEnd for the same message ID
+ long textStartCount =
+ events.stream()
+ .filter(e -> e instanceof AguiEvent.TextMessageStart)
+ .filter(
+ e -> {
+ AguiEvent.TextMessageStart start =
+ (AguiEvent.TextMessageStart) e;
+ return msgId.equals(start.messageId());
+ })
+ .count();
+ assertEquals(
+ 1,
+ textStartCount,
+ "Should emit exactly 1 TextMessageStart. The subsequent block should be ignored");
+
long textEndCount =
events.stream()
.filter(e -> e instanceof AguiEvent.TextMessageEnd)
@@ -710,7 +727,11 @@ void testTextMessageEndNotDuplicatedWhenLastEventAfterToolCall() {
return msgId.equals(end.messageId());
})
.count();
- assertEquals(1, textEndCount, "Should have exactly 1 TextMessageEnd per message ID");
+ assertEquals(
+ 1,
+ textEndCount,
+ "Should emit exactly 1 TextMessageEnd: the one triggered by the tool"
+ + " interruption.");
}
@Test
@@ -1696,4 +1717,96 @@ void testRunWithNullThinkingBlock() {
!hasReasoningMessageStart,
"Should NOT have ReasoningMessageStart for null thinking");
}
+
+ @Test
+ void testRunWithCustomEventForToolProgress() {
+ // Simulate intermediate tool progress using CustomBlock (50% downloaded)
+ Msg progressMsg1 =
+ Msg.builder()
+ .id("msg-tr1")
+ .role(MsgRole.TOOL)
+ .content(
+ CustomBlock.builder()
+ .name("tool_progress")
+ .value(Map.of("progress", "50%"))
+ .build())
+ .build();
+ Event progressEvent1 = new Event(EventType.TOOL_RESULT, progressMsg1, false);
+
+ // Simulate intermediate tool progress (100% downloaded)
+ Msg progressMsg2 =
+ Msg.builder()
+ .id("msg-tr1")
+ .role(MsgRole.TOOL)
+ .content(
+ CustomBlock.builder()
+ .name("tool_progress")
+ .value(Map.of("progress", "100%"))
+ .build())
+ .build();
+ Event progressEvent2 = new Event(EventType.TOOL_RESULT, progressMsg2, false);
+
+ // Simulate final tool result (isLast = true)
+ Msg finalResultMsg =
+ Msg.builder()
+ .id("msg-tr1")
+ .role(MsgRole.TOOL)
+ .content(
+ ToolResultBlock.builder()
+ .id("tc-1")
+ .output(
+ TextBlock.builder()
+ .text("Download complete")
+ .build())
+ .build())
+ .build();
+ Event finalResultEvent = new Event(EventType.TOOL_RESULT, finalResultMsg, true);
+
+ when(mockAgent.stream(anyList(), any(StreamOptions.class)))
+ .thenReturn(Flux.just(progressEvent1, progressEvent2, finalResultEvent));
+
+ RunAgentInput input =
+ RunAgentInput.builder()
+ .threadId("thread-1")
+ .runId("run-1")
+ .messages(List.of(AguiMessage.userMessage("msg-1", "Download file")))
+ .build();
+
+ List events = adapter.run(input).collectList().block();
+
+ assertNotNull(events);
+
+ // Verify that exactly 2 Custom events were emitted for the progress
+ long customEventCount = events.stream().filter(e -> e instanceof AguiEvent.Custom).count();
+ assertEquals(2, customEventCount, "Should have 2 Custom events for tool progress");
+
+ // Verify the content of the Custom events
+ List customEvents =
+ events.stream()
+ .filter(e -> e instanceof AguiEvent.Custom)
+ .map(e -> (AguiEvent.Custom) e)
+ .toList();
+
+ assertEquals("tool_progress", customEvents.get(0).name());
+ Map value1 = (Map) customEvents.get(0).value();
+ assertEquals("50%", value1.get("progress"));
+
+ assertEquals("tool_progress", customEvents.get(1).name());
+ Map value2 = (Map) customEvents.get(1).value();
+ assertEquals("100%", value2.get("progress"));
+
+ // Verify that the final ToolCallResult was emitted correctly
+ long toolResultCount =
+ events.stream().filter(e -> e instanceof AguiEvent.ToolCallResult).count();
+ assertEquals(
+ 1, toolResultCount, "Should have exactly 1 ToolCallResult for the final event");
+
+ AguiEvent.ToolCallResult finalResult =
+ (AguiEvent.ToolCallResult)
+ events.stream()
+ .filter(e -> e instanceof AguiEvent.ToolCallResult)
+ .findFirst()
+ .orElseThrow();
+ assertEquals("Download complete", finalResult.content());
+ }
}