From da82a4496cf0ba932ea4760b9e37c96d5f31971e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 22 Oct 2025 17:39:44 +0000 Subject: [PATCH 1/2] Fix async streaming support for Agno Team.arun() Fixes #1262 The issue was that Team.arun() with stream=True returns an async generator directly, not a coroutine. The team async wrapper was defined as 'async def' which caused it to wrap the async generator in a coroutine, leading to the error: 'async for' requires an object with __aiter__ method, got coroutine. Changes: - Added AsyncStreamingResultWrapper class to properly wrap async generators - Modified create_team_async_wrapper to be a regular function (not async) - Added inspect.isasyncgen() check to detect async generators - Updated create_streaming_agent_async_wrapper to check for __aiter__ first The fix ensures that async generators are returned directly without being wrapped in a coroutine, allowing proper async iteration with 'async for'. Co-Authored-By: Alex --- .../agentic/agno/instrumentor.py | 65 ++++++++++++++++--- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/agentops/instrumentation/agentic/agno/instrumentor.py b/agentops/instrumentation/agentic/agno/instrumentor.py index 0707f5e54..0913c2554 100644 --- a/agentops/instrumentation/agentic/agno/instrumentor.py +++ b/agentops/instrumentation/agentic/agno/instrumentor.py @@ -115,6 +115,42 @@ def __getattr__(self, name): return getattr(self.original_result, name) +class AsyncStreamingResultWrapper: + """Wrapper for async streaming results that maintains agent span as active throughout iteration.""" + + def __init__(self, original_result, span, agent_id, agent_context, streaming_context_manager): + self.original_result = original_result + self.span = span + self.agent_id = agent_id + self.agent_context = agent_context + self.streaming_context_manager = streaming_context_manager + self._consumed = False + + def __aiter__(self): + """Return async iterator that keeps agent span active during iteration.""" + return self + + async def __anext__(self): + """Async iteration that keeps agent span active.""" + context_token = otel_context.attach(self.agent_context) + try: + item = await self.original_result.__anext__() + return item + except StopAsyncIteration: + # Clean up when iteration is complete + if not self._consumed: + self._consumed = True + self.span.end() + self.streaming_context_manager.remove_context(self.agent_id) + raise + finally: + otel_context.detach(context_token) + + def __getattr__(self, name): + """Delegate attribute access to the original result.""" + return getattr(self.original_result, name) + + def create_streaming_workflow_wrapper(tracer, streaming_context_manager): """Create a streaming-aware wrapper for workflow run methods.""" @@ -442,7 +478,11 @@ async def wrapper(wrapped, instance, args, kwargs): span.set_status(Status(StatusCode.OK)) # Wrap the result to maintain context and end span when complete - if hasattr(result, "__iter__"): + if hasattr(result, "__aiter__"): + return AsyncStreamingResultWrapper( + result, span, agent_id, current_context, streaming_context_manager + ) + elif hasattr(result, "__iter__"): return StreamingResultWrapper(result, span, agent_id, current_context, streaming_context_manager) else: # Not actually streaming, clean up immediately @@ -835,7 +875,9 @@ def wrapper(wrapped, instance, args, kwargs): def create_team_async_wrapper(tracer, streaming_context_manager): """Create an async wrapper for Team methods that establishes the team context.""" - async def wrapper(wrapped, instance, args, kwargs): + def wrapper(wrapped, instance, args, kwargs): + import inspect + # Get team ID for context storage team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) team_id = str(team_id) @@ -863,17 +905,20 @@ async def wrapper(wrapped, instance, args, kwargs): # Execute the original function within team context context_token = otel_context.attach(current_context) try: - result = await wrapped(*args, **kwargs) - - # For non-streaming, close the span - if not is_streaming: - span.end() - streaming_context_manager.remove_context(team_id) - - return result + result = wrapped(*args, **kwargs) finally: otel_context.detach(context_token) + # For streaming, wrap the result to maintain context + if is_streaming and inspect.isasyncgen(result): + return AsyncStreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager) + elif hasattr(result, "__iter__"): + return StreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager) + else: + span.end() + streaming_context_manager.remove_context(team_id) + return result + except Exception as e: span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) From fd37f5eb3aff144aeef81238e0d2b32531838080 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 22 Oct 2025 17:54:49 +0000 Subject: [PATCH 2/2] Fix Agent async streaming support Also fixes Agent.arun() async streaming in addition to Team.arun(). Changes: - Changed create_streaming_agent_async_wrapper from async def to def - Added inspect.isasyncgen() check for detecting async generators - For non-streaming case, created inner async_wrapper function - Both Team and Agent async streaming now work correctly Co-Authored-By: Alex --- .../agentic/agno/instrumentor.py | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/agentops/instrumentation/agentic/agno/instrumentor.py b/agentops/instrumentation/agentic/agno/instrumentor.py index 0913c2554..b8d41cbfe 100644 --- a/agentops/instrumentation/agentic/agno/instrumentor.py +++ b/agentops/instrumentation/agentic/agno/instrumentor.py @@ -427,7 +427,9 @@ def wrapper(wrapped, instance, args, kwargs): def create_streaming_agent_async_wrapper(tracer, streaming_context_manager): """Create a streaming-aware async wrapper for agent run methods.""" - async def wrapper(wrapped, instance, args, kwargs): + def wrapper(wrapped, instance, args, kwargs): + import inspect + # Get agent ID for context storage agent_id = getattr(instance, "agent_id", None) or getattr(instance, "id", None) or id(instance) agent_id = str(agent_id) @@ -463,7 +465,7 @@ async def wrapper(wrapped, instance, args, kwargs): # Execute the original function within agent context context_token = otel_context.attach(current_context) try: - result = await wrapped(*args, **kwargs) + result = wrapped(*args, **kwargs) finally: otel_context.detach(context_token) @@ -478,7 +480,7 @@ async def wrapper(wrapped, instance, args, kwargs): span.set_status(Status(StatusCode.OK)) # Wrap the result to maintain context and end span when complete - if hasattr(result, "__aiter__"): + if inspect.isasyncgen(result): return AsyncStreamingResultWrapper( result, span, agent_id, current_context, streaming_context_manager ) @@ -497,32 +499,35 @@ async def wrapper(wrapped, instance, args, kwargs): streaming_context_manager.remove_context(agent_id) raise else: - # For non-streaming, use normal context manager - with tracer.start_as_current_span(span_name) as span: - try: - # Set agent attributes - attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) - for key, value in attributes.items(): - span.set_attribute(key, value) + # For non-streaming, need to handle async call + async def async_wrapper(): + with tracer.start_as_current_span(span_name) as span: + try: + # Set agent attributes + attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) - # Execute the original function - result = await wrapped(*args, **kwargs) + # Execute the original function + result = await wrapped(*args, **kwargs) - # Set result attributes - result_attributes = get_agent_run_attributes( - args=(instance,) + args, kwargs=kwargs, return_value=result - ) - for key, value in result_attributes.items(): - if key not in attributes: # Avoid duplicates - span.set_attribute(key, value) + # Set result attributes + result_attributes = get_agent_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) - span.set_status(Status(StatusCode.OK)) - return result + span.set_status(Status(StatusCode.OK)) + return result - except Exception as e: - span.set_status(Status(StatusCode.ERROR, str(e))) - span.record_exception(e) - raise + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return async_wrapper() return wrapper