diff --git a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py index 2c4eea8e..1459c95a 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py @@ -76,6 +76,7 @@ class OpenAIAgentsTraceProcessor(TracingProcessor): _MAX_HANDOFFS_IN_FLIGHT = 1000 + _MAX_PENDING_TOOL_CALLS = 1000 def __init__(self, tracer: Tracer) -> None: self._tracer = tracer @@ -95,7 +96,9 @@ def __init__(self, tracer: Tracer) -> None: # Track parent-child relationships: child_span_id -> parent_span_id self._span_parents: dict[str, str] = {} # Track tool_call_ids from GenerationSpan: (function_name, trace_id) -> call_id - self._pending_tool_calls: dict[str, str] = {} + # Use an OrderedDict and _MAX_PENDING_TOOL_CALLS to cap the size of the dict + # in case tool calls are captured but never consumed + self._pending_tool_calls: OrderedDict[str, str] = OrderedDict() # helper def _stamp_custom_parent(self, otel_span: OtelSpan, trace_id: str) -> None: @@ -202,7 +205,9 @@ def on_span_end(self, span: Span[Any]) -> None: capture_output_message(agent_span_id, data.output, self._agent_outputs) # Capture tool_call_ids for later use by FunctionSpan if data.output: - capture_tool_call_ids(data.output, self._pending_tool_calls) + capture_tool_call_ids( + data.output, self._pending_tool_calls, self._MAX_PENDING_TOOL_CALLS + ) otel_span.update_name( f"{otel_span.attributes[GEN_AI_OPERATION_NAME_KEY]} {otel_span.attributes[GEN_AI_REQUEST_MODEL_KEY]}" ) diff --git a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py index 93be370a..f1904a85 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py +++ b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py @@ -537,8 +537,16 @@ def get_span_status(obj: Span[Any]) -> Status: return Status(StatusCode.OK) -def capture_tool_call_ids(output_list: Any, pending_tool_calls: dict[str, str]) -> None: - """Extract and store tool_call_ids from generation output for later use by FunctionSpan.""" +def capture_tool_call_ids( + output_list: Any, pending_tool_calls: dict[str, str], max_size: int = 1000 +) -> None: + """Extract and store tool_call_ids from generation output for later use by FunctionSpan. + + Args: + output_list: The generation output containing tool calls + pending_tool_calls: OrderedDict to store pending tool calls + max_size: Maximum number of pending tool calls to keep in memory + """ if not output_list: return try: @@ -556,6 +564,9 @@ def capture_tool_call_ids(output_list: Any, pending_tool_calls: dict[str, str]) # Key by (function_name, arguments) to uniquely identify each call key = f"{func_name}:{func_args}" pending_tool_calls[key] = call_id + # Cap the size of the dict to prevent unbounded growth + while len(pending_tool_calls) > max_size: + pending_tool_calls.popitem(last=False) except Exception: pass diff --git a/tests/observability/extensions/openai/test_trace_processor_bounded.py b/tests/observability/extensions/openai/test_trace_processor_bounded.py new file mode 100644 index 00000000..a2225ca6 --- /dev/null +++ b/tests/observability/extensions/openai/test_trace_processor_bounded.py @@ -0,0 +1,137 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Standalone test for bounded tool calls functionality. +This test can be run without installing the full package. +""" + +from collections import OrderedDict + + +def capture_tool_call_ids_test( + output_list, pending_tool_calls, max_size=1000 +): + """Test version of capture_tool_call_ids function.""" + if not output_list: + return + try: + for msg in output_list: + if isinstance(msg, dict) and msg.get("role") == "assistant": + tool_calls = msg.get("tool_calls") + if tool_calls: + for tc in tool_calls: + if isinstance(tc, dict): + call_id = tc.get("id") + func = tc.get("function", {}) + func_name = func.get("name") if isinstance(func, dict) else None + func_args = func.get("arguments", "") if isinstance(func, dict) else "" + if call_id and func_name: + key = f"{func_name}:{func_args}" + pending_tool_calls[key] = call_id + # Cap the size of the dict to prevent unbounded growth + while len(pending_tool_calls) > max_size: + pending_tool_calls.popitem(last=False) + except Exception: + pass + + +def test_bounded_size(): + """Test that the bounded size logic works correctly.""" + pending_tool_calls = OrderedDict() + max_size = 10 + + print("Testing bounded size functionality...") + print(f"Max size: {max_size}") + + # Create tool calls that exceed max_size + for i in range(15): + output_list = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": f"call_{i}", + "function": { + "name": f"function_{i}", + "arguments": f'{{"arg": {i}}}', + }, + } + ], + } + ] + capture_tool_call_ids_test(output_list, pending_tool_calls, max_size) + print(f"After adding call {i}: size = {len(pending_tool_calls)}") + + # Verify the size does not exceed max_size + assert len(pending_tool_calls) <= max_size, f"Size {len(pending_tool_calls)} exceeds max {max_size}" + assert len(pending_tool_calls) == max_size, f"Size {len(pending_tool_calls)} should equal max {max_size}" + + print(f"\n✅ Final size: {len(pending_tool_calls)} (max: {max_size})") + + # Verify that the oldest entries were removed (FIFO behavior) + print("\nVerifying FIFO behavior...") + for i in range(5): + key = f'function_{i}:{{"arg": {i}}}' + assert key not in pending_tool_calls, f"Old entry {key} should have been removed" + print(f"✅ Entry {i} was correctly removed (oldest)") + + # The last 10 entries should still be present + for i in range(5, 15): + key = f'function_{i}:{{"arg": {i}}}' + assert key in pending_tool_calls, f"Recent entry {key} should be present" + assert pending_tool_calls[key] == f"call_{i}", f"Call ID mismatch for {key}" + print(f"✅ Entry {i} is present with correct call_id") + + print("\n✅ All tests passed! Bounded size logic works correctly.") + return True + + +def test_single_tool_call(): + """Test storing a single tool call.""" + pending_tool_calls = OrderedDict() + max_size = 10 + + print("\nTesting single tool call...") + output_list = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_123", + "function": { + "name": "add_numbers", + "arguments": '{"a": 5, "b": 10}', + }, + } + ], + } + ] + capture_tool_call_ids_test(output_list, pending_tool_calls, max_size) + + assert len(pending_tool_calls) == 1 + key = 'add_numbers:{"a": 5, "b": 10}' + assert key in pending_tool_calls + assert pending_tool_calls[key] == "call_123" + + print("✅ Single tool call test passed!") + return True + + +if __name__ == "__main__": + print("=" * 70) + print("Running Bounded Tool Calls Tests") + print("=" * 70) + + try: + test_bounded_size() + test_single_tool_call() + print("\n" + "=" * 70) + print("🎉 All tests passed successfully!") + print("=" * 70) + except AssertionError as e: + print(f"\n❌ Test failed: {e}") + exit(1) + except Exception as e: + print(f"\n❌ Unexpected error: {e}") + exit(1)