diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/trace_processor/util.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/trace_processor/util.py index 0abfd7d3..bfb75a56 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/trace_processor/util.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/trace_processor/util.py @@ -23,6 +23,7 @@ consts.SESSION_ID_KEY, consts.SESSION_DESCRIPTION_KEY, consts.HIRING_MANAGER_ID_KEY, + consts.GEN_AI_CALLER_CLIENT_IP_KEY, # gen_ai.caller.client.ip # Execution context consts.GEN_AI_EXECUTION_SOURCE_NAME_KEY, # gen_ai.channel.name consts.GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY, # gen_ai.channel.link diff --git a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py index 02720716..1459c95a 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py @@ -21,13 +21,17 @@ from microsoft_agents_a365.observability.core.constants import ( CUSTOM_PARENT_SPAN_ID_KEY, EXECUTE_TOOL_OPERATION_NAME, + GEN_AI_EXECUTION_TYPE_KEY, GEN_AI_INPUT_MESSAGES_KEY, GEN_AI_OPERATION_NAME_KEY, GEN_AI_OUTPUT_MESSAGES_KEY, GEN_AI_REQUEST_MODEL_KEY, GEN_AI_SYSTEM_KEY, + GEN_AI_TOOL_CALL_ID_KEY, + GEN_AI_TOOL_TYPE_KEY, INVOKE_AGENT_OPERATION_NAME, ) +from microsoft_agents_a365.observability.core.execution_type import ExecutionType from microsoft_agents_a365.observability.core.utils import as_utc_nano, safe_json_dumps from opentelemetry import trace as ot_trace from opentelemetry.context import attach, detach @@ -44,10 +48,13 @@ ) from .constants import ( - GEN_AI_GRAPH_NODE_ID, GEN_AI_GRAPH_NODE_PARENT_ID, ) from .utils import ( + capture_input_message, + capture_output_message, + capture_tool_call_ids, + find_ancestor_agent_span_id, get_attributes_from_function_span_data, get_attributes_from_generation_span_data, get_attributes_from_input, @@ -56,6 +63,7 @@ get_span_kind, get_span_name, get_span_status, + get_tool_call_id, ) logger = logging.getLogger(__name__) @@ -68,6 +76,7 @@ class OpenAIAgentsTraceProcessor(TracingProcessor): _MAX_HANDOFFS_IN_FLIGHT = 1000 + _MAX_PENDING_TOOL_CALLS = 1000 def __init__(self, tracer: Tracer) -> None: self._tracer = tracer @@ -79,6 +88,17 @@ def __init__(self, tracer: Tracer) -> None: # Use an OrderedDict and _MAX_HANDOFFS_IN_FLIGHT to cap the size of the dict # in case there are large numbers of orphaned handoffs self._reverse_handoffs_dict: OrderedDict[str, str] = OrderedDict() + # Track input/output messages for agent spans (keyed by agent span_id) + self._agent_inputs: dict[str, str] = {} + self._agent_outputs: dict[str, str] = {} + # Track agent span IDs to find nearest ancestor + self._agent_span_ids: set[str] = set() + # Track parent-child relationships: child_span_id -> parent_span_id + self._span_parents: dict[str, str] = {} + # Track tool_call_ids from GenerationSpan: (function_name, trace_id) -> call_id + # Use an OrderedDict and _MAX_PENDING_TOOL_CALLS to cap the size of the dict + # in case tool calls are captured but never consumed + self._pending_tool_calls: OrderedDict[str, str] = OrderedDict() # helper def _stamp_custom_parent(self, otel_span: OtelSpan, trace_id: str) -> None: @@ -133,6 +153,12 @@ def on_span_start(self, span: Span[Any]) -> None: ) self._otel_spans[span.span_id] = otel_span self._tokens[span.span_id] = attach(set_span_in_context(otel_span)) + # Track parent-child relationship for ancestor lookup + if span.parent_id: + self._span_parents[span.span_id] = span.parent_id + # Track AgentSpan IDs + if isinstance(span.span_data, AgentSpanData): + self._agent_span_ids.add(span.span_id) def on_span_end(self, span: Span[Any]) -> None: """Called when a span is finished. Should not block or raise exceptions. @@ -142,6 +168,8 @@ def on_span_end(self, span: Span[Any]) -> None: """ if token := self._tokens.pop(span.span_id, None): detach(token) # type: ignore[arg-type] + # Clean up parent tracking + self._span_parents.pop(span.span_id, None) if not (otel_span := self._otel_spans.pop(span.span_id, None)): return otel_span.update_name(get_span_name(span)) @@ -167,6 +195,19 @@ def on_span_end(self, span: Span[Any]) -> None: for k, v in get_attributes_from_generation_span_data(data): otel_span.set_attribute(k, v) self._stamp_custom_parent(otel_span, span.trace_id) + # Capture input/output messages for nearest ancestor agent span + if agent_span_id := find_ancestor_agent_span_id( + span.parent_id, self._agent_span_ids, self._span_parents + ): + if data.input: + capture_input_message(agent_span_id, data.input, self._agent_inputs) + if data.output: + capture_output_message(agent_span_id, data.output, self._agent_outputs) + # Capture tool_call_ids for later use by FunctionSpan + if data.output: + capture_tool_call_ids( + data.output, self._pending_tool_calls, self._MAX_PENDING_TOOL_CALLS + ) otel_span.update_name( f"{otel_span.attributes[GEN_AI_OPERATION_NAME_KEY]} {otel_span.attributes[GEN_AI_REQUEST_MODEL_KEY]}" ) @@ -174,7 +215,12 @@ def on_span_end(self, span: Span[Any]) -> None: for k, v in get_attributes_from_function_span_data(data): otel_span.set_attribute(k, v) self._stamp_custom_parent(otel_span, span.trace_id) - otel_span.update_name(f"{EXECUTE_TOOL_OPERATION_NAME} {data.function_name}") + otel_span.set_attribute(GEN_AI_TOOL_TYPE_KEY, data.type) + # Set tool_call_id if available from preceding GenerationSpan + func_args = data.input if data.input else "" + if tool_call_id := get_tool_call_id(data.name, func_args, self._pending_tool_calls): + otel_span.set_attribute(GEN_AI_TOOL_CALL_ID_KEY, tool_call_id) + otel_span.update_name(f"{EXECUTE_TOOL_OPERATION_NAME} {data.name}") elif isinstance(data, MCPListToolsSpanData): for k, v in get_attributes_from_mcp_list_tool_span_data(data): otel_span.set_attribute(k, v) @@ -187,12 +233,19 @@ def on_span_end(self, span: Span[Any]) -> None: while len(self._reverse_handoffs_dict) > self._MAX_HANDOFFS_IN_FLIGHT: self._reverse_handoffs_dict.popitem(last=False) elif isinstance(data, AgentSpanData): - otel_span.set_attribute(GEN_AI_GRAPH_NODE_ID, data.name) + otel_span.set_attribute(GEN_AI_EXECUTION_TYPE_KEY, ExecutionType.HUMAN_TO_AGENT.value) # Lookup the parent node if exists key = f"{data.name}:{span.trace_id}" if parent_node := self._reverse_handoffs_dict.pop(key, None): otel_span.set_attribute(GEN_AI_GRAPH_NODE_PARENT_ID, parent_node) + # Apply captured input/output messages from child spans + if input_msg := self._agent_inputs.pop(span.span_id, None): + otel_span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, input_msg) + if output_msg := self._agent_outputs.pop(span.span_id, None): + otel_span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, output_msg) otel_span.update_name(f"{INVOKE_AGENT_OPERATION_NAME} {get_span_name(span)}") + # Clean up tracking + self._agent_span_ids.discard(span.span_id) end_time: int | None = None if span.ended_at: diff --git a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py index 91952f0c..f1904a85 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py +++ b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py @@ -22,6 +22,7 @@ ) from microsoft_agents_a365.observability.core.constants import ( GEN_AI_CHOICE, + GEN_AI_EVENT_CONTENT, GEN_AI_EXECUTION_PAYLOAD_KEY, GEN_AI_INPUT_MESSAGES_KEY, GEN_AI_OUTPUT_MESSAGES_KEY, @@ -364,9 +365,9 @@ def get_attributes_from_function_span_data( ) -> Iterator[tuple[str, AttributeValue]]: yield GEN_AI_TOOL_NAME_KEY, obj.name if obj.input: - yield GEN_AI_INPUT_MESSAGES_KEY, obj.input + yield GEN_AI_TOOL_ARGS_KEY, obj.input if obj.output is not None: - yield GEN_AI_OUTPUT_MESSAGES_KEY, _convert_to_primitive(obj.output) + yield GEN_AI_EVENT_CONTENT, _convert_to_primitive(obj.output) def get_attributes_from_message_content_list( @@ -534,3 +535,100 @@ def get_span_status(obj: Span[Any]) -> Status: ) else: return Status(StatusCode.OK) + + +def capture_tool_call_ids( + output_list: Any, pending_tool_calls: dict[str, str], max_size: int = 1000 +) -> None: + """Extract and store tool_call_ids from generation output for later use by FunctionSpan. + + Args: + output_list: The generation output containing tool calls + pending_tool_calls: OrderedDict to store pending tool calls + max_size: Maximum number of pending tool calls to keep in memory + """ + if not output_list: + return + try: + for msg in output_list: + if isinstance(msg, dict) and msg.get("role") == "assistant": + tool_calls = msg.get("tool_calls") + if tool_calls: + for tc in tool_calls: + if isinstance(tc, dict): + call_id = tc.get("id") + func = tc.get("function", {}) + func_name = func.get("name") if isinstance(func, dict) else None + func_args = func.get("arguments", "") if isinstance(func, dict) else "" + if call_id and func_name: + # Key by (function_name, arguments) to uniquely identify each call + key = f"{func_name}:{func_args}" + pending_tool_calls[key] = call_id + # Cap the size of the dict to prevent unbounded growth + while len(pending_tool_calls) > max_size: + pending_tool_calls.popitem(last=False) + except Exception: + pass + + +def get_tool_call_id( + function_name: str, function_args: str, pending_tool_calls: dict[str, str] +) -> str | None: + """Get and remove the tool_call_id for a function with specific arguments.""" + key = f"{function_name}:{function_args}" + return pending_tool_calls.pop(key, None) + + +def capture_input_message( + parent_span_id: str, input_list: Any, agent_inputs: dict[str, str] +) -> None: + """Extract and store the first user message from input list for parent agent span.""" + if parent_span_id in agent_inputs: + return # Already captured + if not input_list: + return + try: + for msg in input_list: + if isinstance(msg, dict) and msg.get("role") == "user": + content = msg.get("content", "") + if content: + agent_inputs[parent_span_id] = str(content) + return + except Exception: + pass + + +def capture_output_message( + parent_span_id: str, output_list: Any, agent_outputs: dict[str, str] +) -> None: + """Extract and store the last assistant message with actual content (no tool calls) for parent agent span.""" + if not output_list: + return + try: + # Iterate in reverse to get the last assistant message with content (not a tool call) + output_items = list(output_list) if not isinstance(output_list, list) else output_list + for msg in reversed(output_items): + if isinstance(msg, dict) and msg.get("role") == "assistant": + content = msg.get("content") + tool_calls = msg.get("tool_calls") + # Only capture if there's actual content and no tool_calls + # (tool_calls means this is an intermediate step, not the final response) + if content and not tool_calls: + agent_outputs[parent_span_id] = str(content) + return + except Exception: + pass + + +def find_ancestor_agent_span_id( + span_id: str | None, agent_span_ids: set[str], span_parents: dict[str, str] +) -> str | None: + """Walk up the parent chain to find the nearest ancestor AgentSpan.""" + current = span_id + visited: set[str] = set() # Prevent infinite loops + while current and current not in visited: + if current in agent_span_ids: + return current + visited.add(current) + current = span_parents.get(current) + return None diff --git a/tests/observability/extensions/openai/integration/test_openai_trace_processor.py b/tests/observability/extensions/openai/integration/test_openai_trace_processor.py index 7af0ef80..95598420 100644 --- a/tests/observability/extensions/openai/integration/test_openai_trace_processor.py +++ b/tests/observability/extensions/openai/integration/test_openai_trace_processor.py @@ -7,12 +7,17 @@ from microsoft_agents_a365.observability.core import configure, get_tracer_provider from microsoft_agents_a365.observability.core.constants import ( GEN_AI_AGENT_ID_KEY, + GEN_AI_AGENT_NAME_KEY, + GEN_AI_EXECUTION_TYPE_KEY, GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OPERATION_NAME_KEY, GEN_AI_OUTPUT_MESSAGES_KEY, GEN_AI_REQUEST_MODEL_KEY, GEN_AI_SYSTEM_KEY, + INVOKE_AGENT_OPERATION_NAME, TENANT_ID_KEY, ) +from microsoft_agents_a365.observability.core.middleware.baggage_builder import BaggageBuilder from microsoft_agents_a365.observability.extensions.openai.trace_instrumentor import ( OpenAIAgentsTraceInstrumentor, ) @@ -184,6 +189,122 @@ async def run_agent_with_tool(): # Clean up instrumentor.uninstrument() + def test_invoke_agent_span_required_attributes(self, azure_openai_config, agent365_config): + """Test that invoke_agent span has all required attributes per schema.""" + + # Configure observability + configure( + service_name="integration-test-invoke-agent", + service_namespace="agent365-tests", + logger_name="test-logger", + ) + + # Get the tracer provider and add our mock exporter + provider = get_tracer_provider() + provider.add_span_processor(self.mock_exporter) + + # Initialize the instrumentor + instrumentor = OpenAIAgentsTraceInstrumentor() + instrumentor.instrument() + + try: + # Create Azure OpenAI client + openai_client = AsyncAzureOpenAI( + api_key=azure_openai_config["api_key"], + api_version=azure_openai_config["api_version"], + azure_endpoint=azure_openai_config["endpoint"], + ) + + # Create agent + agent = Agent( + name="TestAgent", + instructions="You are a helpful assistant. Answer briefly.", + model=OpenAIChatCompletionsModel( + model=azure_openai_config["deployment"], openai_client=openai_client + ), + ) + + # Execute agent wrapped with BaggageBuilder to provide required attributes + import asyncio + + async def run_agent(): + with ( + BaggageBuilder() + .agent_id("test-agent-id") + .agent_name("TestAgent") + .agent_auid("test-agent-auid") + .agent_upn("test-agent@test.com") + .agent_blueprint_id("test-blueprint-id") + .tenant_id("test-tenant-id") + .caller_id("test-caller-id") + .caller_name("Test Caller") + .caller_upn("test-caller@test.com") + .caller_client_ip("127.0.0.1") + .conversation_id("test-conversation-id") + .channel_name("test-channel") + .correlation_id("test-correlation-id") + .build() + ): + result = await Runner.run(agent, "Say hello") + return result.final_output + + response = asyncio.run(run_agent()) + + # Give time for spans to be processed + time.sleep(1) + + # Find the invoke_agent span + invoke_agent_span = None + for span in self.captured_spans: + if span.name.startswith(INVOKE_AGENT_OPERATION_NAME): + invoke_agent_span = span + break + + assert invoke_agent_span is not None, "invoke_agent span not found" + attributes = dict(invoke_agent_span.attributes or {}) + + print(f"invoke_agent span attributes: {list(attributes.keys())}") + + # Validate REQUIRED attributes (must be present) + required_attributes = [ + GEN_AI_OPERATION_NAME_KEY, # "gen_ai.operation.name" - Set by SDK + GEN_AI_AGENT_ID_KEY, # "gen_ai.agent.id" + GEN_AI_AGENT_NAME_KEY, # "gen_ai.agent.name" + GEN_AI_EXECUTION_TYPE_KEY, # "gen_ai.execution.type" + GEN_AI_INPUT_MESSAGES_KEY, # "gen_ai.input.messages" + GEN_AI_OUTPUT_MESSAGES_KEY, # "gen_ai.output.messages" + ] + + missing_required = [] + for attr in required_attributes: + if attr not in attributes: + missing_required.append(attr) + else: + print(f"✓ Required attribute present: {attr} = {str(attributes[attr])[:50]}...") + + assert len(missing_required) == 0, f"Missing required attributes: {missing_required}" + + # Validate operation name value + assert attributes[GEN_AI_OPERATION_NAME_KEY] == INVOKE_AGENT_OPERATION_NAME, ( + f"Expected operation name '{INVOKE_AGENT_OPERATION_NAME}', " + f"got '{attributes[GEN_AI_OPERATION_NAME_KEY]}'" + ) + + # Validate agent name matches + assert attributes[GEN_AI_AGENT_NAME_KEY] == "TestAgent", ( + f"Expected agent name 'TestAgent', got '{attributes[GEN_AI_AGENT_NAME_KEY]}'" + ) + + # Validate input/output messages are non-empty + assert attributes[GEN_AI_INPUT_MESSAGES_KEY], "Input messages should not be empty" + assert attributes[GEN_AI_OUTPUT_MESSAGES_KEY], "Output messages should not be empty" + + print("✓ All required invoke_agent span attributes validated") + print(f"Agent response: {response}") + + finally: + instrumentor.uninstrument() + def _validate_span_attributes(self, agent365_config): """Validate that spans have the expected attributes.""" llm_spans_found = 0 diff --git a/tests/observability/extensions/openai/test_trace_processor_bounded.py b/tests/observability/extensions/openai/test_trace_processor_bounded.py new file mode 100644 index 00000000..69226222 --- /dev/null +++ b/tests/observability/extensions/openai/test_trace_processor_bounded.py @@ -0,0 +1,139 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Standalone test for bounded tool calls functionality. +This test can be run without installing the full package. +""" + +from collections import OrderedDict + + +def capture_tool_call_ids_test(output_list, pending_tool_calls, max_size=1000): + """Test version of capture_tool_call_ids function.""" + if not output_list: + return + try: + for msg in output_list: + if isinstance(msg, dict) and msg.get("role") == "assistant": + tool_calls = msg.get("tool_calls") + if tool_calls: + for tc in tool_calls: + if isinstance(tc, dict): + call_id = tc.get("id") + func = tc.get("function", {}) + func_name = func.get("name") if isinstance(func, dict) else None + func_args = func.get("arguments", "") if isinstance(func, dict) else "" + if call_id and func_name: + key = f"{func_name}:{func_args}" + pending_tool_calls[key] = call_id + # Cap the size of the dict to prevent unbounded growth + while len(pending_tool_calls) > max_size: + pending_tool_calls.popitem(last=False) + except Exception: + pass + + +def test_bounded_size(): + """Test that the bounded size logic works correctly.""" + pending_tool_calls = OrderedDict() + max_size = 10 + + print("Testing bounded size functionality...") + print(f"Max size: {max_size}") + + # Create tool calls that exceed max_size + for i in range(15): + output_list = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": f"call_{i}", + "function": { + "name": f"function_{i}", + "arguments": f'{{"arg": {i}}}', + }, + } + ], + } + ] + capture_tool_call_ids_test(output_list, pending_tool_calls, max_size) + print(f"After adding call {i}: size = {len(pending_tool_calls)}") + + # Verify the size does not exceed max_size + assert len(pending_tool_calls) <= max_size, ( + f"Size {len(pending_tool_calls)} exceeds max {max_size}" + ) + assert len(pending_tool_calls) == max_size, ( + f"Size {len(pending_tool_calls)} should equal max {max_size}" + ) + + print(f"\n✅ Final size: {len(pending_tool_calls)} (max: {max_size})") + + # Verify that the oldest entries were removed (FIFO behavior) + print("\nVerifying FIFO behavior...") + for i in range(5): + key = f'function_{i}:{{"arg": {i}}}' + assert key not in pending_tool_calls, f"Old entry {key} should have been removed" + print(f"✅ Entry {i} was correctly removed (oldest)") + + # The last 10 entries should still be present + for i in range(5, 15): + key = f'function_{i}:{{"arg": {i}}}' + assert key in pending_tool_calls, f"Recent entry {key} should be present" + assert pending_tool_calls[key] == f"call_{i}", f"Call ID mismatch for {key}" + print(f"✅ Entry {i} is present with correct call_id") + + print("\n✅ All tests passed! Bounded size logic works correctly.") + return True + + +def test_single_tool_call(): + """Test storing a single tool call.""" + pending_tool_calls = OrderedDict() + max_size = 10 + + print("\nTesting single tool call...") + output_list = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_123", + "function": { + "name": "add_numbers", + "arguments": '{"a": 5, "b": 10}', + }, + } + ], + } + ] + capture_tool_call_ids_test(output_list, pending_tool_calls, max_size) + + assert len(pending_tool_calls) == 1 + key = 'add_numbers:{"a": 5, "b": 10}' + assert key in pending_tool_calls + assert pending_tool_calls[key] == "call_123" + + print("✅ Single tool call test passed!") + return True + + +if __name__ == "__main__": + print("=" * 70) + print("Running Bounded Tool Calls Tests") + print("=" * 70) + + try: + test_bounded_size() + test_single_tool_call() + print("\n" + "=" * 70) + print("🎉 All tests passed successfully!") + print("=" * 70) + except AssertionError as e: + print(f"\n❌ Test failed: {e}") + exit(1) + except Exception as e: + print(f"\n❌ Unexpected error: {e}") + exit(1)