Skip to content

Commit 998621e

Browse files
authored
Merge pull request #66 from pattern-tech/fix/streaming
fix: updating streaming part
2 parents 4fd0fe4 + b06a5bf commit 998621e

1 file changed

Lines changed: 41 additions & 4 deletions

File tree

src/agent/services/agent_service.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ def __init__(self, sub_agents, memory=None, streaming: bool = True):
7272
self.streaming_handler = None
7373

7474
# Default timeout values that can be adjusted if needed
75-
self.token_timeout = 0.01
76-
self.buffer_timeout = 0.005
77-
self.poll_interval = 0.01
75+
self.token_timeout = 0.5 # Increased from 0.01
76+
self.buffer_timeout = 0.1 # Increased from 0.005
77+
self.poll_interval = 0.1 # Increased from 0.01
7878

7979
# Set up the streaming callback if streaming is enabled.
8080
if streaming:
@@ -200,6 +200,22 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
200200
# No new tokens available, wait a bit before checking again
201201
await asyncio.sleep(self.poll_interval)
202202
continue
203+
except asyncio.CancelledError:
204+
# Handle task cancellation gracefully
205+
error_event = {
206+
"type": "info",
207+
"data": "Stream was cancelled"
208+
}
209+
yield json.dumps(error_event) + "\n"
210+
break
211+
except ConnectionError as e:
212+
# Handle connection errors specifically
213+
error_event = {
214+
"type": "error",
215+
"data": f"Connection error: {str(e)}"
216+
}
217+
yield json.dumps(error_event) + "\n"
218+
break
203219
except Exception as e:
204220
# Handle any parsing or processing errors
205221
error_event = {
@@ -223,9 +239,30 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
223239
}
224240
yield json.dumps(error_event) + "\n"
225241

226-
# Wait for the task to complete and get the result
242+
# Send a completion event to signal the end of streaming
227243
try:
244+
completion_event = {
245+
"type": "completion",
246+
"data": "Stream completed"
247+
}
248+
yield json.dumps(completion_event) + "\n"
249+
250+
# Wait for the task to complete and get the result
228251
await task
252+
except asyncio.CancelledError:
253+
# Handle task cancellation gracefully
254+
error_event = {
255+
"type": "info",
256+
"data": "Task was cancelled"
257+
}
258+
yield json.dumps(error_event) + "\n"
259+
except ConnectionError as e:
260+
# Handle connection errors specifically
261+
error_event = {
262+
"type": "error",
263+
"data": f"Connection error: {str(e)}"
264+
}
265+
yield json.dumps(error_event) + "\n"
229266
except Exception as e:
230267
# Handle any errors during task execution
231268
error_event = {

0 commit comments

Comments
 (0)