diff --git a/agentops/instrumentation/agentic/agno/instrumentor.py b/agentops/instrumentation/agentic/agno/instrumentor.py index 0707f5e54..b8d41cbfe 100644 --- a/agentops/instrumentation/agentic/agno/instrumentor.py +++ b/agentops/instrumentation/agentic/agno/instrumentor.py @@ -115,6 +115,42 @@ def __getattr__(self, name): return getattr(self.original_result, name) +class AsyncStreamingResultWrapper: + """Wrapper for async streaming results that maintains agent span as active throughout iteration.""" + + def __init__(self, original_result, span, agent_id, agent_context, streaming_context_manager): + self.original_result = original_result + self.span = span + self.agent_id = agent_id + self.agent_context = agent_context + self.streaming_context_manager = streaming_context_manager + self._consumed = False + + def __aiter__(self): + """Return async iterator that keeps agent span active during iteration.""" + return self + + async def __anext__(self): + """Async iteration that keeps agent span active.""" + context_token = otel_context.attach(self.agent_context) + try: + item = await self.original_result.__anext__() + return item + except StopAsyncIteration: + # Clean up when iteration is complete + if not self._consumed: + self._consumed = True + self.span.end() + self.streaming_context_manager.remove_context(self.agent_id) + raise + finally: + otel_context.detach(context_token) + + def __getattr__(self, name): + """Delegate attribute access to the original result.""" + return getattr(self.original_result, name) + + def create_streaming_workflow_wrapper(tracer, streaming_context_manager): """Create a streaming-aware wrapper for workflow run methods.""" @@ -391,7 +427,9 @@ def wrapper(wrapped, instance, args, kwargs): def create_streaming_agent_async_wrapper(tracer, streaming_context_manager): """Create a streaming-aware async wrapper for agent run methods.""" - async def wrapper(wrapped, instance, args, kwargs): + def wrapper(wrapped, instance, args, kwargs): + import inspect + # Get agent ID for context storage agent_id = getattr(instance, "agent_id", None) or getattr(instance, "id", None) or id(instance) agent_id = str(agent_id) @@ -427,7 +465,7 @@ async def wrapper(wrapped, instance, args, kwargs): # Execute the original function within agent context context_token = otel_context.attach(current_context) try: - result = await wrapped(*args, **kwargs) + result = wrapped(*args, **kwargs) finally: otel_context.detach(context_token) @@ -442,7 +480,11 @@ async def wrapper(wrapped, instance, args, kwargs): span.set_status(Status(StatusCode.OK)) # Wrap the result to maintain context and end span when complete - if hasattr(result, "__iter__"): + if inspect.isasyncgen(result): + return AsyncStreamingResultWrapper( + result, span, agent_id, current_context, streaming_context_manager + ) + elif hasattr(result, "__iter__"): return StreamingResultWrapper(result, span, agent_id, current_context, streaming_context_manager) else: # Not actually streaming, clean up immediately @@ -457,32 +499,35 @@ async def wrapper(wrapped, instance, args, kwargs): streaming_context_manager.remove_context(agent_id) raise else: - # For non-streaming, use normal context manager - with tracer.start_as_current_span(span_name) as span: - try: - # Set agent attributes - attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) - for key, value in attributes.items(): - span.set_attribute(key, value) + # For non-streaming, need to handle async call + async def async_wrapper(): + with tracer.start_as_current_span(span_name) as span: + try: + # Set agent attributes + attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) - # Execute the original function - result = await wrapped(*args, **kwargs) + # Execute the original function + result = await wrapped(*args, **kwargs) - # Set result attributes - result_attributes = get_agent_run_attributes( - args=(instance,) + args, kwargs=kwargs, return_value=result - ) - for key, value in result_attributes.items(): - if key not in attributes: # Avoid duplicates - span.set_attribute(key, value) + # Set result attributes + result_attributes = get_agent_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) - span.set_status(Status(StatusCode.OK)) - return result + span.set_status(Status(StatusCode.OK)) + return result - except Exception as e: - span.set_status(Status(StatusCode.ERROR, str(e))) - span.record_exception(e) - raise + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return async_wrapper() return wrapper @@ -835,7 +880,9 @@ def wrapper(wrapped, instance, args, kwargs): def create_team_async_wrapper(tracer, streaming_context_manager): """Create an async wrapper for Team methods that establishes the team context.""" - async def wrapper(wrapped, instance, args, kwargs): + def wrapper(wrapped, instance, args, kwargs): + import inspect + # Get team ID for context storage team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) team_id = str(team_id) @@ -863,17 +910,20 @@ async def wrapper(wrapped, instance, args, kwargs): # Execute the original function within team context context_token = otel_context.attach(current_context) try: - result = await wrapped(*args, **kwargs) - - # For non-streaming, close the span - if not is_streaming: - span.end() - streaming_context_manager.remove_context(team_id) - - return result + result = wrapped(*args, **kwargs) finally: otel_context.detach(context_token) + # For streaming, wrap the result to maintain context + if is_streaming and inspect.isasyncgen(result): + return AsyncStreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager) + elif hasattr(result, "__iter__"): + return StreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager) + else: + span.end() + streaming_context_manager.remove_context(team_id) + return result + except Exception as e: span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e)