@@ -98,6 +98,7 @@ async def stream(self, message: str):
9898 - If memory is enabled, the agent's response is invoked synchronously using `run_in_executor`.
9999 - If memory is not enabled, the agent's response is invoked asynchronously using `arun`.
100100 - The method clears any leftover tokens in the queue before starting to stream the response.
101+ - Uses a buffer to ensure complete JSON objects are sent to prevent parsing errors.
101102 """
102103 # Clear any leftover tokens.
103104 while not self .streaming_handler .queue .empty ():
@@ -118,14 +119,34 @@ async def stream(self, message: str):
118119 self .agent_executor .arun ({"input" : message })
119120 )
120121
122+ # Buffer to collect tokens
123+ buffer = ""
124+
121125 # Yield tokens as they become available.
122126 while not task .done () or not self .streaming_handler .queue .empty ():
123127 try :
124128 token = await asyncio .wait_for (self .streaming_handler .queue .get (), timeout = 0.1 )
125- yield token
129+ # Add token to buffer
130+ buffer += token
131+
132+ # Check if buffer contains complete JSON objects (ending with newline)
133+ while "\n " in buffer :
134+ # Split at the first newline
135+ json_str , buffer = buffer .split ("\n " , 1 )
136+ # Only yield complete JSON objects
137+ if json_str :
138+ yield json_str + "\n "
139+
126140 except asyncio .TimeoutError :
127141 continue
128142
143+ # Yield any remaining complete JSON in the buffer
144+ if buffer and "\n " in buffer :
145+ parts = buffer .split ("\n " )
146+ for i in range (len (parts ) - 1 ):
147+ if parts [i ]:
148+ yield parts [i ] + "\n "
149+
129150 result = await task
130151
131152 def ask (self , message : str ):
0 commit comments