@@ -14,13 +14,16 @@ class StreamingCallbackHandler(BaseCallbackHandler):
1414 """
1515 A callback handler that collects tokens and intermediate events in an asyncio queue.
1616 Uses a newline-delimited JSON protocol.
17+ Ensures each event is a complete JSON object with a newline terminator.
1718 """
1819
1920 def __init__ (self ):
2021 self .queue = asyncio .Queue ()
2122
2223 def on_llm_new_token (self , token : str , ** kwargs ) -> None :
24+ # Create a complete JSON event for each token
2325 event = {"type" : "token" , "data" : token }
26+ # Ensure each event ends with a newline for proper parsing
2427 self .queue .put_nowait (json .dumps (event ) + "\n " )
2528
2629 def on_agent_action (self , action , ** kwargs ) -> None :
@@ -29,6 +32,7 @@ def on_agent_action(self, action, **kwargs) -> None:
2932 "tool" : getattr (action , "tool" , None ),
3033 "tool_input" : getattr (action , "tool_input" , {})
3134 }
35+ # Ensure each event ends with a newline for proper parsing
3236 self .queue .put_nowait (json .dumps (event ) + "\n " )
3337
3438
@@ -119,34 +123,54 @@ async def stream(self, message: str):
119123 self .agent_executor .arun ({"input" : message })
120124 )
121125
122- # Buffer to collect tokens
123- buffer = ""
126+ # Use a smaller timeout to ensure more responsive streaming
127+ timeout = 0.01
124128
125129 # Yield tokens as they become available.
126130 while not task .done () or not self .streaming_handler .queue .empty ():
127131 try :
128- token = await asyncio .wait_for (self .streaming_handler .queue .get (), timeout = 0.1 )
129- # Add token to buffer
130- buffer += token
131-
132- # Check if buffer contains complete JSON objects (ending with newline)
133- while "\n " in buffer :
134- # Split at the first newline
135- json_str , buffer = buffer .split ("\n " , 1 )
136- # Only yield complete JSON objects
137- if json_str :
138- yield json_str + "\n "
139-
132+ # Get token with a short timeout to maintain streaming responsiveness
133+ token = await asyncio .wait_for (self .streaming_handler .queue .get (), timeout = timeout )
134+
135+ # Ensure token is a complete JSON object
136+ if token .endswith ("\n " ):
137+ # Token is already a complete JSON object, yield it directly
138+ yield token
139+ else :
140+ # Token might be incomplete, wait a tiny bit for more data
141+ buffer = token
142+ try :
143+ # Try to get more data with a very short timeout
144+ while not buffer .endswith ("\n " ):
145+ more_token = await asyncio .wait_for (
146+ self .streaming_handler .queue .get (),
147+ timeout = 0.005
148+ )
149+ buffer += more_token
150+ # If we now have a complete line, break
151+ if "\n " in buffer :
152+ break
153+ except asyncio .TimeoutError :
154+ # If we timeout waiting for more data, that's okay
155+ # We'll just yield what we have if it's complete
156+ pass
157+
158+ # Process the buffer to yield complete JSON objects
159+ while "\n " in buffer :
160+ json_str , remaining = buffer .split ("\n " , 1 )
161+ if json_str : # Only yield non-empty strings
162+ yield json_str + "\n "
163+ buffer = remaining
164+
165+ # If there's anything left in the buffer, keep it for next iteration
166+ if buffer :
167+ # Put it back in the queue for the next iteration
168+ self .streaming_handler .queue .put_nowait (buffer )
140169 except asyncio .TimeoutError :
170+ # Short timeout to keep the loop responsive
171+ await asyncio .sleep (0.01 )
141172 continue
142173
143- # Yield any remaining complete JSON in the buffer
144- if buffer and "\n " in buffer :
145- parts = buffer .split ("\n " )
146- for i in range (len (parts ) - 1 ):
147- if parts [i ]:
148- yield parts [i ] + "\n "
149-
150174 result = await task
151175
152176 def ask (self , message : str ):
0 commit comments