diff --git a/src/microsoft/opentelemetry/a365/core/exporters/agent365_exporter.py b/src/microsoft/opentelemetry/a365/core/exporters/agent365_exporter.py index 9a18be11..4e9dc8fc 100644 --- a/src/microsoft/opentelemetry/a365/core/exporters/agent365_exporter.py +++ b/src/microsoft/opentelemetry/a365/core/exporters/agent365_exporter.py @@ -33,7 +33,7 @@ hex_trace_id, kind_name, parse_retry_after, - partition_by_identity, + filter_and_partition_by_identity, status_name, truncate_span, ) @@ -86,9 +86,10 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return SpanExportResult.FAILURE try: - groups = partition_by_identity(spans) + groups = filter_and_partition_by_identity(spans) if not groups: - logger.info("No spans with tenant/agent identity found; nothing exported.") + # No eligible genAI spans to export after filtering/partitioning; treat as success + logger.info("No eligible genAI spans to export; nothing exported.") return SpanExportResult.SUCCESS total_spans = sum(len(activities) for activities in groups.values()) diff --git a/src/microsoft/opentelemetry/a365/core/exporters/utils.py b/src/microsoft/opentelemetry/a365/core/exporters/utils.py index 6e0f719e..282b5f21 100644 --- a/src/microsoft/opentelemetry/a365/core/exporters/utils.py +++ b/src/microsoft/opentelemetry/a365/core/exporters/utils.py @@ -35,16 +35,35 @@ A365_SERVICE_TENANT_ID_ENV, A365_SUPPRESS_INVOKE_AGENT_INPUT_ENV, A365_USE_S2S_ENDPOINT_ENV, + CHAT_OPERATION_NAME, ENABLE_A365_OBSERVABILITY_EXPORTER, + EXECUTE_TOOL_OPERATION_NAME, GEN_AI_AGENT_ID_KEY, + GEN_AI_OPERATION_NAME_KEY, + INVOKE_AGENT_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, TENANT_ID_KEY, ) +from microsoft.opentelemetry.a365.core.inference_operation_type import InferenceOperationType logger = logging.getLogger(__name__) # Maximum allowed span size in bytes (250KB) MAX_SPAN_SIZE_BYTES = 250 * 1024 +# Operation names that identify a span as eligible for export to the Agent 365 +# observability ingest service. Only spans whose gen_ai.operation.name matches +# one of these values are included; all other spans are filtered out. +GEN_AI_OPERATION_NAMES: frozenset[str] = frozenset( + { + INVOKE_AGENT_OPERATION_NAME, + EXECUTE_TOOL_OPERATION_NAME, + OUTPUT_MESSAGES_OPERATION_NAME, + CHAT_OPERATION_NAME, + InferenceOperationType.CHAT.value, + } +) + # pylint: disable=broad-exception-caught, too-many-return-statements def hex_trace_id(value: int) -> str: @@ -137,22 +156,43 @@ def truncate_span(span_dict: dict[str, Any]) -> dict[str, Any]: return span_dict -def partition_by_identity( +def filter_and_partition_by_identity( spans: Sequence[ReadableSpan], ) -> dict[tuple[str, str], list[ReadableSpan]]: - """Group spans by (tenantId, agentId) extracted from span attributes. + """Filter export-eligible spans and partition them by (tenantId, agentId). - Spans without both tenant and agent identity are silently dropped. + Only spans whose ``gen_ai.operation.name`` is in + ``GEN_AI_OPERATION_NAMES`` are included; non-genAI spans (e.g. HTTP, DB) + and spans with other operation names are filtered out. Spans without + both tenant and agent identity are also skipped. """ groups: dict[tuple[str, str], list[ReadableSpan]] = {} + non_gen_ai_count = 0 + missing_identity_count = 0 for sp in spans: attrs = sp.attributes or {} + operation_name = _as_str(attrs.get(GEN_AI_OPERATION_NAME_KEY)) + if not operation_name or operation_name not in GEN_AI_OPERATION_NAMES: + non_gen_ai_count += 1 + continue tenant = _as_str(attrs.get(TENANT_ID_KEY)) agent = _as_str(attrs.get(GEN_AI_AGENT_ID_KEY)) if not tenant or not agent: + missing_identity_count += 1 continue key = (tenant, agent) groups.setdefault(key, []).append(sp) + + if non_gen_ai_count > 0: + logger.debug( + "[Agent365Exporter] %d spans without an eligible gen_ai.operation.name filtered out", + non_gen_ai_count, + ) + if missing_identity_count > 0: + logger.debug( + "[Agent365Exporter] %d spans skipped due to missing tenant or agent ID", + missing_identity_count, + ) return groups diff --git a/tests/a365/test_exporter.py b/tests/a365/test_exporter.py index 59ee2044..9b700b41 100644 --- a/tests/a365/test_exporter.py +++ b/tests/a365/test_exporter.py @@ -19,13 +19,17 @@ def _make_span( name="test_span", trace_id=0x1234, span_id=0x5678, + operation_name="invoke_agent", ): span = MagicMock() span.name = name - span.attributes = { + attrs = { "microsoft.tenant.id": tenant_id, "gen_ai.agent.id": agent_id, } + if operation_name is not None: + attrs["gen_ai.operation.name"] = operation_name + span.attributes = attrs ctx = MagicMock() ctx.trace_id = trace_id @@ -222,5 +226,99 @@ def test_s2s_endpoint_url(self, mock_post): exporter.shutdown() +class TestAgent365ExporterFiltering(unittest.TestCase): + @patch.dict(os.environ, {}, clear=True) + def test_export_no_eligible_spans_logs_info(self): + exporter = _Agent365Exporter(token_resolver=lambda a, t: "token") + span = MagicMock() + span.attributes = {} + with patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter.logger") as mock_logger: + result = exporter.export([span]) + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_logger.info.assert_called_with("No eligible genAI spans to export; nothing exported.") + exporter.shutdown() + + @patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries") + @patch.dict(os.environ, {}, clear=True) + def test_export_filters_out_non_genai_spans(self, mock_post): + """Spans without a known gen_ai.operation.name are filtered out.""" + mock_post.return_value = True + exporter = _Agent365Exporter(token_resolver=lambda a, t: "token") + genai_span = _make_span(name="genai_span", trace_id=1, span_id=2) + no_op_span = _make_span(name="http_span", trace_id=3, span_id=4, operation_name=None) + unknown_op_span = _make_span(name="db_span", trace_id=5, span_id=6, operation_name="some_random_op") + + result = exporter.export([genai_span, no_op_span, unknown_op_span]) + + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_called_once() + exporter.shutdown() + + @patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries") + @patch.dict(os.environ, {}, clear=True) + def test_export_filters_out_only_non_genai_spans_returns_success(self, mock_post): + """When all spans are filtered out, export returns SUCCESS without HTTP call.""" + mock_post.return_value = True + exporter = _Agent365Exporter(token_resolver=lambda a, t: "token") + spans = [ + _make_span(name="http_span", operation_name=None), + _make_span(name="db_span", operation_name="other"), + ] + + result = exporter.export(spans) + + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_not_called() + exporter.shutdown() + + @patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries") + @patch.dict(os.environ, {}, clear=True) + def test_export_includes_inference_operation_type_chat_spans(self, mock_post): + """Spans with InferenceOperationType.CHAT value ('Chat') are kept without normalization.""" + mock_post.return_value = True + exporter = _Agent365Exporter(token_resolver=lambda a, t: "token") + chat_span = _make_span(name="chat_span", trace_id=1, span_id=2, operation_name="Chat") + + result = exporter.export([chat_span]) + + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_called_once() + exporter.shutdown() + + @patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries") + @patch.dict(os.environ, {}, clear=True) + def test_export_filters_out_unsupported_inference_operation_types(self, mock_post): + """Spans with TextCompletion / GenerateContent are filtered out.""" + mock_post.return_value = True + exporter = _Agent365Exporter(token_resolver=lambda a, t: "token") + text_completion_span = _make_span( + name="text_completion_span", trace_id=3, span_id=4, operation_name="TextCompletion" + ) + generate_content_span = _make_span( + name="generate_content_span", trace_id=5, span_id=6, operation_name="GenerateContent" + ) + + result = exporter.export([text_completion_span, generate_content_span]) + + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_not_called() + exporter.shutdown() + + @patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries") + @patch.dict(os.environ, {}, clear=True) + def test_export_does_not_normalize_canonical_operation_names(self, mock_post): + """invoke_agent / execute_tool / output_messages / chat are not rewritten.""" + mock_post.return_value = True + exporter = _Agent365Exporter(token_resolver=lambda a, t: "token") + for op in ("invoke_agent", "execute_tool", "output_messages", "chat"): + with self.subTest(operation_name=op): + mock_post.reset_mock() + span = _make_span(name=f"{op}_span", trace_id=1, span_id=2, operation_name=op) + result = exporter.export([span]) + self.assertEqual(result, SpanExportResult.SUCCESS) + mock_post.assert_called_once() + exporter.shutdown() + + if __name__ == "__main__": unittest.main() diff --git a/tests/a365/test_payload_chunking.py b/tests/a365/test_payload_chunking.py index 52d4e13c..6e410477 100644 --- a/tests/a365/test_payload_chunking.py +++ b/tests/a365/test_payload_chunking.py @@ -165,6 +165,7 @@ def _make_span(self, span_id: int, attribute_size: int) -> ReadableSpan: mock_span.attributes = { "microsoft.tenant.id": "tenant-1", "gen_ai.agent.id": "agent-1", + "gen_ai.operation.name": "chat", "payload": "x" * attribute_size, } mock_span.events = [] diff --git a/tests/a365/test_utils.py b/tests/a365/test_utils.py index fd7ee6a1..dc05f7cb 100644 --- a/tests/a365/test_utils.py +++ b/tests/a365/test_utils.py @@ -17,7 +17,7 @@ is_agent365_exporter_enabled, kind_name, parse_retry_after, - partition_by_identity, + filter_and_partition_by_identity, status_name, truncate_span, ) @@ -108,44 +108,51 @@ def test_no_attributes(self): self.assertEqual(result, span) -class TestPartitionByIdentity(unittest.TestCase): - def _make_span(self, tenant_id, agent_id): +class TestFilterAndPartitionByIdentity(unittest.TestCase): + def _make_span(self, tenant_id, agent_id, operation_name="invoke_agent"): span = MagicMock() - span.attributes = { + attrs = { "microsoft.tenant.id": tenant_id, "gen_ai.agent.id": agent_id, } + if operation_name is not None: + attrs["gen_ai.operation.name"] = operation_name + span.attributes = attrs return span def test_groups_by_identity(self): s1 = self._make_span("t1", "a1") s2 = self._make_span("t1", "a1") s3 = self._make_span("t2", "a2") - result = partition_by_identity([s1, s2, s3]) + result = filter_and_partition_by_identity([s1, s2, s3]) self.assertEqual(len(result), 2) self.assertEqual(len(result[("t1", "a1")]), 2) self.assertEqual(len(result[("t2", "a2")]), 1) def test_skips_missing_tenant(self): span = MagicMock() - span.attributes = {"gen_ai.agent.id": "a1"} - result = partition_by_identity([span]) + span.attributes = {"gen_ai.agent.id": "a1", "gen_ai.operation.name": "invoke_agent"} + result = filter_and_partition_by_identity([span]) self.assertEqual(len(result), 0) def test_skips_missing_agent(self): span = MagicMock() - span.attributes = {"microsoft.tenant.id": "t1"} - result = partition_by_identity([span]) + span.attributes = {"microsoft.tenant.id": "t1", "gen_ai.operation.name": "invoke_agent"} + result = filter_and_partition_by_identity([span]) self.assertEqual(len(result), 0) def test_skips_empty_values(self): span = MagicMock() - span.attributes = {"microsoft.tenant.id": "", "gen_ai.agent.id": "a1"} - result = partition_by_identity([span]) + span.attributes = { + "microsoft.tenant.id": "", + "gen_ai.agent.id": "a1", + "gen_ai.operation.name": "invoke_agent", + } + result = filter_and_partition_by_identity([span]) self.assertEqual(len(result), 0) def test_empty_spans(self): - result = partition_by_identity([]) + result = filter_and_partition_by_identity([]) self.assertEqual(len(result), 0)