Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 84 additions & 34 deletions agentops/instrumentation/agentic/agno/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,42 @@ def __getattr__(self, name):
return getattr(self.original_result, name)


class AsyncStreamingResultWrapper:
"""Wrapper for async streaming results that maintains agent span as active throughout iteration."""

def __init__(self, original_result, span, agent_id, agent_context, streaming_context_manager):
self.original_result = original_result
self.span = span
self.agent_id = agent_id
self.agent_context = agent_context
self.streaming_context_manager = streaming_context_manager
self._consumed = False

def __aiter__(self):
"""Return async iterator that keeps agent span active during iteration."""
return self

async def __anext__(self):
"""Async iteration that keeps agent span active."""
context_token = otel_context.attach(self.agent_context)
try:
item = await self.original_result.__anext__()
return item
except StopAsyncIteration:
# Clean up when iteration is complete
if not self._consumed:
self._consumed = True
self.span.end()
self.streaming_context_manager.remove_context(self.agent_id)
raise
finally:
otel_context.detach(context_token)

def __getattr__(self, name):
"""Delegate attribute access to the original result."""
return getattr(self.original_result, name)


def create_streaming_workflow_wrapper(tracer, streaming_context_manager):
"""Create a streaming-aware wrapper for workflow run methods."""

Expand Down Expand Up @@ -391,7 +427,9 @@ def wrapper(wrapped, instance, args, kwargs):
def create_streaming_agent_async_wrapper(tracer, streaming_context_manager):
"""Create a streaming-aware async wrapper for agent run methods."""

async def wrapper(wrapped, instance, args, kwargs):
def wrapper(wrapped, instance, args, kwargs):
import inspect

# Get agent ID for context storage
agent_id = getattr(instance, "agent_id", None) or getattr(instance, "id", None) or id(instance)
agent_id = str(agent_id)
Expand Down Expand Up @@ -427,7 +465,7 @@ async def wrapper(wrapped, instance, args, kwargs):
# Execute the original function within agent context
context_token = otel_context.attach(current_context)
try:
result = await wrapped(*args, **kwargs)
result = wrapped(*args, **kwargs)
finally:
otel_context.detach(context_token)

Expand All @@ -442,7 +480,11 @@ async def wrapper(wrapped, instance, args, kwargs):
span.set_status(Status(StatusCode.OK))

# Wrap the result to maintain context and end span when complete
if hasattr(result, "__iter__"):
if inspect.isasyncgen(result):
return AsyncStreamingResultWrapper(
result, span, agent_id, current_context, streaming_context_manager
)
elif hasattr(result, "__iter__"):
return StreamingResultWrapper(result, span, agent_id, current_context, streaming_context_manager)
else:
# Not actually streaming, clean up immediately
Expand All @@ -457,32 +499,35 @@ async def wrapper(wrapped, instance, args, kwargs):
streaming_context_manager.remove_context(agent_id)
raise
else:
# For non-streaming, use normal context manager
with tracer.start_as_current_span(span_name) as span:
try:
# Set agent attributes
attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs)
for key, value in attributes.items():
span.set_attribute(key, value)
# For non-streaming, need to handle async call
async def async_wrapper():
with tracer.start_as_current_span(span_name) as span:
try:
# Set agent attributes
attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs)
for key, value in attributes.items():
span.set_attribute(key, value)

# Execute the original function
result = await wrapped(*args, **kwargs)
# Execute the original function
result = await wrapped(*args, **kwargs)

# Set result attributes
result_attributes = get_agent_run_attributes(
args=(instance,) + args, kwargs=kwargs, return_value=result
)
for key, value in result_attributes.items():
if key not in attributes: # Avoid duplicates
span.set_attribute(key, value)
# Set result attributes
result_attributes = get_agent_run_attributes(
args=(instance,) + args, kwargs=kwargs, return_value=result
)
for key, value in result_attributes.items():
if key not in attributes: # Avoid duplicates
span.set_attribute(key, value)

span.set_status(Status(StatusCode.OK))
return result
span.set_status(Status(StatusCode.OK))
return result

except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise

return async_wrapper()

return wrapper

Expand Down Expand Up @@ -835,7 +880,9 @@ def wrapper(wrapped, instance, args, kwargs):
def create_team_async_wrapper(tracer, streaming_context_manager):
"""Create an async wrapper for Team methods that establishes the team context."""

async def wrapper(wrapped, instance, args, kwargs):
def wrapper(wrapped, instance, args, kwargs):
import inspect

# Get team ID for context storage
team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance)
team_id = str(team_id)
Expand Down Expand Up @@ -863,17 +910,20 @@ async def wrapper(wrapped, instance, args, kwargs):
# Execute the original function within team context
context_token = otel_context.attach(current_context)
try:
result = await wrapped(*args, **kwargs)

# For non-streaming, close the span
if not is_streaming:
span.end()
streaming_context_manager.remove_context(team_id)

return result
result = wrapped(*args, **kwargs)
finally:
otel_context.detach(context_token)

# For streaming, wrap the result to maintain context
if is_streaming and inspect.isasyncgen(result):
return AsyncStreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager)
elif hasattr(result, "__iter__"):
return StreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager)
else:
span.end()
streaming_context_manager.remove_context(team_id)
return result

except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
Expand Down
Loading