Commit fdcea19
feat(agui): add streaming tool output and HITL support with comprehensive testing
Added support for streaming tool execution results through a new TOOL_RESULT_CHUNK event type, plus Human-in-the-Loop (HITL) functionality for user interaction during agent execution. Implemented a caching mechanism for streaming chunks and proper boundary-event handling in the AGUI protocol. Added comprehensive integration tests for LangChain with MCP protocol validation.

The changes include:
- New TOOL_RESULT_CHUNK event for streaming tool execution progress
- HITL event support for human intervention during tool calls
- Tool result chunk caching and assembly in the protocol handler
- AGUI protocol enhancements for streaming and HITL scenarios
- OpenAI protocol skips unsupported streaming/HITL events
- Complete integration test suite with protocol validation

Change-Id: I8f59df3bcffd212283813beac09951a936e7fb4c

test: add comprehensive LangChain AGUI integration tests with protocol validation

Signed-off-by: OhYee <oyohyee@oyohyee.com>
1 parent c05acf0 commit fdcea19
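
As an orientation aid (not part of the commit), the sketch below lists the AG-UI event ordering that the updated AGUI handler is designed to produce for the two new flows; the names mirror the ToolCall* events referenced in agui_protocol.py, and SSE encoding details are omitted.

# Illustrative only: AG-UI event ordering implied by the AGUI handler changes in this
# commit (SSE payloads omitted). Streamed chunks are buffered server-side, so the client
# still receives a single TOOL_CALL_RESULT; a HITL request ends the tool call without
# any result event.
STREAMING_TOOL_FLOW = [
    "TOOL_CALL_START",   # emitted for the agent's TOOL_CALL event
    "TOOL_CALL_ARGS",
    "TOOL_CALL_END",
    # TOOL_RESULT_CHUNK events are cached here; nothing reaches the client yet
    "TOOL_CALL_RESULT",  # content = concatenated chunks + the final TOOL_RESULT payload
]
HITL_FLOW = [
    "TOOL_CALL_START",   # tool name is "hitl_<type>", e.g. "hitl_confirmation"
    "TOOL_CALL_ARGS",    # JSON-encoded prompt/options/default/timeout/schema
    "TOOL_CALL_END",
    # no TOOL_CALL_RESULT: the frontend replies with a tool message in the next turn
]

if __name__ == "__main__":
    print(*STREAMING_TOOL_FLOW, sep="\n")
    print(*HITL_FLOW, sep="\n")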

File tree

7 files changed: +851 -18 lines changed

agentrun/server/__init__.py

Lines changed: 39 additions & 0 deletions
@@ -58,6 +58,45 @@
         ...
         ...     yield f"当前时间: {result}"
 
+    Example (streaming tool output):
+        >>> async def invoke_agent(request: AgentRequest):
+        ...     # start the tool call
+        ...     yield AgentEvent(
+        ...         event=EventType.TOOL_CALL,
+        ...         data={"id": "call_1", "name": "run_code", "args": '{"code": "..."}'}
+        ...     )
+        ...
+        ...     # stream the execution progress
+        ...     yield AgentEvent(
+        ...         event=EventType.TOOL_RESULT_CHUNK,
+        ...         data={"id": "call_1", "delta": "Step 1: Compiling...\\n"}
+        ...     )
+        ...     yield AgentEvent(
+        ...         event=EventType.TOOL_RESULT_CHUNK,
+        ...         data={"id": "call_1", "delta": "Step 2: Running...\\n"}
+        ...     )
+        ...
+        ...     # final result (marks the end of the streamed output)
+        ...     yield AgentEvent(
+        ...         event=EventType.TOOL_RESULT,
+        ...         data={"id": "call_1", "result": "Execution completed."}
+        ...     )
+
+    Example (HITL - requesting human intervention):
+        >>> async def invoke_agent(request: AgentRequest):
+        ...     # ask the user for confirmation
+        ...     yield AgentEvent(
+        ...         event=EventType.HITL,
+        ...         data={
+        ...             "id": "hitl_1",
+        ...             "tool_call_id": "call_delete",  # optional
+        ...             "type": "confirmation",
+        ...             "prompt": "确认删除文件?",
+        ...             "options": ["确认", "取消"]
+        ...         }
+        ...     )
+        ...     # the user's response is passed back via the next turn's messages
+
     Example (accessing the original request):
         >>> async def invoke_agent(request: AgentRequest):
         ...     # access the current protocol

agentrun/server/agui_protocol.py

Lines changed: 125 additions & 4 deletions
@@ -311,8 +311,10 @@ async def _format_stream(
             "ended": False,
             "message_id": str(uuid.uuid4()),
         }
-        # Tool-call state: {tool_id: {"started": bool, "ended": bool, "name": str, "has_result": bool}}
+        # Tool-call state: {tool_id: {"started": bool, "ended": bool, "name": str, "has_result": bool, "is_hitl": bool}}
         tool_call_states: Dict[str, Dict[str, Any]] = {}
+        # Cache for streamed tool results: {tool_id: [chunk1, chunk2, ...]}
+        tool_result_chunks: Dict[str, List[str]] = {}
         # Error state: no further events may be sent after RUN_ERROR
         run_errored = False
         # Currently active tool-call ID (only used when copilotkit_compatibility=True)
@@ -354,6 +356,7 @@ def process_pending_queue() -> Iterator[str]:
                     context,
                     text_state,
                     tool_call_states,
+                    tool_result_chunks,
                     self._copilotkit_compatibility,
                 ):
                     if sse_data:
@@ -426,6 +429,7 @@ def process_pending_queue() -> Iterator[str]:
                     context,
                     text_state,
                     tool_call_states,
+                    tool_result_chunks,
                     self._copilotkit_compatibility,
                 ):
                     if sse_data:
@@ -454,6 +458,7 @@ def process_pending_queue() -> Iterator[str]:
                     context,
                     text_state,
                     tool_call_states,
+                    tool_result_chunks,
                     self._copilotkit_compatibility,
                 ):
                     if sse_data:
@@ -489,7 +494,8 @@ def _process_event_with_boundaries(
         event: AgentEvent,
         context: Dict[str, Any],
         text_state: Dict[str, Any],
-        tool_call_states: Dict[str, Dict[str, bool]],
+        tool_call_states: Dict[str, Dict[str, Any]],
+        tool_result_chunks: Dict[str, List[str]],
         copilotkit_compatibility: bool = False,
     ) -> Iterator[str]:
         """Process an event and inject boundary events
@@ -499,6 +505,7 @@ def _process_event_with_boundaries(
             context: context
             text_state: text state {"started": bool, "ended": bool, "message_id": str}
             tool_call_states: tool-call state
+            tool_result_chunks: cache of streamed tool-result chunks
            copilotkit_compatibility: CopilotKit compatibility mode (enables tool-call serialization)
 
         Yields:
@@ -660,6 +667,109 @@ def _process_event_with_boundaries(
                 )
             return
 
+        # TOOL_RESULT_CHUNK event: streamed output produced while the tool is running.
+        # Chunks are cached until TOOL_RESULT arrives, then assembled into the full result.
+        if event.event == EventType.TOOL_RESULT_CHUNK:
+            tool_id = event.data.get("id", "")
+            delta = event.data.get("delta", "")
+
+            # Cache the result chunk
+            if tool_id:
+                if tool_id not in tool_result_chunks:
+                    tool_result_chunks[tool_id] = []
+                if delta:
+                    tool_result_chunks[tool_id].append(delta)
+            return
+
+        # HITL event: request human intervention.
+        # AG-UI HITL convention: the tool call ends normally but no RESULT is sent;
+        # the frontend sends the user's response back as a tool message after the interaction.
+        #
+        # Two usage patterns:
+        # 1. Attach to an existing tool call: set tool_call_id
+        # 2. Create a standalone HITL tool call: set only id
+        if event.event == EventType.HITL:
+            hitl_id = event.data.get("id", "")
+            tool_call_id = event.data.get("tool_call_id", "")
+            hitl_type = event.data.get("type", "confirmation")
+            prompt = event.data.get("prompt", "")
+            options = event.data.get("options")
+            default = event.data.get("default")
+            timeout = event.data.get("timeout")
+            schema = event.data.get("schema")
+
+            # If a text message is still open, close it first
+            if text_state["started"] and not text_state.get("ended", False):
+                yield self._encoder.encode(
+                    TextMessageEndEvent(message_id=text_state["message_id"])
+                )
+                text_state["ended"] = True
+
+            # Case 1: attach to an existing tool call
+            if tool_call_id and tool_call_id in tool_call_states:
+                state = tool_call_states[tool_call_id]
+                # If the tool call has not ended yet, end it first
+                if state["started"] and not state["ended"]:
+                    yield self._encoder.encode(
+                        ToolCallEndEvent(tool_call_id=tool_call_id)
+                    )
+                    state["ended"] = True
+                # Mark as HITL (no RESULT will be sent)
+                state["is_hitl"] = True
+                state["has_result"] = False
+                return
+
+            # Case 2: create a standalone HITL tool call
+            # Build the tool-call arguments
+            import json as json_module
+
+            args_dict: Dict[str, Any] = {
+                "type": hitl_type,
+                "prompt": prompt,
+            }
+            if options:
+                args_dict["options"] = options
+            if default is not None:
+                args_dict["default"] = default
+            if timeout is not None:
+                args_dict["timeout"] = timeout
+            if schema:
+                args_dict["schema"] = schema
+
+            args_json = json_module.dumps(args_dict, ensure_ascii=False)
+
+            # Use tool_call_id if it was provided (but is not in states), otherwise hitl_id
+            actual_id = tool_call_id or hitl_id
+
+            # Send TOOL_CALL_START
+            yield self._encoder.encode(
+                ToolCallStartEvent(
+                    tool_call_id=actual_id,
+                    tool_call_name=f"hitl_{hitl_type}",
+                )
+            )
+
+            # Send TOOL_CALL_ARGS
+            yield self._encoder.encode(
+                ToolCallArgsEvent(
+                    tool_call_id=actual_id,
+                    delta=args_json,
+                )
+            )
+
+            # Send TOOL_CALL_END
+            yield self._encoder.encode(ToolCallEndEvent(tool_call_id=actual_id))
+
+            # Mark as a HITL tool call (already ended, no RESULT)
+            tool_call_states[actual_id] = {
+                "started": True,
+                "ended": True,
+                "name": f"hitl_{hitl_type}",
+                "has_result": False,  # HITL never sends a RESULT
+                "is_hitl": True,
+            }
+            return
+
         # TOOL_RESULT event: make sure the current tool call has ended
         if event.event == EventType.TOOL_RESULT:
             tool_id = event.data.get("id", "")
@@ -730,15 +840,26 @@ def _process_event_with_boundaries(
                 )
                 tool_call_states[actual_tool_id]["ended"] = True
 
+            # Assemble the cached streamed chunks with the final result
+            final_result = event.data.get("content") or event.data.get(
+                "result", ""
+            )
+            if actual_tool_id and actual_tool_id in tool_result_chunks:
+                # Prepend the cached chunks to the final result
+                cached_chunks = "".join(tool_result_chunks[actual_tool_id])
+                if cached_chunks:
+                    final_result = cached_chunks + final_result
+                # Clear the cache
+                del tool_result_chunks[actual_tool_id]
+
             # Send TOOL_CALL_RESULT
             yield self._encoder.encode(
                 ToolCallResultEvent(
                     message_id=event.data.get(
                         "message_id", f"tool-result-{actual_tool_id}"
                     ),
                     tool_call_id=actual_tool_id,
-                    content=event.data.get("content")
-                    or event.data.get("result", ""),
+                    content=final_result,
                     role="tool",
                 )
             )
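
To make the caching behavior above concrete, here is a small standalone sketch (not the library's code) that replays agent events and reproduces the assembly rule implemented in _process_event_with_boundaries: TOOL_RESULT_CHUNK deltas are buffered per tool id and prepended to the final TOOL_RESULT content.

# Standalone sketch of the chunk-assembly rule shown in the diff above.
from typing import Any, Dict, List


def assemble_tool_results(events: List[Dict[str, Any]]) -> Dict[str, str]:
    """Replay TOOL_RESULT_CHUNK / TOOL_RESULT events and return assembled content per tool id."""
    chunks: Dict[str, List[str]] = {}
    results: Dict[str, str] = {}
    for ev in events:
        data = ev["data"]
        if ev["event"] == "TOOL_RESULT_CHUNK":
            if data.get("id") and data.get("delta"):
                chunks.setdefault(data["id"], []).append(data["delta"])
        elif ev["event"] == "TOOL_RESULT":
            tool_id = data.get("id", "")
            final = data.get("content") or data.get("result", "")
            # cached chunks go in front of the final result, then the cache is cleared
            results[tool_id] = "".join(chunks.pop(tool_id, [])) + final
    return results


if __name__ == "__main__":
    demo = [
        {"event": "TOOL_RESULT_CHUNK", "data": {"id": "call_1", "delta": "Step 1: Compiling...\n"}},
        {"event": "TOOL_RESULT_CHUNK", "data": {"id": "call_1", "delta": "Step 2: Running...\n"}},
        {"event": "TOOL_RESULT", "data": {"id": "call_1", "result": "Execution completed."}},
    ]
    assert assemble_tool_results(demo)["call_1"] == (
        "Step 1: Compiling...\nStep 2: Running...\nExecution completed."
    )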

agentrun/server/model.py

Lines changed: 67 additions & 2 deletions
@@ -39,7 +39,7 @@ class AGUIProtocolConfig(ProtocolConfig):
     Attributes:
         prefix: protocol route prefix, default "/ag-ui/agent"
         enable: whether the protocol is enabled
-        copilotkit_compatibility: CopilotKit compatibility mode.
+        copilotkit_compatibility: compatibility mode for older CopilotKit versions.
             Defaults to False, following the standard AG-UI protocol and supporting parallel tool calls.
             When set to True, the following compatibility behavior is enabled:
             - automatically end other active tool calls before sending a new TOOL_CALL_START
@@ -134,10 +134,16 @@ class EventType(str, Enum):
     TEXT = "TEXT"  # text content chunk
     TOOL_CALL = "TOOL_CALL"  # complete tool call (with id, name, args)
     TOOL_CALL_CHUNK = "TOOL_CALL_CHUNK"  # tool-call argument fragment (streaming)
-    TOOL_RESULT = "TOOL_RESULT"  # tool execution result
+    TOOL_RESULT = "TOOL_RESULT"  # tool execution result (final; marks the end of streamed output)
+    TOOL_RESULT_CHUNK = "TOOL_RESULT_CHUNK"  # tool execution result fragment (streamed output)
     ERROR = "ERROR"  # error event
     STATE = "STATE"  # state update (snapshot or delta)
 
+    # =========================================================================
+    # Human-in-the-loop events
+    # =========================================================================
+    HITL = "HITL"  # Human-in-the-Loop: request human intervention
+
     # =========================================================================
     # Extension events
     # =========================================================================
@@ -210,6 +216,65 @@ class AgentEvent(BaseModel):
         ...     data={"id": "tc-1", "result": "Sunny, 25°C"}
         ... )
 
+    Example (streamed tool execution results):
+        Streaming tool output flow:
+        1. TOOL_RESULT_CHUNK events are cached and not sent immediately
+        2. A TOOL_RESULT event must be sent to mark the end of the streamed output
+        3. TOOL_RESULT prepends the cached chunks to the final result
+
+        >>> # streamed output while the tool runs (these are cached)
+        >>> yield AgentEvent(
+        ...     event=EventType.TOOL_RESULT_CHUNK,
+        ...     data={"id": "tc-1", "delta": "Executing step 1...\n"}
+        ... )
+        >>> yield AgentEvent(
+        ...     event=EventType.TOOL_RESULT_CHUNK,
+        ...     data={"id": "tc-1", "delta": "Step 1 complete.\n"}
+        ... )
+        >>> # final result (must be sent; marks the end of the streamed output)
+        >>> # after it is sent, the assembled result is: "Executing step 1...\nStep 1 complete.\nAll steps completed."
+        >>> yield AgentEvent(
+        ...     event=EventType.TOOL_RESULT,
+        ...     data={"id": "tc-1", "result": "All steps completed."}
+        ... )
+        >>> # if there is only streamed output, result may be an empty string
+        >>> yield AgentEvent(
+        ...     event=EventType.TOOL_RESULT,
+        ...     data={"id": "tc-1", "result": ""}  # only the cached chunks are used
+        ... )
+
+    Example (HITL - Human-in-the-Loop, requesting human intervention):
+        HITL can be used in two ways:
+        1. Attach to an existing tool call: set tool_call_id to reuse the existing tool
+        2. Create a standalone HITL tool call: set only id
+
+        >>> # Option 1: attach to an existing tool call (send TOOL_CALL first, then HITL)
+        >>> yield AgentEvent(
+        ...     event=EventType.TOOL_CALL,
+        ...     data={"id": "tc-delete", "name": "delete_file", "args": '{"file": "a.txt"}'}
+        ... )
+        >>> yield AgentEvent(
+        ...     event=EventType.HITL,
+        ...     data={
+        ...         "id": "hitl-1",
+        ...         "tool_call_id": "tc-delete",  # attach to the existing tool call
+        ...         "type": "confirmation",
+        ...         "prompt": "确认删除文件 a.txt?"
+        ...     }
+        ... )
+        >>> # Option 2: create a standalone HITL tool call
+        >>> yield AgentEvent(
+        ...     event=EventType.HITL,
+        ...     data={
+        ...         "id": "hitl-2",
+        ...         "type": "input",
+        ...         "prompt": "请输入密码:",
+        ...         "options": ["确认", "取消"],  # optional
+        ...         "schema": {"type": "string", "minLength": 8}  # optional
+        ...     }
+        ... )
+        >>> # the user's response comes back as a tool message in the next turn's messages
+
     Example (custom events):
         >>> yield AgentEvent(
         ...     event=EventType.CUSTOM,
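
The HITL round trip is only described in prose above ("the user's response comes back as a tool message in the next turn's messages"), so here is a hypothetical sketch of what such a follow-up request might carry; the message field names (role, tool_call_id, content) follow the common tool-message convention and are assumptions, not a schema taken from this diff.

# Hypothetical follow-up request after a HITL confirmation. The exact message schema is
# not shown in this commit; the field names below are assumptions based on the usual
# tool-message convention.
followup_messages = [
    {"role": "user", "content": "Please delete a.txt"},
    {
        "role": "assistant",
        "tool_calls": [
            {
                "id": "hitl-1",
                "type": "function",
                "function": {
                    "name": "hitl_confirmation",
                    "arguments": '{"type": "confirmation", "prompt": "确认删除文件 a.txt?"}',
                },
            }
        ],
    },
    # the frontend sends the user's choice back as a tool message tied to the HITL call id
    {"role": "tool", "tool_call_id": "hitl-1", "content": "确认"},
]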

agentrun/server/openai_protocol.py

Lines changed: 8 additions & 0 deletions
@@ -388,6 +388,14 @@ async def _format_stream(
             if event.event == EventType.TOOL_RESULT:
                 continue
 
+            # TOOL_RESULT_CHUNK event: the OpenAI protocol does not support streamed tool output
+            if event.event == EventType.TOOL_RESULT_CHUNK:
+                continue
+
+            # HITL event: not supported by the OpenAI protocol
+            if event.event == EventType.HITL:
+                continue
+
             # Other events are ignored
             # (ERROR, STATE, CUSTOM, etc. do not map directly to the OpenAI format)

examples/quick_start.py

Lines changed: 13 additions & 11 deletions
@@ -11,13 +11,11 @@
 from langchain.agents import create_agent
 import pydash
 
-from agentrun.integration.langchain import (
-    model,
-    sandbox_toolset,
-    to_agui_events,
-)
+from agentrun.integration.langchain import model, sandbox_toolset
+from agentrun.integration.langgraph.agent_converter import AgentRunConverter
 from agentrun.sandbox import TemplateType
 from agentrun.server import AgentRequest, AgentRunServer
+from agentrun.server.model import ServerConfig
 from agentrun.utils.log import logger
 
 # Replace with the names of the model and sandbox you have already created
@@ -66,15 +64,12 @@ async def invoke_agent(request: AgentRequest):
             ]
         }
 
+    converter = AgentRunConverter()
     if request.stream:
 
         async def async_generator():
-            # to_agui_events supports several invocation styles:
-            # - agent.astream_events(input, version="v2") - token-by-token streaming
-            # - agent.astream(input, stream_mode="updates") - output per node
-            # - agent.stream(input, stream_mode="updates") - synchronous variant
             async for event in agent.astream(input, stream_mode="updates"):
-                for item in to_agui_events(event):
+                for item in converter.convert(event):
                     yield item
 
         return async_generator()
@@ -83,4 +78,11 @@ async def async_generator():
         return pydash.get(result, "messages[-1].content", "")
 
 
-AgentRunServer(invoke_agent=invoke_agent).start()
+AgentRunServer(
+    invoke_agent=invoke_agent,
+    config=ServerConfig(
+        cors_origins=[
+            "*"
+        ]  # when deployed on AgentRun, CORS is already handled for you, so this line can be omitted
+    ),
+).start()
