11import json
22import asyncio
3- from typing import Dict , Any , Optional , AsyncGenerator
3+ from typing import Dict , Any , AsyncGenerator
44
5+ from datetime import datetime
56from langchain .agents import AgentExecutor
67from langchain .callbacks .base import BaseCallbackHandler
78from langchain_core .runnables .history import RunnableWithMessageHistory
@@ -16,6 +17,7 @@ class StreamingCallbackHandler(BaseCallbackHandler):
1617 A callback handler that collects tokens and intermediate events in an asyncio queue.
1718 Uses a newline-delimited JSON (NDJSON) protocol for reliable streaming.
1819 Each event is a complete JSON object with a newline terminator.
20+ Captures detailed information about tool execution including inputs and outputs.
1921 """
2022
2123 def __init__ (self ):
@@ -42,31 +44,103 @@ def on_agent_action(self, action, **kwargs) -> None:
4244 action: The action being performed by the agent.
4345 **kwargs: Additional keyword arguments.
4446 """
47+ event = {
48+ "type" : "agent_start" ,
49+ "timestamp" : str (datetime .now ())
50+ }
51+ self .queue .put_nowait (json .dumps (event ) + "\n " )
52+
53+ def on_agent_finish (self , action , ** kwargs ) -> None :
54+ """
55+ Handle agent finish events.
56+
57+ Args:
58+ action: The action being performed by the agent.
59+ **kwargs: Additional keyword arguments.
60+ """
61+ event = {
62+ "type" : "agent_finish" ,
63+ "timestamp" : str (datetime .now ())
64+ }
65+ self .queue .put_nowait (json .dumps (event ) + "\n " )
66+
67+ def on_tool_start (self , serialized , input_str , ** kwargs ) -> None :
68+ """
69+ Handle tool start events.
70+
71+ Args:
72+ serialized: The serialized input to the tool.
73+ input_str: The string representation of the input.
74+ **kwargs: Additional keyword arguments.
75+ """
4576 event = {
4677 "type" : "tool_start" ,
47- "tool" : getattr (action , "tool" , None ),
48- "tool_input" : getattr (action , "tool_input" , {})
78+ "tool_name" : serialized ["name" ],
79+ "params" : input_str ,
80+ "timestamp" : str (datetime .now ())
81+ }
82+ # Use NDJSON format
83+ self .queue .put_nowait (json .dumps (event ) + "\n " )
84+
85+ def on_tool_end (self , output , ** kwargs ) -> None :
86+ """
87+ Handle tool completion events.
88+
89+ Args:
90+ output: The output produced by the tool.
91+ **kwargs: Additional keyword arguments.
92+ """
93+ # Extract information about the completed tool
94+ observation = kwargs .get ("observation" , output )
95+ tool_name = kwargs .get ("name" , None )
96+
97+ # Create a detailed event for tool completion
98+ event = {
99+ "type" : "tool_end" ,
100+ "tool_name" : tool_name ,
101+ "output" : observation ,
102+ "timestamp" : str (datetime .now ())
49103 }
50104 # Use NDJSON format
51105 self .queue .put_nowait (json .dumps (event ) + "\n " )
52106
107+ def on_tool_error (self , error , ** kwargs ) -> None :
108+ """
109+ Handle tool error events.
53110
54- class RouterAgentService :
111+ Args:
112+ error: The error that occurred during tool execution.
113+ **kwargs: Additional keyword arguments.
114+ """
115+ # Extract information about the tool that caused the error
116+ tool_name = kwargs .get ("name" , None )
117+
118+ # Create a detailed event for tool error
119+ event = {
120+ "type" : "tool_error" ,
121+ "tool_name" : tool_name ,
122+ "error" : str (error ),
123+ "timestamp" : str (datetime .now ())
124+ }
125+ # Use NDJSON format
126+ self .queue .put_nowait (json .dumps (event ) + "\n " )
127+
128+
129+ class AgentService :
55130 """
56- RouterAgentService is responsible for routing the input message to the appropriate agent
57- and returning the response.
131+ AgentService runs a tool-using LLM agent and returns or streams its responses.
58132 """
59133
60- def __init__ (self , sub_agents , memory = None , streaming : bool = True ):
134+ def __init__ (self , tools , memory = None , streaming : bool = True ):
61135 """
62- Initialize the RouterAgentService .
136+ Initialize the AgentService .
63137
64138 Args:
65- sub_agents : The sub-agents to use for routing .
139+ tools : The tools to use for agent .
66140 memory: The memory to use for storing conversation history.
67141 streaming (bool): Whether to enable streaming responses.
68142 """
69- self .sub_agents = sub_agents
143+ self .tools = tools
70144 self .memory = memory
71145 self .streaming = streaming
72146 self .streaming_handler = None
@@ -88,22 +162,32 @@ def __init__(self, sub_agents, memory=None, streaming: bool = True):
88162 stream = streaming ,
89163 callbacks = [self .streaming_handler ] if self .streaming else None )
90164
91- self .prompt = init_prompt (self .llm , AgentType .ROUTER_AGENT )
165+ self .prompt = init_prompt (self .llm , AgentType .PATTERN_CORE_AGENT )
92166
93- self .agent = init_agent (self .llm , self .sub_agents , self .prompt )
167+ self .agent = init_agent (self .llm , self .tools , self .prompt )
94168
95169 if streaming :
170+ # Wrap each tool with the callback handler to ensure tool events are captured
171+ wrapped_tools = []
172+ for tool in self .tools :
173+ # Create a copy of the tool with callbacks attached
174+ tool_with_callbacks = tool .copy ()
175+ tool_with_callbacks .callbacks = [self .streaming_handler ]
176+ wrapped_tools .append (tool_with_callbacks )
177+
178+ # Make sure the streaming handler is registered for all events, including tool completion
96179 self .agent_executor = AgentExecutor (
97180 agent = self .agent ,
98- tools = self . sub_agents ,
181+ tools = wrapped_tools , # Use the wrapped tools with callbacks
99182 return_intermediate_steps = True ,
100183 verbose = True ,
101- callbacks = [self .streaming_handler ]
184+ callbacks = [self .streaming_handler ],
185+ handle_tool_error = True # Ensure tool errors are also captured
102186 )
103187 else :
104188 self .agent_executor = AgentExecutor (
105189 agent = self .agent ,
106- tools = self .sub_agents ,
190+ tools = self .tools ,
107191 return_intermediate_steps = True ,
108192 verbose = True
109193 )
@@ -263,38 +347,6 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
263347 }
264348 yield json .dumps (error_event ) + "\n "
265349
266- # Send a completion event to signal the end of streaming
267- try :
268- completion_event = {
269- "type" : "completion" ,
270- "data" : "Stream completed"
271- }
272- yield json .dumps (completion_event ) + "\n "
273-
274- # Wait for the task to complete and get the result
275- await task
276- except asyncio .CancelledError :
277- # Handle task cancellation gracefully
278- error_event = {
279- "type" : "info" ,
280- "data" : "Task was cancelled"
281- }
282- yield json .dumps (error_event ) + "\n "
283- except ConnectionError as e :
284- # Handle connection errors specifically
285- error_event = {
286- "type" : "error" ,
287- "data" : f"Connection error: { str (e )} "
288- }
289- yield json .dumps (error_event ) + "\n "
290- except Exception as e :
291- # Handle any errors during task execution
292- error_event = {
293- "type" : "error" ,
294- "data" : f"Task execution error: { str (e )} "
295- }
296- yield json .dumps (error_event ) + "\n "
297-
298350 def ask (self , message : str ) -> Dict [str , Any ]:
299351 """
300352 Sends a message to the agent and returns the response.
0 commit comments