Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
consts.SESSION_ID_KEY,
consts.SESSION_DESCRIPTION_KEY,
consts.HIRING_MANAGER_ID_KEY,
consts.GEN_AI_CALLER_CLIENT_IP_KEY, # gen_ai.caller.client.ip
# Execution context
consts.GEN_AI_EXECUTION_SOURCE_NAME_KEY, # gen_ai.channel.name
consts.GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY, # gen_ai.channel.link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
from microsoft_agents_a365.observability.core.constants import (
CUSTOM_PARENT_SPAN_ID_KEY,
EXECUTE_TOOL_OPERATION_NAME,
GEN_AI_EXECUTION_TYPE_KEY,
GEN_AI_INPUT_MESSAGES_KEY,
GEN_AI_OPERATION_NAME_KEY,
GEN_AI_OUTPUT_MESSAGES_KEY,
GEN_AI_REQUEST_MODEL_KEY,
GEN_AI_SYSTEM_KEY,
GEN_AI_TOOL_CALL_ID_KEY,
GEN_AI_TOOL_TYPE_KEY,
INVOKE_AGENT_OPERATION_NAME,
)
from microsoft_agents_a365.observability.core.execution_type import ExecutionType
from microsoft_agents_a365.observability.core.utils import as_utc_nano, safe_json_dumps
from opentelemetry import trace as ot_trace
from opentelemetry.context import attach, detach
Expand All @@ -44,10 +48,13 @@
)

from .constants import (
GEN_AI_GRAPH_NODE_ID,
GEN_AI_GRAPH_NODE_PARENT_ID,
)
from .utils import (
capture_input_message,
capture_output_message,
capture_tool_call_ids,
find_ancestor_agent_span_id,
get_attributes_from_function_span_data,
get_attributes_from_generation_span_data,
get_attributes_from_input,
Expand All @@ -56,6 +63,7 @@
get_span_kind,
get_span_name,
get_span_status,
get_tool_call_id,
)

logger = logging.getLogger(__name__)
Expand All @@ -68,6 +76,7 @@

class OpenAIAgentsTraceProcessor(TracingProcessor):
_MAX_HANDOFFS_IN_FLIGHT = 1000
_MAX_PENDING_TOOL_CALLS = 1000

def __init__(self, tracer: Tracer) -> None:
self._tracer = tracer
Expand All @@ -79,6 +88,17 @@ def __init__(self, tracer: Tracer) -> None:
# Use an OrderedDict and _MAX_HANDOFFS_IN_FLIGHT to cap the size of the dict
# in case there are large numbers of orphaned handoffs
self._reverse_handoffs_dict: OrderedDict[str, str] = OrderedDict()
# Track input/output messages for agent spans (keyed by agent span_id)
self._agent_inputs: dict[str, str] = {}
self._agent_outputs: dict[str, str] = {}
# Track agent span IDs to find nearest ancestor
self._agent_span_ids: set[str] = set()
# Track parent-child relationships: child_span_id -> parent_span_id
self._span_parents: dict[str, str] = {}
# Track tool_call_ids from GenerationSpan: "function_name:arguments" -> call_id
# Use an OrderedDict and _MAX_PENDING_TOOL_CALLS to cap the size of the dict
# in case tool calls are captured but never consumed
self._pending_tool_calls: OrderedDict[str, str] = OrderedDict()

# helper
def _stamp_custom_parent(self, otel_span: OtelSpan, trace_id: str) -> None:
Expand Down Expand Up @@ -133,6 +153,12 @@ def on_span_start(self, span: Span[Any]) -> None:
)
self._otel_spans[span.span_id] = otel_span
self._tokens[span.span_id] = attach(set_span_in_context(otel_span))
# Track parent-child relationship for ancestor lookup
if span.parent_id:
self._span_parents[span.span_id] = span.parent_id
# Track AgentSpan IDs
if isinstance(span.span_data, AgentSpanData):
self._agent_span_ids.add(span.span_id)

def on_span_end(self, span: Span[Any]) -> None:
"""Called when a span is finished. Should not block or raise exceptions.
Expand All @@ -142,6 +168,8 @@ def on_span_end(self, span: Span[Any]) -> None:
"""
if token := self._tokens.pop(span.span_id, None):
detach(token) # type: ignore[arg-type]
# Clean up parent tracking
self._span_parents.pop(span.span_id, None)
if not (otel_span := self._otel_spans.pop(span.span_id, None)):
return
otel_span.update_name(get_span_name(span))
Expand All @@ -167,14 +195,32 @@ def on_span_end(self, span: Span[Any]) -> None:
for k, v in get_attributes_from_generation_span_data(data):
otel_span.set_attribute(k, v)
self._stamp_custom_parent(otel_span, span.trace_id)
# Capture input/output messages for nearest ancestor agent span
if agent_span_id := find_ancestor_agent_span_id(
span.parent_id, self._agent_span_ids, self._span_parents
):
if data.input:
capture_input_message(agent_span_id, data.input, self._agent_inputs)
if data.output:
capture_output_message(agent_span_id, data.output, self._agent_outputs)
# Capture tool_call_ids for later use by FunctionSpan
if data.output:
capture_tool_call_ids(
data.output, self._pending_tool_calls, self._MAX_PENDING_TOOL_CALLS
)
otel_span.update_name(
f"{otel_span.attributes[GEN_AI_OPERATION_NAME_KEY]} {otel_span.attributes[GEN_AI_REQUEST_MODEL_KEY]}"
)
elif isinstance(data, FunctionSpanData):
for k, v in get_attributes_from_function_span_data(data):
otel_span.set_attribute(k, v)
self._stamp_custom_parent(otel_span, span.trace_id)
otel_span.update_name(f"{EXECUTE_TOOL_OPERATION_NAME} {data.function_name}")
otel_span.set_attribute(GEN_AI_TOOL_TYPE_KEY, data.type)
# Set tool_call_id if available from preceding GenerationSpan
func_args = data.input if data.input else ""
if tool_call_id := get_tool_call_id(data.name, func_args, self._pending_tool_calls):
otel_span.set_attribute(GEN_AI_TOOL_CALL_ID_KEY, tool_call_id)
otel_span.update_name(f"{EXECUTE_TOOL_OPERATION_NAME} {data.name}")
elif isinstance(data, MCPListToolsSpanData):
for k, v in get_attributes_from_mcp_list_tool_span_data(data):
otel_span.set_attribute(k, v)
Expand All @@ -187,12 +233,19 @@ def on_span_end(self, span: Span[Any]) -> None:
while len(self._reverse_handoffs_dict) > self._MAX_HANDOFFS_IN_FLIGHT:
self._reverse_handoffs_dict.popitem(last=False)
elif isinstance(data, AgentSpanData):
otel_span.set_attribute(GEN_AI_GRAPH_NODE_ID, data.name)
otel_span.set_attribute(GEN_AI_EXECUTION_TYPE_KEY, ExecutionType.HUMAN_TO_AGENT.value)
# Lookup the parent node if exists
key = f"{data.name}:{span.trace_id}"
if parent_node := self._reverse_handoffs_dict.pop(key, None):
otel_span.set_attribute(GEN_AI_GRAPH_NODE_PARENT_ID, parent_node)
# Apply captured input/output messages from child spans
if input_msg := self._agent_inputs.pop(span.span_id, None):
otel_span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, input_msg)
if output_msg := self._agent_outputs.pop(span.span_id, None):
otel_span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, output_msg)
otel_span.update_name(f"{INVOKE_AGENT_OPERATION_NAME} {get_span_name(span)}")
# Clean up tracking
self._agent_span_ids.discard(span.span_id)

end_time: int | None = None
if span.ended_at:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from microsoft_agents_a365.observability.core.constants import (
GEN_AI_CHOICE,
GEN_AI_EVENT_CONTENT,
GEN_AI_EXECUTION_PAYLOAD_KEY,
GEN_AI_INPUT_MESSAGES_KEY,
GEN_AI_OUTPUT_MESSAGES_KEY,
Expand Down Expand Up @@ -364,9 +365,9 @@ def get_attributes_from_function_span_data(
) -> Iterator[tuple[str, AttributeValue]]:
yield GEN_AI_TOOL_NAME_KEY, obj.name
if obj.input:
yield GEN_AI_INPUT_MESSAGES_KEY, obj.input
yield GEN_AI_TOOL_ARGS_KEY, obj.input
if obj.output is not None:
yield GEN_AI_OUTPUT_MESSAGES_KEY, _convert_to_primitive(obj.output)
yield GEN_AI_EVENT_CONTENT, _convert_to_primitive(obj.output)


def get_attributes_from_message_content_list(
Expand Down Expand Up @@ -534,3 +535,100 @@ def get_span_status(obj: Span[Any]) -> Status:
)
else:
return Status(StatusCode.OK)


def capture_tool_call_ids(
    output_list: Any, pending_tool_calls: dict[str, str], max_size: int = 1000
) -> None:
    """Extract and store tool_call_ids from generation output for later use by FunctionSpan.

    Every assistant message in ``output_list`` is scanned for ``tool_calls``.
    Each tool call that carries both an ``id`` and a function ``name`` is
    recorded under the key ``"<function_name>:<arguments>"``, which is the same
    key format ``get_tool_call_id`` uses to look the id up again.

    This is best-effort: malformed input is skipped and unexpected errors are
    logged at debug level instead of being raised.

    Args:
        output_list: The generation output containing tool calls. Expected to
            be an iterable of message dicts; anything else is ignored.
        pending_tool_calls: Mapping that receives the pending tool calls. It is
            mutated in place. Both ``OrderedDict`` and plain ``dict`` work.
        max_size: Maximum number of pending tool calls to keep in memory. When
            exceeded, the oldest entries are evicted first.
    """
    if not output_list:
        return
    try:
        for msg in output_list:
            if not isinstance(msg, dict) or msg.get("role") != "assistant":
                continue
            for tc in msg.get("tool_calls") or ():
                if not isinstance(tc, dict):
                    continue
                func = tc.get("function")
                if not isinstance(func, dict):
                    continue
                call_id = tc.get("id")
                func_name = func.get("name")
                if not (call_id and func_name):
                    continue
                # Normalise missing/None/empty arguments to "" so the key matches
                # a lookup performed with an empty argument string.
                func_args = func.get("arguments") or ""
                # Key by (function_name, arguments) to uniquely identify each call
                pending_tool_calls[f"{func_name}:{func_args}"] = call_id
                # Cap the size of the dict to prevent unbounded growth. Dicts
                # iterate in insertion order, so the first key is the oldest;
                # unlike popitem(last=False) this also works for a plain dict.
                while pending_tool_calls and len(pending_tool_calls) > max_size:
                    del pending_tool_calls[next(iter(pending_tool_calls))]
    except Exception:
        # Tracing must never break the traced application, so stay best-effort,
        # but leave a trace of the failure instead of swallowing it silently.
        import logging

        logging.getLogger(__name__).debug(
            "Failed to capture tool call ids from generation output", exc_info=True
        )


def get_tool_call_id(
function_name: str, function_args: str, pending_tool_calls: dict[str, str]
) -> str | None:
"""Get and remove the tool_call_id for a function with specific arguments."""
key = f"{function_name}:{function_args}"
return pending_tool_calls.pop(key, None)


def capture_input_message(
    parent_span_id: str, input_list: Any, agent_inputs: dict[str, str]
) -> None:
    """Extract and store the first user message from input list for parent agent span.

    Only the first capture per agent span is kept: if ``parent_span_id``
    already has an entry in ``agent_inputs`` the call is a no-op. This is
    best-effort: malformed input is skipped and unexpected errors are logged
    at debug level instead of being raised.

    Args:
        parent_span_id: Span id of the agent span the message belongs to.
        input_list: Generation input. Expected to be an iterable of message
            dicts; non-dict items and non-user roles are ignored.
        agent_inputs: Mapping of agent span id -> captured input text. It is
            mutated in place.
    """
    if parent_span_id in agent_inputs:
        return  # Already captured
    if not input_list:
        return
    try:
        for msg in input_list:
            if not isinstance(msg, dict) or msg.get("role") != "user":
                continue
            content = msg.get("content")
            if content:
                # Non-string content is stored as its str() form.
                agent_inputs[parent_span_id] = str(content)
                return
    except Exception:
        # Tracing must never break the traced application, so stay best-effort,
        # but leave a trace of the failure instead of swallowing it silently.
        import logging

        logging.getLogger(__name__).debug(
            "Failed to capture input message for agent span %s",
            parent_span_id,
            exc_info=True,
        )


def capture_output_message(
    parent_span_id: str, output_list: Any, agent_outputs: dict[str, str]
) -> None:
    """Extract and store the last assistant message with actual content (no tool calls) for parent agent span.

    Unlike input capture, a later call overwrites an earlier capture for the
    same ``parent_span_id``. This is best-effort: malformed input is skipped
    and unexpected errors are logged at debug level instead of being raised.

    Args:
        parent_span_id: Span id of the agent span the message belongs to.
        output_list: Generation output. Expected to be an iterable of message
            dicts; non-dict items and non-assistant roles are ignored.
        agent_outputs: Mapping of agent span id -> captured output text. It is
            mutated in place.
    """
    if not output_list:
        return
    try:
        # reversed() needs a sequence, so materialise other iterables first.
        output_items = output_list if isinstance(output_list, list) else list(output_list)
        # Iterate in reverse to get the last assistant message with content (not a tool call)
        for msg in reversed(output_items):
            if not isinstance(msg, dict) or msg.get("role") != "assistant":
                continue
            content = msg.get("content")
            # Only capture if there's actual content and no tool_calls
            # (tool_calls means this is an intermediate step, not the final response)
            if content and not msg.get("tool_calls"):
                agent_outputs[parent_span_id] = str(content)
                return
    except Exception:
        # Tracing must never break the traced application, so stay best-effort,
        # but leave a trace of the failure instead of swallowing it silently.
        import logging

        logging.getLogger(__name__).debug(
            "Failed to capture output message for agent span %s",
            parent_span_id,
            exc_info=True,
        )


def find_ancestor_agent_span_id(
span_id: str | None, agent_span_ids: set[str], span_parents: dict[str, str]
) -> str | None:
"""Walk up the parent chain to find the nearest ancestor AgentSpan."""
current = span_id
visited: set[str] = set() # Prevent infinite loops
while current and current not in visited:
if current in agent_span_ids:
return current
visited.add(current)
current = span_parents.get(current)
return None
Loading
Loading