Skip to content

Conversation

@OhYee
Copy link
Member

@OhYee OhYee commented Dec 9, 2025

重构服务器协议架构,引入抽象协议处理器基类。新增 AG-UI 协议完整实现,包括事件类型定义和处理器。扩展 AgentRequest 模型支持原始请求访问和生命周期钩子注入。更新 OpenAI 协议适配器以兼容新钩子系统。修改服务器默认配置同时启用 OpenAI 和 AG-UI 双协议。

BREAKING CHANGE: 协议处理器接口重构,parse_request 现在接收 Request 对象并返回上下文。AgentRequest 结构变更增加 raw_headers、raw_body 和 hooks 字段。AgentResult 类型扩展支持 AgentEvent 对象。

Change-Id: I8527db7539fa62ce39e80e28068a98a0b2db3ba3

Thank you for creating a pull request to contribute to Serverless Devs agentrun-sdk-python code! Before you open the request please answer the following questions to help it be more easily integrated. Please check the boxes "[ ]" with "[x]" when done too.
Please select one of the PR types below to complete


Fix bugs

Bug detail

The specific manifestation of the bug or the associated issue.

Pull request tasks

  • Add test cases for the changes
  • Passed the CI test

Update docs

Reason for update

Why do you need to update your documentation?

Pull request tasks

  • Update Chinese documentation
  • Update English documentation

Add contributor

Contributed content

  • Code
  • Document

Content detail

if content_type == 'code' || content_type == 'document':
    please tell us `PR url`,like: https://github.com/Serverless-Devs/agentrun-sdk-python/pull/1
else:
    please describe your contribution in detail

Others

Reason for update

Why do you need to update your documentation?

Copilot AI review requested due to automatic review settings December 9, 2025 03:14
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a comprehensive lifecycle hooks system and AG-UI protocol support to the AgentRun Server, marking a significant architectural enhancement. The implementation enables real-time event streaming for agent execution stages (steps, tool calls, text messages, state changes) through a protocol-agnostic hooks abstraction. The refactoring maintains backward compatibility at the API level while introducing breaking changes to the protocol handler interface.

Key Changes:

  • Introduced AgentLifecycleHooks abstract base class with protocol-specific implementations (OpenAI, AG-UI)
  • Added complete AG-UI protocol handler with full event stream support
  • Extended AgentRequest with raw_headers, raw_body, and hooks fields for enhanced request context
  • Refactored BaseProtocolHandler with create_hooks() and modified parse_request() to return tuples
  • Updated default server configuration to enable both OpenAI and AG-UI protocols simultaneously

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
agentrun/server/agui_protocol.py New complete AG-UI protocol implementation with event types, lifecycle hooks, and SSE streaming (834 lines)
agentrun/server/model.py Added AgentLifecycleHooks ABC, AgentEvent class, and extended AgentRequest with hooks/raw request fields
agentrun/server/openai_protocol.py Refactored to implement BaseProtocolHandler, added OpenAILifecycleHooks, improved streaming with thread pool execution
agentrun/server/protocol.py Introduced BaseProtocolHandler with create_hooks() and modified parse_request() signature (breaking change)
agentrun/server/server.py Updated default protocols to include both OpenAI and AG-UI, improved documentation examples
agentrun/server/__init__.py Critical bug: Documentation shows incorrect hook usage patterns with non-existent emit_* methods and wrong async patterns
examples/quick_start_sync.py Comprehensive sync example demonstrating lifecycle hooks and tool call events (234 lines)
examples/quick_start_async.py Async variant of quick start example with proper async/await patterns (241 lines)
examples/a.py LangChain integration example, but filename is non-descriptive (should be renamed)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 35 to 43
... async for event in hooks.on_step_start("processing"):
... yield event
...
... # 处理逻辑...
... yield "Hello, world!"
...
... # 发送步骤结束事件
... async for event in hooks.on_step_finish("processing"):
... yield event
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async example shows incorrect usage pattern. The on_* methods return Optional[AgentEvent], not async iterators. They should be used directly with yield, not with async for.

The correct async pattern should be:

async def invoke_agent(request: AgentRequest):
    hooks = request.hooks
    # Direct yield of event (may be None)
    yield hooks.on_step_start("processing")
    yield "Hello, world!"
    yield hooks.on_step_finish("processing")
Suggested change
... async for event in hooks.on_step_start("processing"):
... yield event
...
... # 处理逻辑...
... yield "Hello, world!"
...
... # 发送步骤结束事件
... async for event in hooks.on_step_finish("processing"):
... yield event
... yield hooks.on_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, world!"
...
... # 发送步骤结束事件
... yield hooks.on_step_finish("processing")

Copilot uses AI. Check for mistakes.
当前时间的字符串表示
"""
from datetime import datetime
import time
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant import: time is imported twice - once at the module level (line 25) and again inside the get_current_time function (line 47). Remove the inner import on line 47 since time is already available from the module-level import.

Suggested change
import time

Copilot uses AI. Check for mistakes.
examples/a.py Outdated
@@ -0,0 +1,166 @@
"""AgentRun Server + LangChain Agent 示例
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file is named a.py which is not descriptive. Consider renaming to something more meaningful like langchain_agent_example.py or quick_start_langchain.py to clearly indicate its purpose and content.

Copilot uses AI. Check for mistakes.
Comment on lines 1 to 834
"""AG-UI (Agent-User Interaction Protocol) 协议实现
AG-UI 是一种开源、轻量级、基于事件的协议,用于标准化 AI Agent 与前端应用之间的交互。
参考: https://docs.ag-ui.com/
基于 Router 的设计:
- 协议自己创建 FastAPI Router
- 定义所有端点和处理逻辑
- Server 只需挂载 Router
生命周期钩子:
- AG-UI 完整支持所有生命周期事件
- 每个钩子映射到对应的 AG-UI 事件类型
"""

from enum import Enum
import json
import time
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Optional,
TYPE_CHECKING,
Union,
)
import uuid

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from .model import (
AgentEvent,
AgentLifecycleHooks,
AgentRequest,
AgentResponse,
AgentResult,
AgentRunResult,
Message,
MessageRole,
)
from .protocol import BaseProtocolHandler

if TYPE_CHECKING:
from .invoker import AgentInvoker


# ============================================================================
# AG-UI 事件类型定义
# ============================================================================


class AGUIEventType(str, Enum):
"""AG-UI 事件类型
参考: https://docs.ag-ui.com/concepts/events
"""

# Lifecycle Events (生命周期事件)
RUN_STARTED = "RUN_STARTED"
RUN_FINISHED = "RUN_FINISHED"
RUN_ERROR = "RUN_ERROR"
STEP_STARTED = "STEP_STARTED"
STEP_FINISHED = "STEP_FINISHED"

# Text Message Events (文本消息事件)
TEXT_MESSAGE_START = "TEXT_MESSAGE_START"
TEXT_MESSAGE_CONTENT = "TEXT_MESSAGE_CONTENT"
TEXT_MESSAGE_END = "TEXT_MESSAGE_END"

# Tool Call Events (工具调用事件)
TOOL_CALL_START = "TOOL_CALL_START"
TOOL_CALL_ARGS = "TOOL_CALL_ARGS"
TOOL_CALL_END = "TOOL_CALL_END"
TOOL_CALL_RESULT = "TOOL_CALL_RESULT"

# State Events (状态事件)
STATE_SNAPSHOT = "STATE_SNAPSHOT"
STATE_DELTA = "STATE_DELTA"

# Message Events (消息事件)
MESSAGES_SNAPSHOT = "MESSAGES_SNAPSHOT"

# Special Events (特殊事件)
RAW = "RAW"
CUSTOM = "CUSTOM"


class AGUIRole(str, Enum):
"""AG-UI 消息角色"""

USER = "user"
ASSISTANT = "assistant"
SYSTEM = "system"
TOOL = "tool"


# ============================================================================
# AG-UI 事件模型
# ============================================================================


class AGUIBaseEvent(BaseModel):
"""AG-UI 基础事件"""

type: AGUIEventType
timestamp: Optional[int] = Field(
default_factory=lambda: int(time.time() * 1000)
)
rawEvent: Optional[Dict[str, Any]] = None


class AGUIRunStartedEvent(AGUIBaseEvent):
"""运行开始事件"""

type: AGUIEventType = AGUIEventType.RUN_STARTED
threadId: Optional[str] = None
runId: Optional[str] = None


class AGUIRunFinishedEvent(AGUIBaseEvent):
"""运行结束事件"""

type: AGUIEventType = AGUIEventType.RUN_FINISHED
threadId: Optional[str] = None
runId: Optional[str] = None


class AGUIRunErrorEvent(AGUIBaseEvent):
"""运行错误事件"""

type: AGUIEventType = AGUIEventType.RUN_ERROR
message: str
code: Optional[str] = None


class AGUIStepStartedEvent(AGUIBaseEvent):
"""步骤开始事件"""

type: AGUIEventType = AGUIEventType.STEP_STARTED
stepName: Optional[str] = None


class AGUIStepFinishedEvent(AGUIBaseEvent):
"""步骤结束事件"""

type: AGUIEventType = AGUIEventType.STEP_FINISHED
stepName: Optional[str] = None


class AGUITextMessageStartEvent(AGUIBaseEvent):
"""文本消息开始事件"""

type: AGUIEventType = AGUIEventType.TEXT_MESSAGE_START
messageId: str
role: AGUIRole = AGUIRole.ASSISTANT


class AGUITextMessageContentEvent(AGUIBaseEvent):
"""文本消息内容事件"""

type: AGUIEventType = AGUIEventType.TEXT_MESSAGE_CONTENT
messageId: str
delta: str


class AGUITextMessageEndEvent(AGUIBaseEvent):
"""文本消息结束事件"""

type: AGUIEventType = AGUIEventType.TEXT_MESSAGE_END
messageId: str


class AGUIToolCallStartEvent(AGUIBaseEvent):
"""工具调用开始事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_START
toolCallId: str
toolCallName: str
parentMessageId: Optional[str] = None


class AGUIToolCallArgsEvent(AGUIBaseEvent):
"""工具调用参数事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_ARGS
toolCallId: str
delta: str


class AGUIToolCallEndEvent(AGUIBaseEvent):
"""工具调用结束事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_END
toolCallId: str


class AGUIToolCallResultEvent(AGUIBaseEvent):
"""工具调用结果事件"""

type: AGUIEventType = AGUIEventType.TOOL_CALL_RESULT
toolCallId: str
result: str


class AGUIStateSnapshotEvent(AGUIBaseEvent):
"""状态快照事件"""

type: AGUIEventType = AGUIEventType.STATE_SNAPSHOT
snapshot: Dict[str, Any]


class AGUIStateDeltaEvent(AGUIBaseEvent):
"""状态增量事件"""

type: AGUIEventType = AGUIEventType.STATE_DELTA
delta: List[Dict[str, Any]] # JSON Patch 格式


class AGUIMessage(BaseModel):
"""AG-UI 消息格式"""

id: str
role: AGUIRole
content: Optional[str] = None
name: Optional[str] = None
toolCalls: Optional[List[Dict[str, Any]]] = None
toolCallId: Optional[str] = None


class AGUIMessagesSnapshotEvent(AGUIBaseEvent):
"""消息快照事件"""

type: AGUIEventType = AGUIEventType.MESSAGES_SNAPSHOT
messages: List[AGUIMessage]


class AGUIRawEvent(AGUIBaseEvent):
"""原始事件"""

type: AGUIEventType = AGUIEventType.RAW
event: Dict[str, Any]


class AGUICustomEvent(AGUIBaseEvent):
"""自定义事件"""

type: AGUIEventType = AGUIEventType.CUSTOM
name: str
value: Any


# 事件联合类型
AGUIEvent = Union[
AGUIRunStartedEvent,
AGUIRunFinishedEvent,
AGUIRunErrorEvent,
AGUIStepStartedEvent,
AGUIStepFinishedEvent,
AGUITextMessageStartEvent,
AGUITextMessageContentEvent,
AGUITextMessageEndEvent,
AGUIToolCallStartEvent,
AGUIToolCallArgsEvent,
AGUIToolCallEndEvent,
AGUIToolCallResultEvent,
AGUIStateSnapshotEvent,
AGUIStateDeltaEvent,
AGUIMessagesSnapshotEvent,
AGUIRawEvent,
AGUICustomEvent,
]


# ============================================================================
# AG-UI 请求模型
# ============================================================================


class AGUIRunAgentInput(BaseModel):
"""AG-UI 运行 Agent 请求"""

threadId: Optional[str] = None
runId: Optional[str] = None
messages: List[Dict[str, Any]] = Field(default_factory=list)
tools: Optional[List[Dict[str, Any]]] = None
context: Optional[List[Dict[str, Any]]] = None
forwardedProps: Optional[Dict[str, Any]] = None


# ============================================================================
# AG-UI 协议生命周期钩子实现
# ============================================================================


class AGUILifecycleHooks(AgentLifecycleHooks):
"""AG-UI 协议的生命周期钩子实现
AG-UI 完整支持所有生命周期事件,每个钩子映射到对应的 AG-UI 事件类型。
所有 on_* 方法直接返回 AgentEvent,可以直接 yield。
Example:
>>> def invoke_agent(request):
... hooks = request.hooks
... yield hooks.on_step_start("processing")
... yield hooks.on_tool_call_start(id="call_1", name="get_time")
... yield hooks.on_tool_call_args(id="call_1", args={"tz": "UTC"})
... result = get_time()
... yield hooks.on_tool_call_result(id="call_1", result=result)
... yield hooks.on_tool_call_end(id="call_1")
... yield f"时间: {result}"
... yield hooks.on_step_finish("processing")
"""

def __init__(self, context: Dict[str, Any]):
"""初始化钩子
Args:
context: 运行上下文,包含 threadId, runId 等
"""
self.context = context
self.thread_id = context.get("threadId", str(uuid.uuid4()))
self.run_id = context.get("runId", str(uuid.uuid4()))

def _create_event(self, event: AGUIBaseEvent) -> AgentEvent:
"""创建 AgentEvent
Args:
event: AG-UI 事件对象
Returns:
AgentEvent 对象
"""
json_str = event.model_dump_json(exclude_none=True)
raw_sse = f"data: {json_str}\n\n"
return AgentEvent(
event_type=event.type.value
if hasattr(event.type, "value")
else str(event.type),
data=event.model_dump(exclude_none=True),
raw_sse=raw_sse,
)

# =========================================================================
# 生命周期事件方法 (on_*) - 直接返回 AgentEvent
# =========================================================================

def on_run_start(self) -> AgentEvent:
"""发送 RUN_STARTED 事件"""
return self._create_event(
AGUIRunStartedEvent(threadId=self.thread_id, runId=self.run_id)
)

def on_run_finish(self) -> AgentEvent:
"""发送 RUN_FINISHED 事件"""
return self._create_event(
AGUIRunFinishedEvent(threadId=self.thread_id, runId=self.run_id)
)

def on_run_error(
self, error: str, code: Optional[str] = None
) -> AgentEvent:
"""发送 RUN_ERROR 事件"""
return self._create_event(AGUIRunErrorEvent(message=error, code=code))

def on_step_start(self, step_name: Optional[str] = None) -> AgentEvent:
"""发送 STEP_STARTED 事件"""
return self._create_event(AGUIStepStartedEvent(stepName=step_name))

def on_step_finish(self, step_name: Optional[str] = None) -> AgentEvent:
"""发送 STEP_FINISHED 事件"""
return self._create_event(AGUIStepFinishedEvent(stepName=step_name))

def on_text_message_start(
self, message_id: str, role: str = "assistant"
) -> AgentEvent:
"""发送 TEXT_MESSAGE_START 事件"""
try:
agui_role = AGUIRole(role)
except ValueError:
agui_role = AGUIRole.ASSISTANT
return self._create_event(
AGUITextMessageStartEvent(messageId=message_id, role=agui_role)
)

def on_text_message_content(
self, message_id: str, delta: str
) -> Optional[AgentEvent]:
"""发送 TEXT_MESSAGE_CONTENT 事件"""
if not delta:
return None
return self._create_event(
AGUITextMessageContentEvent(messageId=message_id, delta=delta)
)

def on_text_message_end(self, message_id: str) -> AgentEvent:
"""发送 TEXT_MESSAGE_END 事件"""
return self._create_event(AGUITextMessageEndEvent(messageId=message_id))

def on_tool_call_start(
self,
id: str,
name: str,
parent_message_id: Optional[str] = None,
) -> AgentEvent:
"""发送 TOOL_CALL_START 事件"""
return self._create_event(
AGUIToolCallStartEvent(
toolCallId=id,
toolCallName=name,
parentMessageId=parent_message_id,
)
)

def on_tool_call_args_delta(
self, id: str, delta: str
) -> Optional[AgentEvent]:
"""发送 TOOL_CALL_ARGS 事件(增量)"""
if not delta:
return None
return self._create_event(
AGUIToolCallArgsEvent(toolCallId=id, delta=delta)
)

def on_tool_call_args(
self, id: str, args: Union[str, Dict[str, Any]]
) -> AgentEvent:
"""发送完整的 TOOL_CALL_ARGS 事件"""
if isinstance(args, dict):
args = json.dumps(args, ensure_ascii=False)
return self._create_event(
AGUIToolCallArgsEvent(toolCallId=id, delta=args)
)

def on_tool_call_result_delta(
self, id: str, delta: str
) -> Optional[AgentEvent]:
"""发送 TOOL_CALL_RESULT 事件(增量)"""
if not delta:
return None
return self._create_event(
AGUIToolCallResultEvent(toolCallId=id, result=delta)
)

def on_tool_call_result(self, id: str, result: str) -> AgentEvent:
"""发送 TOOL_CALL_RESULT 事件"""
return self._create_event(
AGUIToolCallResultEvent(toolCallId=id, result=result)
)

def on_tool_call_end(self, id: str) -> AgentEvent:
"""发送 TOOL_CALL_END 事件"""
return self._create_event(AGUIToolCallEndEvent(toolCallId=id))

def on_state_snapshot(self, snapshot: Dict[str, Any]) -> AgentEvent:
"""发送 STATE_SNAPSHOT 事件"""
return self._create_event(AGUIStateSnapshotEvent(snapshot=snapshot))

def on_state_delta(self, delta: List[Dict[str, Any]]) -> AgentEvent:
"""发送 STATE_DELTA 事件"""
return self._create_event(AGUIStateDeltaEvent(delta=delta))

def on_custom_event(self, name: str, value: Any) -> AgentEvent:
"""发送 CUSTOM 事件"""
return self._create_event(AGUICustomEvent(name=name, value=value))


# ============================================================================
# AG-UI 协议处理器
# ============================================================================


class AGUIProtocolHandler(BaseProtocolHandler):
"""AG-UI 协议处理器
实现 AG-UI (Agent-User Interaction Protocol) 兼容接口
参考: https://docs.ag-ui.com/
特点:
- 基于事件的流式通信
- 完整支持所有生命周期事件
- 支持状态同步
- 支持工具调用
Example:
>>> from agentrun.server import AgentRunServer, AGUIProtocolHandler
>>>
>>> server = AgentRunServer(
... invoke_agent=my_agent,
... protocols=[AGUIProtocolHandler()]
... )
>>> server.start(port=8000)
# 可访问: POST http://localhost:8000/agui/v1/run
"""

def get_prefix(self) -> str:
"""AG-UI 协议建议使用 /agui/v1 前缀"""
return "/agui/v1"

def create_hooks(self, context: Dict[str, Any]) -> AgentLifecycleHooks:
"""创建 AG-UI 协议的生命周期钩子"""
return AGUILifecycleHooks(context)

def as_fastapi_router(self, agent_invoker: "AgentInvoker") -> APIRouter:
"""创建 AG-UI 协议的 FastAPI Router"""
router = APIRouter()

@router.post("/run")
async def run_agent(request: Request):
"""AG-UI 运行 Agent 端点
接收 AG-UI 格式的请求,返回 SSE 事件流。
"""
# SSE 响应头,禁用缓冲
sse_headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 nginx 缓冲
}

try:
# 1. 解析请求
request_data = await request.json()
agent_request, context = await self.parse_request(
request, request_data
)

# 2. 调用 Agent
agent_result = await agent_invoker.invoke(agent_request)

# 3. 格式化为 AG-UI 事件流
event_stream = self.format_response(
agent_result, agent_request, context
)

# 4. 返回 SSE 流
return StreamingResponse(
event_stream,
media_type="text/event-stream",
headers=sse_headers,
)

except ValueError as e:
# 返回错误事件流
return StreamingResponse(
self._error_stream(str(e)),
media_type="text/event-stream",
headers=sse_headers,
)
except Exception as e:
return StreamingResponse(
self._error_stream(f"Internal error: {str(e)}"),
media_type="text/event-stream",
headers=sse_headers,
)

@router.get("/health")
async def health_check():
"""健康检查端点"""
return {"status": "ok", "protocol": "ag-ui", "version": "1.0"}

return router

async def parse_request(
self,
request: Request,
request_data: Dict[str, Any],
) -> tuple[AgentRequest, Dict[str, Any]]:
"""解析 AG-UI 格式的请求
Args:
request: FastAPI Request 对象
request_data: HTTP 请求体 JSON 数据
Returns:
tuple: (AgentRequest, context)
Raises:
ValueError: 请求格式不正确
"""
# 创建上下文
context = {
"threadId": request_data.get("threadId") or str(uuid.uuid4()),
"runId": request_data.get("runId") or str(uuid.uuid4()),
}

# 创建钩子
hooks = self.create_hooks(context)

# 解析消息列表
messages = []
raw_messages = request_data.get("messages", [])

for msg_data in raw_messages:
if not isinstance(msg_data, dict):
continue

role_str = msg_data.get("role", "user")
try:
role = MessageRole(role_str)
except ValueError:
role = MessageRole.USER

messages.append(
Message(
role=role,
content=msg_data.get("content"),
name=msg_data.get("name"),
tool_calls=msg_data.get("toolCalls"),
tool_call_id=msg_data.get("toolCallId"),
)
)

# 提取原始请求头
raw_headers = dict(request.headers)

# 构建 AgentRequest
agent_request = AgentRequest(
messages=messages,
stream=True, # AG-UI 总是流式
tools=request_data.get("tools"),
raw_headers=raw_headers,
raw_body=request_data,
hooks=hooks,
)

# 保存额外参数
agent_request.extra = {
"threadId": context["threadId"],
"runId": context["runId"],
"context": request_data.get("context"),
"forwardedProps": request_data.get("forwardedProps"),
}

return agent_request, context

async def format_response(
self,
result: AgentResult,
request: AgentRequest,
context: Dict[str, Any],
) -> AsyncIterator[str]:
"""格式化响应为 AG-UI 事件流
Agent 可以 yield 三种类型的内容:
1. 普通字符串 - 会被包装成 TEXT_MESSAGE_CONTENT 事件
2. AgentEvent - 直接输出其 raw_sse
3. None - 忽略
Args:
result: Agent 执行结果
request: 原始请求
context: 运行上下文
Yields:
SSE 格式的事件数据
"""
hooks = request.hooks
message_id = str(uuid.uuid4())
text_message_started = False

# 1. 发送 RUN_STARTED 事件
if hooks:
event = hooks.on_run_start()
if event and event.raw_sse:
yield event.raw_sse

try:
# 2. 处理 Agent 结果
content = self._extract_content(result)

# 3. 流式发送内容
if self._is_iterator(content):
async for chunk in self._iterate_content(content):
if chunk is None:
continue

# 检查是否是 AgentEvent
if isinstance(chunk, AgentEvent):
if chunk.raw_sse:
yield chunk.raw_sse
elif isinstance(chunk, str) and chunk:
# 普通文本内容,包装成 TEXT_MESSAGE_CONTENT
if not text_message_started and hooks:
# 延迟发送 TEXT_MESSAGE_START,只在有文本内容时才发送
event = hooks.on_text_message_start(message_id)
if event and event.raw_sse:
yield event.raw_sse
text_message_started = True

if hooks:
event = hooks.on_text_message_content(
message_id, chunk
)
if event and event.raw_sse:
yield event.raw_sse
else:
# 非迭代器内容
if isinstance(content, AgentEvent):
if content.raw_sse:
yield content.raw_sse
elif content:
content_str = str(content)
if hooks:
event = hooks.on_text_message_start(message_id)
if event and event.raw_sse:
yield event.raw_sse
text_message_started = True
event = hooks.on_text_message_content(
message_id, content_str
)
if event and event.raw_sse:
yield event.raw_sse

# 4. 发送 TEXT_MESSAGE_END 事件(如果有文本消息)
if text_message_started and hooks:
event = hooks.on_text_message_end(message_id)
if event and event.raw_sse:
yield event.raw_sse

# 5. 发送 RUN_FINISHED 事件
if hooks:
event = hooks.on_run_finish()
if event and event.raw_sse:
yield event.raw_sse

except Exception as e:
# 发送错误事件
if hooks:
event = hooks.on_run_error(str(e), "AGENT_ERROR")
if event and event.raw_sse:
yield event.raw_sse

async def _error_stream(self, message: str) -> AsyncIterator[str]:
"""生成错误事件流"""
context = {
"threadId": str(uuid.uuid4()),
"runId": str(uuid.uuid4()),
}
hooks = self.create_hooks(context)

event = hooks.on_run_start()
if event and event.raw_sse:
yield event.raw_sse
event = hooks.on_run_error(message, "REQUEST_ERROR")
if event and event.raw_sse:
yield event.raw_sse

def _extract_content(self, result: AgentResult) -> Any:
"""从结果中提取内容"""
if isinstance(result, AgentRunResult):
return result.content
if isinstance(result, AgentResponse):
return result.content
if isinstance(result, str):
return result
return result

async def _iterate_content(
self, content: Union[Iterator, AsyncIterator]
) -> AsyncIterator:
"""统一迭代同步和异步迭代器
支持迭代包含字符串或 AgentEvent 的迭代器。
对于同步迭代器,每次 next() 调用都在线程池中执行,避免阻塞事件循环。
"""
import asyncio

if hasattr(content, "__aiter__"):
# 异步迭代器
async for chunk in content: # type: ignore
yield chunk
else:
# 同步迭代器 - 在线程池中迭代,避免阻塞
loop = asyncio.get_event_loop()
iterator = iter(content) # type: ignore

while True:
try:
# 在线程池中执行 next(),避免 time.sleep 阻塞事件循环
chunk = await loop.run_in_executor(None, next, iterator)
yield chunk
except StopIteration:
break


# ============================================================================
# 辅助函数 - 用于用户自定义 AG-UI 事件
# ============================================================================


def create_agui_event(event_type: AGUIEventType, **kwargs) -> AGUIBaseEvent:
"""创建 AG-UI 事件的辅助函数
Args:
event_type: 事件类型
**kwargs: 事件参数
Returns:
对应类型的事件对象
Example:
>>> event = create_agui_event(
... AGUIEventType.TEXT_MESSAGE_CONTENT,
... messageId="msg-123",
... delta="Hello"
... )
"""
event_classes = {
AGUIEventType.RUN_STARTED: AGUIRunStartedEvent,
AGUIEventType.RUN_FINISHED: AGUIRunFinishedEvent,
AGUIEventType.RUN_ERROR: AGUIRunErrorEvent,
AGUIEventType.STEP_STARTED: AGUIStepStartedEvent,
AGUIEventType.STEP_FINISHED: AGUIStepFinishedEvent,
AGUIEventType.TEXT_MESSAGE_START: AGUITextMessageStartEvent,
AGUIEventType.TEXT_MESSAGE_CONTENT: AGUITextMessageContentEvent,
AGUIEventType.TEXT_MESSAGE_END: AGUITextMessageEndEvent,
AGUIEventType.TOOL_CALL_START: AGUIToolCallStartEvent,
AGUIEventType.TOOL_CALL_ARGS: AGUIToolCallArgsEvent,
AGUIEventType.TOOL_CALL_END: AGUIToolCallEndEvent,
AGUIEventType.TOOL_CALL_RESULT: AGUIToolCallResultEvent,
AGUIEventType.STATE_SNAPSHOT: AGUIStateSnapshotEvent,
AGUIEventType.STATE_DELTA: AGUIStateDeltaEvent,
AGUIEventType.MESSAGES_SNAPSHOT: AGUIMessagesSnapshotEvent,
AGUIEventType.RAW: AGUIRawEvent,
AGUIEventType.CUSTOM: AGUICustomEvent,
}

event_class = event_classes.get(event_type, AGUIBaseEvent)
return event_class(type=event_type, **kwargs)
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new AG-UI protocol handler (AGUIProtocolHandler) and lifecycle hooks system lack test coverage. This is a substantial new feature that should have comprehensive unit tests covering:

  • Request parsing and validation
  • Event generation for all hook methods
  • Response formatting
  • Error handling
  • SSE stream generation

Consider adding tests in tests/unittests/server/ directory similar to the existing test structure for other modules.

Copilot uses AI. Check for mistakes.
Comment on lines 20 to 28
... # 发送步骤开始事件 (使用 emit_* 同步方法)
... yield hooks.emit_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, "
... yield "world!"
...
... # 发送步骤结束事件
... yield hooks.emit_step_finish("processing")
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation example references non-existent methods emit_step_start() and emit_step_finish(). Based on the actual implementation in agentrun/server/model.py, the lifecycle hooks only provide on_* methods that return Optional[AgentEvent].

The correct usage pattern should be:

def invoke_agent(request: AgentRequest):
    hooks = request.hooks
    # Direct yield of event (may be None)
    yield hooks.on_step_start("processing")
    yield "Hello, "
    yield "world!"
    yield hooks.on_step_finish("processing")
Suggested change
... # 发送步骤开始事件 (使用 emit_* 同步方法)
... yield hooks.emit_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, "
... yield "world!"
...
... # 发送步骤结束事件
... yield hooks.emit_step_finish("processing")
... # 发送步骤开始事件 (使用 on_* 同步方法)
... yield hooks.on_step_start("processing")
...
... # 处理逻辑...
... yield "Hello, "
... yield "world!"
...
... # 发送步骤结束事件
... yield hooks.on_step_finish("processing")

Copilot uses AI. Check for mistakes.
- context: 协议特定的上下文信息
"""
# 提取原始请求头
raw_headers = dict(request.headers)
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable raw_headers is not used.

Suggested change
raw_headers = dict(request.headers)

Copilot uses AI. Check for mistakes.
"""

import asyncio
from typing import Any, AsyncIterator, Callable, Dict, Optional
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'Optional' is not used.
Import of 'Any' is not used.

Suggested change
from typing import Any, AsyncIterator, Callable, Dict, Optional
from typing import AsyncIterator, Callable, Dict

Copilot uses AI. Check for mistakes.
"""

import time
from typing import Any, Callable, Dict, Iterator, List, Optional
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'List' is not used.
Import of 'Optional' is not used.
Import of 'Any' is not used.

Suggested change
from typing import Any, Callable, Dict, Iterator, List, Optional
from typing import Callable, Dict, Iterator

Copilot uses AI. Check for mistakes.
当前时间的字符串表示
"""
# 模拟异步 I/O 操作
import asyncio
Copy link

Copilot AI Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import of module asyncio is redundant, as it was previously imported on line 26.

Suggested change
import asyncio

Copilot uses AI. Check for mistakes.
@OhYee OhYee requested a review from Copilot December 17, 2025 08:08
OhYee added 16 commits December 17, 2025 16:20
重构服务器协议架构,引入抽象协议处理器基类。新增 AG-UI 协议完整实现,包括事件类型定义和处理器。扩展 AgentRequest 模型支持原始请求访问和生命周期钩子注入。更新 OpenAI 协议适配器以兼容新钩子系统。修改服务器默认配置同时启用 OpenAI 和 AG-UI 双协议。

BREAKING CHANGE: 协议处理器接口重构,parse_request 现在接收 Request 对象并返回上下文。AgentRequest 结构变更增加 raw_headers、raw_body 和 hooks 字段。AgentResult 类型扩展支持 AgentEvent 对象。

Change-Id: I8527db7539fa62ce39e80e28068a98a0b2db3ba3
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…ator handlers and lifecycle hooks

This commit introduces comprehensive AGUI protocol support with lifecycle hooks for both LangChain and LangGraph integrations. The changes include:

1. Added convert function exports to both LangChain and LangGraph integration modules with usage examples
2. Refactored model adapter to use default_headers instead of async_client for ChatOpenAI
3. Enhanced server invoker to handle both coroutine and async generator functions
4. Updated protocol handlers to support async response formatting and safe iterator handling
5. Refined AgentRequest model to be protocol-agnostic with core fields
6. Added CORS middleware configuration to AgentRunServer
7. Updated quick start example to use async streaming with event conversion

The changes enable proper streaming support, tool call event handling, and improved async performance for both LangChain and LangGraph integrations.

feat(langchain|langgraph): 添加 AGUI 协议支持和异步生成器处理器以及生命周期钩子

此提交为 LangChain 和 LangGraph 集成引入了全面的 AGUI 协议支持和生命周期钩子。变更包括:

1. 为 LangChain 和 LangGraph 集成模块添加了 convert 函数导出以及使用示例
2. 重构模型适配器,使用 default_headers 替代 async_client 用于 ChatOpenAI
3. 增强服务器调用器以处理协程和异步生成器函数
4. 更新协议处理器以支持异步响应格式化和安全的迭代器处理
5. 精简 AgentRequest 模型以实现协议无关性并使用核心字段
6. 为 AgentRunServer 添加 CORS 中间件配置
7. 更新快速开始示例以使用带事件转换的异步流

这些变更启用了适当的流式传输支持、工具调用事件处理以及改进的异步性能。

Change-Id: I941d1d797b930243282555b5a6db0e6d420f3691
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…mented out obsolete tool start handler

    - Remove unnecessary line breaks and reformat function calls for better readability
    - Comment out on_tool_start event handler that was causing duplicate tool call events
    - Adjust code indentation and formatting in _get_tool_chunks and _get_tool_calls functions

  修复代码格式并注释掉过时的工具开始处理器

    - 移除不必要的换行并重新格式化函数调用以提高可读性
    - 注释掉导致工具调用事件重复的 on_tool_start 事件处理器
    - 调整 _get_tool_chunks 和 _get_tool_calls 函数中的代码缩进和格式

Change-Id: Id90475c30ecc0134dae7ef2d1b97ef20986018a1
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…esult events

This change replaces the previous hook-based system with a standardized event-driven architecture using AgentResult. The new system provides consistent event handling across all protocols (AG-UI and OpenAI) and simplifies the API for end users.

The key changes include:
- Removing AgentLifecycleHooks and all on_* hook methods
- Introducing EventType enum with comprehensive event types
- Using AgentResult as the standard return type for all events
- Updating protocol handlers to transform AgentResult into protocol-specific formats
- Simplifying the server model and request/response handling

Additionally, the LangChain/LangGraph integration now uses the new to_agui_events function which supports multiple streaming formats including astream_events, stream, and astream with different modes.

Existing convert functions are preserved for backward compatibility but aliased to to_agui_events.

This change provides a more consistent and extensible foundation for agent event handling.

refactor(server): 从生命周期钩子迁移到标准化的 AgentResult 事件

此更改将以前基于钩子的系统替换为使用 AgentResult 的标准化事件驱动架构。新系统为所有协议(AG-UI 和 OpenAI)提供一致的事件处理,并简化了最终用户的 API。

主要变更包括:
- 移除 AgentLifecycleHooks 和所有 on_* 钩子方法
- 引入包含全面事件类型的 EventType 枚举
- 使用 AgentResult 作为所有事件的标准返回类型
- 更新协议处理器以将 AgentResult 转换为协议特定格式
- 简化服务器模型和请求/响应处理

此外,LangChain/LangGraph 集成现在使用新的 to_agui_events 函数,该函数支持多种流式传输格式,包括 astream_events、stream 和不同模式的 astream。

为了向后兼容保留了现有的 convert 函数,但别名为 to_agui_events。

此更改提供了更一致和可扩展的代理事件处理基础。

Change-Id: Ie9f3aad829e03a7f8437cc605317876edee4ae49
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…ring for tool inputs

adds _safe_json_dumps function to handle non-serializable objects gracefully and _filter_tool_input to remove internal runtime fields from tool inputs, improving compatibility with LangGraph/MCP integrations

the changes ensure that tool inputs containing non-JSON serializable objects or internal runtime fields from LangGraph/MCP are properly handled, preventing serialization errors and removing unwanted internal data from user-facing outputs

test: add comprehensive tests for tool input filtering and error handling

adds tests for non-JSON serializable objects, internal field filtering, and unsupported stream formats to ensure robust handling of various input scenarios

Change-Id: I9fe119448419b2efda300bf225db96b0b9c2a1aa
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…stency

Add _extract_tool_call_id function to handle MCP tool runtime objects and
prioritize original tool_call_id over run_id for consistent event tracking.
Update converter to use extracted ID or fallback to run_id when not available.
Add comprehensive tests for tool start/end events with runtime tool_call_id.

fixes inconsistent tool call ID tracking when using MCP tools that inject
runtime objects with original LLM-generated tool_call_id

为 runtime 对象添加提取原始 tool_call_id 的功能以确保一致性

添加 _extract_tool_call_id 函数来处理 MCP 工具 runtime 对象,并优先使用原始
tool_call_id 而不是 run_id 来确保事件跟踪的一致性。更新转换器以使用提取的 ID,
当不可用时回退到 run_id。为带有 runtime tool_call_id 的工具开始/结束事件添加
全面的测试。

修复使用注入包含原始 LLM 生成 tool_call_id 的 runtime 对象的 MCP 工具时
工具调用 ID 跟踪不一致的问题

Change-Id: I2838c7b88ea8c01c87b39038d2b92a06bea89167
Signed-off-by: OhYee <oyohyee@oyohyee.com>
… tool_call_id handling

adds new AgentRunConverter class to maintain tool_call_id consistency in streaming tool calls,
where the first chunk provides the id and subsequent chunks only have index. also updates
integration modules to use the new converter class and maintains backward compatibility.

The new AgentRunConverter class manages the mapping between tool call indices and IDs to ensure
consistent tool_call_id across streaming chunks, which is crucial for proper tool call
correlation in the AG-UI protocol.

// 中文翻译:

添加了新的 AgentRunConverter 类来维护流式工具调用中 tool_call_id 的一致性,
其中第一个数据块提供 ID,后续的数据块只有索引。同时更新了集成模块以使用新
的转换器类并保持向后兼容性。

新的 AgentRunConverter 类管理工具调用索引和 ID 之间的映射,确保跨流式数据块的
tool_call_id 一致性,这对于 AG-UI 协议中的正确工具调用关联至关重要。

Change-Id: I35ce9ac6db8a05d0b052df77aa716bf4402ced64
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…ent handling

adds AguiEventNormalizer to ensure proper event ordering and consistency
for tool call events. introduces tool_call_started_set to prevent duplicate
TOOL_CALL_START events and ensures correct AG-UI protocol sequence:
TOOL_CALL_START → TOOL_CALL_ARGS → TOOL_CALL_END → TOOL_CALL_RESULT.
also removes deprecated test file and updates test coverage.

fixes issue where streaming tool calls could send duplicate START events
and ensures proper event sequencing for AG-UI protocol compliance.

test(agui): update test coverage for new event normalization logic

feat(agui): 引入AG-UI事件规范化器并增强工具调用事件处理

添加AguiEventNormalizer以确保工具调用事件的正确排序和一致性。
引入tool_call_started_set来防止重复的TOOL_CALL_START事件,
并确保正确的AG-UI协议序列:TOOL_CALL_START → TOOL_CALL_ARGS → TOOL_CALL_END → TOOL_CALL_RESULT。
同时删除了废弃的测试文件并更新了测试覆盖。

修复了流式工具调用可能发送重复START事件的问题,
并确保AG-UI协议一致性。

test(agui): 为新的事件规范化逻辑更新测试覆盖

test(agui): 为新的事件规范化逻辑更新测试覆盖

Change-Id: I9c73b3ba467825be47f4433a65ec03ab012a77cc
Signed-off-by: OhYee <oyohyee@oyohyee.com>
reorder the import statements in the langgraph agent converter module
to maintain consistent ordering and improve code readability.

将 langgraph agent converter 模块中的导入语句重新排序
以保持一致的顺序并提高代码可读性。

Change-Id: Ie398c0393e4efd9f19dec40a76cdb96bdaa95ed5
Signed-off-by: OhYee <oyohyee@oyohyee.com>
update test assertions to accommodate dynamic id and created fields in responses
and improve streaming response validation with more robust content checking

更新测试断言以处理响应中的动态字段
并改进流式响应验证,使用更健壮的内容检查

Change-Id: I5380588f3522d5917d17dfab199506ab01ae392c
Signed-off-by: OhYee <oyohyee@oyohyee.com>
- Replace custom AG-UI event type mapping with ag-ui-protocol package
- Introduce event encoder and update event formatting logic
- Add comprehensive test coverage for AG-UI and OpenAI protocol streaming
- Update dependencies to include ag-ui-protocol>=0.1.10

This change standardizes the AG-UI protocol implementation by utilizing
the official ag-ui-protocol package for event types and encoding,
improving maintainability and compatibility.

==

此变更通过使用官方的 ag-ui-protocol 包来标准化 AG-UI 协议实现,
更新了依赖项以包含 ag-ui-protocol>=0.1.10,并增加了全面的测试覆盖率。

此变更通过使用 ag-ui-protocol 包进行事件类型和编码来标准化
AG-UI 协议的实现,从而提高了可维护性和兼容性。

Change-Id: Ie645ad1402fff279b0188d514fc37ac4cde5710e
Signed-off-by: OhYee <oyohyee@oyohyee.com>
… agnostic design

This change migrates the core event handling system from AgentResult to AgentEvent,
providing a protocol-agnostic foundation that supports both OpenAI and AG-UI protocols.
The new AgentEvent system automatically handles boundary events (START/END) at the
protocol layer, simplifying user-facing event definitions and improving consistency
across different protocol implementations.

The migration includes:
- Replacing AgentResult with AgentEvent as the primary event type
- Introducing simplified event types (TEXT, TOOL_CALL, TOOL_RESULT)
- Removing manual boundary event handling from user code
- Adding automatic boundary event generation in protocol handlers
- Updating all integrations and tests to use the new event system

BREAKING CHANGE: AgentResult has been replaced with AgentEvent. Users must update
their event handling code to use the new event types and structure.

将服务器从 AgentResult 迁移到 AgentEvent,实现协议无关的设计

此次更改将核心事件处理系统从 AgentResult 迁移到 AgentEvent,
提供了支持 OpenAI 和 AG-UI 协议的协议无关基础。
新的 AgentEvent 系统在协议层自动处理边界事件(START/END),
简化了面向用户的事件定义并提高不同协议实现之间的一致性。

迁移包括:
- 将 AgentResult 替换为 AgentEvent 作为主要事件类型
- 引入简化的事件类型(TEXT、TOOL_CALL、TOOL_RESULT)
- 从用户代码中移除手动边界事件处理
- 在协议处理器中添加自动边界事件生成
- 更新所有集成和测试以使用新的事件系统

重大变更:AgentResult 已被 AgentEvent 替换。用户必须更新
他们的事件处理代码以使用新的事件类型和结构。

perf(langgraph): optimize tool call event conversion with chunk-based streaming

Improves the tool call event conversion process in LangGraph integration by
switching from multiple discrete events (START, ARGS, END) to a single
chunk-based approach (TOOL_CALL_CHUNK). This reduces event overhead and
simplifies the event flow while maintaining proper tool call ID consistency
and argument streaming capabilities.

The optimization includes:
- Consolidating tool call events into single chunks with complete args
- Maintaining proper tool call ID tracking across streaming chunks
- Supporting both complete and incremental argument transmission
- Preserving compatibility with existing LangGraph event formats

性能优化(langgraph): 使用基于块的流优化工具调用事件转换

通过从多个离散事件(START、ARGS、END)切换到单一块方法(TOOL_CALL_CHUNK),
改进 LangGraph 集成中的工具调用事件转换过程。
这减少了事件开销并简化了事件流,同时保持适当的工具调用 ID 一致性和
参数流功能。

优化包括:
- 将工具调用事件整合到包含完整参数的单个块中
- 在流块间保持适当的工具调用 ID 跟踪
- 支持完整和增量参数传输
- 保持与现有 LangGraph 事件格式的兼容性

test: add comprehensive test coverage for event conversion and protocol handling

Adds extensive test coverage for the new AgentEvent system and event conversion
functionality. Includes unit tests for LangGraph event conversion, protocol
handling, and edge cases. The test suite ensures proper behavior across
different event formats and verifies correct event ordering and ID consistency.

The new tests cover:
- LangGraph event conversion for different stream modes
- AG-UI event normalizer functionality
- Server protocol handling for both OpenAI and AG-UI
- Tool call ID consistency across streaming chunks
- Error handling and edge cases

测试: 为事件转换和协议处理添加全面的测试覆盖

为新的 AgentEvent 系统和事件转换功能添加广泛的测试覆盖。
包括 LangGraph 事件转换、协议处理和边缘情况的单元测试。
测试套件确保不同事件格式的正确行为,并验证正确的事件排序和 ID 一致性。

新测试覆盖:
- 不同流模式下的 LangGraph 事件转换
- AG-UI 事件规范化器功能
- 服务器协议处理(OpenAI 和 AG-UI)
- 流块间工具调用 ID 的一致性
- 错误处理和边缘情况

Change-Id: I92fca1758866344bd34486b853d177e7d8f9fdf4
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…ncing

Added comprehensive error event handling for LangGraph integrations including
on_tool_error, on_llm_error, on_chain_error, and on_retriever_error events.
Enhanced AG-UI protocol event sequencing with proper boundary management,
ensuring correct order of TEXT_MESSAGE and TOOL_CALL events. Implemented
RUN_ERROR handling that properly terminates event streams without sending
subsequent events. Updated AgentRequest to use raw_request object instead
of separate body/headers fields for better request access.

新增了 LangGraph 集成的全面错误事件处理,包括 on_tool_error、on_llm_error、
on_chain_error 和 on_retriever_error 事件。增强了 AG-UI 协议事件序列,
确保 TEXT_MESSAGE 和 TOOL_CALL 事件的正确顺序。实现了 RUN_ERROR 处理,
在发生错误时正确终止事件流,不再发送后续事件。更新了 AgentRequest
使用 raw_request 对象替代独立的 body/headers 字段以更好地访问请求。

BREAKING CHANGE: AgentRequest body and headers fields replaced with raw_request object

重大变更:AgentRequest 的 body 和 headers 字段被 raw_request 对象替代

Change-Id: Ibc612068239977c3d01a338ba8d34992b988e451
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…vent handling

The AgentRunConverter has been updated to use a class-based approach for better state management. The conversion functions are now static methods of the AgentRunConverter class, providing improved tool call ID consistency and event sequencing. The deprecated conversion functions have been removed from the public API.

The AGUI protocol handler has been enhanced with copilotkit compatibility mode to handle sequential tool calls and improved event ordering. New protocol configuration options have been added to support these features.

Several tests have been updated to reflect these changes, including new test files for comprehensive event handling coverage.

feat: add AGUIProtocolConfig with copilotkit compatibility option
refactor: migrate convert functions to AgentRunConverter static methods
refactor: enhance AGUI protocol handler with improved event sequencing
test: add comprehensive tests for AgentRunConverter and AGUI protocol

重构 langgraph 事件转换器以使用 AgentRunConverter 类并增强事件处理

AgentRunConverter 已更新为使用基于类的方法,以实现更好的状态管理。转换函数现在是 AgentRunConverter 类的静态方法,提供改进的工具调用 ID 一致性和事件排序。已从公共 API 中删除了已弃用的转换函数。

AGUI 协议处理器已通过 copilotkit 兼容模式增强,以处理顺序工具调用和改进的事件排序。已添加新的协议配置选项以支持这些功能。

已更新多个测试以反映这些更改,包括为全面事件处理覆盖范围添加的新测试文件。

新功能: 添加带有 copilotkit 兼容性选项的 AGUIProtocolConfig
重构: 将转换函数迁移到 AgentRunConverter 静态方法
重构: 使用改进的事件排序增强 AGUI 协议处理器
测试: 为 AgentRunConverter 和 AGUI 协议添加全面测试

Change-Id: I9e73626b3bf17d6d5af90038d0f2eb229f93dee6
Signed-off-by: OhYee <oyohyee@oyohyee.com>
refactor test_invoker.py to improve code readability by formatting
AgentRequest parameter initialization with proper line breaks and
indentation, and reformatting list comprehensions for better
readability.

改进了 AgentRequest 初始化和列表推导式的格式化,提高了代码可读性

Change-Id: I2ed9ef8cf8396105df3c17f9327379e7a2b2b925
Signed-off-by: OhYee <oyohyee@oyohyee.com>
…sive testing

Added support for streaming tool execution results through new TOOL_RESULT_CHUNK
event type and Human-in-the-Loop (HITL) functionality for user interaction
during agent execution. Implemented caching mechanism for streaming chunks and
proper boundary event handling in AGUI protocol. Added comprehensive integration
tests for LangChain with MCP protocol validation.

The changes include:
- New TOOL_RESULT_CHUNK event for streaming tool execution progress
- HITL event support for human intervention during tool calls
- Tool result chunk caching and assembly in protocol handler
- AGUI protocol enhancements for streaming and HITL scenarios
- OpenAI protocol skips unsupported streaming/HITL events
- Complete integration test suite with protocol validation

Change-Id: I8f59df3bcffd212283813beac09951a936e7fb4c
test: add comprehensive LangChain AGUI integration tests with protocol validation
Signed-off-by: OhYee <oyohyee@oyohyee.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@OhYee OhYee requested a review from Copilot December 17, 2025 08:30
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 34 out of 37 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 21 to 32
# 使用 conftest.py 中的公共函数
from tests.unittests.integration.conftest import convert_and_collect
from tests.unittests.integration.conftest import (
create_mock_ai_message as create_ai_message,
)
from tests.unittests.integration.conftest import (
create_mock_ai_message_chunk as create_ai_message_chunk,
)
from tests.unittests.integration.conftest import (
create_mock_tool_message as create_tool_message,
)
from tests.unittests.integration.conftest import filter_agent_events
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The imports reference conftest module, but based on the file listing, these helper functions are defined in helpers.py, not conftest.py. These imports will fail at runtime.

Suggested change
# 使用 conftest.py 中的公共函数
from tests.unittests.integration.conftest import convert_and_collect
from tests.unittests.integration.conftest import (
create_mock_ai_message as create_ai_message,
)
from tests.unittests.integration.conftest import (
create_mock_ai_message_chunk as create_ai_message_chunk,
)
from tests.unittests.integration.conftest import (
create_mock_tool_message as create_tool_message,
)
from tests.unittests.integration.conftest import filter_agent_events
# 使用 helpers.py 中的公共函数
from tests.unittests.integration.helpers import convert_and_collect
from tests.unittests.integration.helpers import (
create_mock_ai_message as create_ai_message,
)
from tests.unittests.integration.helpers import (
create_mock_ai_message_chunk as create_ai_message_chunk,
)
from tests.unittests.integration.helpers import (
create_mock_tool_message as create_tool_message,
)
from tests.unittests.integration.helpers import filter_agent_events

Copilot uses AI. Check for mistakes.
Comment on lines +22 to +27
# 使用 helpers.py 中的公共函数
from .helpers import convert_and_collect
from .helpers import create_mock_ai_message as create_ai_message
from .helpers import create_mock_ai_message_chunk as create_ai_message_chunk
from .helpers import create_mock_tool_message as create_tool_message
from .helpers import filter_agent_events
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file correctly imports from .helpers, while test_langgraph_to_agent_event.py incorrectly imports from conftest. The inconsistency suggests one file needs correction - the other file should follow this pattern.

Copilot uses AI. Check for mistakes.
…xible event merging

This change replaces the AdditionMode enum with MergeOptions for more flexible control over how addition fields are merged with event data. The StreamStateMachine class is introduced to handle streaming state management, improving tool call handling and UUID resolution in copilotkit compatibility mode. The protocol handlers are updated to use the new merge options instead of the rigid enum-based approach.

This provides better flexibility for merging additional fields while maintaining backward compatibility through the merge helper function.

feat: 使用 MergeOptions 替换 AdditionMode 枚举以实现灵活的事件合并

此变更使用 MergeOptions 替换 AdditionMode 枚举,以实现对附加字段如何与事件数据合并的更灵活控制。引入了 StreamStateMachine 类来处理流状态管理,改进了在 copilotkit 兼容模式下的工具调用处理和 UUID 解析。协议处理器已更新以使用新的合并选项,而不是基于枚举的刚性方法。

这为合并附加字段提供了更好的灵活性,同时通过合并助手函数保持向后兼容性。

Change-Id: Ic84a5a9b743e7bedbc75e5938248b34348f17591
Signed-off-by: OhYee <oyohyee@oyohyee.com>
@OhYee OhYee merged commit 64e9502 into main Dec 17, 2025
1 check passed
@OhYee OhYee deleted the agentrun-agui branch December 17, 2025 10:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants