diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py index e3c6e73b..698c6c9f 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py @@ -27,7 +27,7 @@ hex_trace_id, kind_name, parse_retry_after, - partition_by_identity, + filter_and_partition_by_identity, status_name, truncate_span, ) @@ -80,10 +80,10 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return SpanExportResult.FAILURE try: - groups = partition_by_identity(spans) + groups = filter_and_partition_by_identity(spans) if not groups: - # No spans with identity; treat as success - logger.info("No spans with tenant/agent identity found; nothing exported.") + # No eligible genAI spans to export after filtering/partitioning; treat as success + logger.info("No eligible genAI spans to export; nothing exported.") return SpanExportResult.SUCCESS # Log number of groups and total span count diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py index 36906f0d..2d84b074 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py @@ -14,16 +14,35 @@ from opentelemetry.trace import SpanKind, StatusCode from ..constants import ( + CHAT_OPERATION_NAME, ENABLE_A365_OBSERVABILITY_EXPORTER, + EXECUTE_TOOL_OPERATION_NAME, GEN_AI_AGENT_ID_KEY, + GEN_AI_OPERATION_NAME_KEY, + INVOKE_AGENT_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, TENANT_ID_KEY, ) +from ..inference_operation_type import InferenceOperationType logger = logging.getLogger(__name__) # Maximum allowed span size in bytes (250KB) MAX_SPAN_SIZE_BYTES = 250 * 1024 +# Operation names that identify a span as eligible for export to the Agent 365 +# observability ingest service. Only spans whose gen_ai.operation.name matches +# one of these values are included; all other spans are filtered out. +GEN_AI_OPERATION_NAMES: frozenset[str] = frozenset( + { + INVOKE_AGENT_OPERATION_NAME, + EXECUTE_TOOL_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, + CHAT_OPERATION_NAME, + InferenceOperationType.CHAT.value, + } +) + def hex_trace_id(value: int) -> str: # 128-bit -> 32 hex chars @@ -127,22 +146,44 @@ def truncate_span(span_dict: dict[str, Any]) -> dict[str, Any]: return span_dict -def partition_by_identity( +def filter_and_partition_by_identity( spans: Sequence[ReadableSpan], ) -> dict[tuple[str, str], list[ReadableSpan]]: """ - Extract (tenantId, agentId). Prefer attributes; if you also stamp baggage - into attributes via a processor, they'll be here already. + Filter export-eligible spans and partition them by (tenantId, agentId). + + Only spans whose ``gen_ai.operation.name`` is in + ``GEN_AI_OPERATION_NAMES`` are included; non-genAI spans (e.g. HTTP, DB) + and spans with other operation names are filtered out. Spans without + both tenant and agent identity are also skipped. """ groups: dict[tuple[str, str], list[ReadableSpan]] = {} + non_gen_ai_count = 0 + missing_identity_count = 0 for sp in spans: attrs = sp.attributes or {} + operation_name = as_str(attrs.get(GEN_AI_OPERATION_NAME_KEY)) + if not operation_name or operation_name not in GEN_AI_OPERATION_NAMES: + non_gen_ai_count += 1 + continue tenant = as_str(attrs.get(TENANT_ID_KEY)) agent = as_str(attrs.get(GEN_AI_AGENT_ID_KEY)) if not tenant or not agent: + missing_identity_count += 1 continue key = (tenant, agent) groups.setdefault(key, []).append(sp) + + if non_gen_ai_count > 0: + logger.debug( + "[Agent365Exporter] %d spans without an eligible gen_ai.operation.name filtered out", + non_gen_ai_count, + ) + if missing_identity_count > 0: + logger.debug( + "[Agent365Exporter] %d spans skipped due to missing tenant or agent ID", + missing_identity_count, + ) return groups diff --git a/tests/observability/core/exporters/test_payload_chunking.py b/tests/observability/core/exporters/test_payload_chunking.py index 4ffe7702..6ed9ba9f 100644 --- a/tests/observability/core/exporters/test_payload_chunking.py +++ b/tests/observability/core/exporters/test_payload_chunking.py @@ -10,7 +10,9 @@ from unittest.mock import Mock, patch from microsoft_agents_a365.observability.core.constants import ( + CHAT_OPERATION_NAME, GEN_AI_AGENT_ID_KEY, + GEN_AI_OPERATION_NAME_KEY, TENANT_ID_KEY, ) from microsoft_agents_a365.observability.core.exporters.agent365_exporter import ( @@ -165,6 +167,7 @@ def _make_span(self, span_id: int, attribute_size: int) -> ReadableSpan: mock_span.attributes = { TENANT_ID_KEY: "tenant-1", GEN_AI_AGENT_ID_KEY: "agent-1", + GEN_AI_OPERATION_NAME_KEY: CHAT_OPERATION_NAME, "payload": "x" * attribute_size, } mock_span.events = [] diff --git a/tests/observability/core/test_agent365_exporter.py b/tests/observability/core/test_agent365_exporter.py index 0220b52c..32ef426f 100644 --- a/tests/observability/core/test_agent365_exporter.py +++ b/tests/observability/core/test_agent365_exporter.py @@ -6,7 +6,12 @@ import unittest from unittest.mock import Mock, patch -from microsoft_agents_a365.observability.core.constants import GEN_AI_AGENT_ID_KEY, TENANT_ID_KEY +from microsoft_agents_a365.observability.core.constants import ( + GEN_AI_AGENT_ID_KEY, + GEN_AI_OPERATION_NAME_KEY, + INVOKE_AGENT_OPERATION_NAME, + TENANT_ID_KEY, +) from microsoft_agents_a365.observability.core.exporters.agent365_exporter import ( DEFAULT_ENDPOINT_URL, _Agent365Exporter, @@ -54,6 +59,7 @@ def _create_mock_span( scope_version: str = "1.0.0", tenant_id: str = "test-tenant-123", agent_id: str = "test-agent-456", + operation_name: str | None = INVOKE_AGENT_OPERATION_NAME, ) -> ReadableSpan: """Create a mock ReadableSpan for testing.""" mock_span = Mock(spec=ReadableSpan) @@ -78,13 +84,15 @@ def _create_mock_span( mock_span.kind = Mock() mock_span.kind.name = "INTERNAL" - # Add identity attributes for partition_by_identity to work + # Add identity attributes for filter_and_partition_by_identity to work span_attributes = attributes or {} if tenant_id and agent_id: span_attributes.update({ TENANT_ID_KEY: tenant_id, GEN_AI_AGENT_ID_KEY: agent_id, }) + if operation_name is not None and GEN_AI_OPERATION_NAME_KEY not in span_attributes: + span_attributes[GEN_AI_OPERATION_NAME_KEY] = operation_name mock_span.attributes = span_attributes mock_span.events = [] @@ -350,10 +358,8 @@ def test_export_error_logging(self, mock_logger): # Verify export succeeded (no identity spans are treated as success) self.assertEqual(result, SpanExportResult.SUCCESS) - # Verify info log for no identity - mock_logger.info.assert_called_with( - "No spans with tenant/agent identity found; nothing exported." - ) + # Verify info log for no eligible spans + mock_logger.info.assert_called_with("No eligible genAI spans to export; nothing exported.") def test_exporter_is_internal(self): """Test that _Agent365Exporter is marked as internal/private. @@ -657,6 +663,97 @@ def test_export_no_fallback_when_default_succeeds(self): self.assertEqual(result, SpanExportResult.SUCCESS) mock_post.assert_called_once() + def test_export_filters_out_non_genai_spans(self): + """Spans without a known gen_ai.operation.name are filtered out.""" + # Arrange: one genAI span and two non-genAI spans (no/unknown operation name) + genai_span = self._create_mock_span("genai_span", trace_id=1, span_id=2) + no_op_span = self._create_mock_span("http_span", trace_id=3, span_id=4, operation_name=None) + unknown_op_span = self._create_mock_span( + "db_span", trace_id=5, span_id=6, operation_name="some_random_op" + ) + + with patch.object(self.exporter, "_post_with_retries", return_value=True) as mock_post: + # Act + result = self.exporter.export([genai_span, no_op_span, unknown_op_span]) + + # Assert: only the genAI span is exported + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_called_once() + _, body, _ = mock_post.call_args[0] + request_data = json.loads(body) + spans_out = request_data["resourceSpans"][0]["scopeSpans"][0]["spans"] + self.assertEqual(len(spans_out), 1) + self.assertEqual(spans_out[0]["name"], "genai_span") + + def test_export_filters_out_only_non_genai_spans_returns_success(self): + """When all spans are filtered out, export returns SUCCESS without HTTP call.""" + # Arrange + spans = [ + self._create_mock_span("http_span", operation_name=None), + self._create_mock_span("db_span", operation_name="other"), + ] + + with patch.object(self.exporter, "_post_with_retries", return_value=True) as mock_post: + # Act + result = self.exporter.export(spans) + + # Assert + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_not_called() + + def test_export_includes_inference_operation_type_chat_spans(self): + """Spans with InferenceOperationType.CHAT value ('Chat') are kept without normalization.""" + # Arrange — server accepts 'Chat' via case-insensitive matching + chat_span = self._create_mock_span( + "chat_span", trace_id=1, span_id=2, operation_name="Chat" + ) + + with patch.object(self.exporter, "_post_with_retries", return_value=True) as mock_post: + result = self.exporter.export([chat_span]) + + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_called_once() + _, body, _ = mock_post.call_args[0] + request_data = json.loads(body) + spans_out = request_data["resourceSpans"][0]["scopeSpans"][0]["spans"] + self.assertEqual(len(spans_out), 1) + # Value is preserved as-is; no normalization + self.assertEqual(spans_out[0]["attributes"][GEN_AI_OPERATION_NAME_KEY], "Chat") + + def test_export_filters_out_unsupported_inference_operation_types(self): + """Spans with TextCompletion / GenerateContent are filtered out.""" + text_completion_span = self._create_mock_span( + "text_completion_span", trace_id=3, span_id=4, operation_name="TextCompletion" + ) + generate_content_span = self._create_mock_span( + "generate_content_span", trace_id=5, span_id=6, operation_name="GenerateContent" + ) + + with patch.object(self.exporter, "_post_with_retries", return_value=True) as mock_post: + result = self.exporter.export([text_completion_span, generate_content_span]) + + # Both are filtered out — nothing to export + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_not_called() + + def test_export_does_not_normalize_canonical_operation_names(self): + """invoke_agent / execute_tool / output_messages / chat are not rewritten.""" + cases = ["invoke_agent", "execute_tool", "output_messages", "chat"] + for op in cases: + with self.subTest(operation_name=op): + span = self._create_mock_span( + f"{op}_span", trace_id=1, span_id=2, operation_name=op + ) + with patch.object( + self.exporter, "_post_with_retries", return_value=True + ) as mock_post: + result = self.exporter.export([span]) + self.assertEqual(result, SpanExportResult.SUCCESS) + _, body, _ = mock_post.call_args[0] + request_data = json.loads(body) + span_out = request_data["resourceSpans"][0]["scopeSpans"][0]["spans"][0] + self.assertEqual(span_out["attributes"][GEN_AI_OPERATION_NAME_KEY], op) + if __name__ == "__main__": unittest.main()