Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
hex_trace_id,
kind_name,
parse_retry_after,
partition_by_identity,
filter_and_partition_by_identity,
status_name,
truncate_span,
)
Expand Down Expand Up @@ -86,9 +86,10 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return SpanExportResult.FAILURE

try:
groups = partition_by_identity(spans)
groups = filter_and_partition_by_identity(spans)
if not groups:
logger.info("No spans with tenant/agent identity found; nothing exported.")
# No eligible genAI spans to export after filtering/partitioning; treat as success
logger.info("No eligible genAI spans to export; nothing exported.")
return SpanExportResult.SUCCESS

total_spans = sum(len(activities) for activities in groups.values())
Expand Down
46 changes: 43 additions & 3 deletions src/microsoft/opentelemetry/a365/core/exporters/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,35 @@
A365_SERVICE_TENANT_ID_ENV,
A365_SUPPRESS_INVOKE_AGENT_INPUT_ENV,
A365_USE_S2S_ENDPOINT_ENV,
CHAT_OPERATION_NAME,
ENABLE_A365_OBSERVABILITY_EXPORTER,
EXECUTE_TOOL_OPERATION_NAME,
GEN_AI_AGENT_ID_KEY,
GEN_AI_OPERATION_NAME_KEY,
INVOKE_AGENT_OPERATION_NAME,
OUTPUT_MESSAGES_OPERATION_NAME,
TENANT_ID_KEY,
)
from microsoft.opentelemetry.a365.core.inference_operation_type import InferenceOperationType

logger = logging.getLogger(__name__)

# Maximum allowed span size in bytes (250KB)
MAX_SPAN_SIZE_BYTES = 250 * 1024

# Operation names that identify a span as eligible for export to the Agent 365
# observability ingest service. Only spans whose gen_ai.operation.name matches
# one of these values are included; all other spans are filtered out.
GEN_AI_OPERATION_NAMES: frozenset[str] = frozenset(
{
INVOKE_AGENT_OPERATION_NAME,
EXECUTE_TOOL_OPERATION_NAME,
OUTPUT_MESSAGES_OPERATION_NAME,
CHAT_OPERATION_NAME,
InferenceOperationType.CHAT.value,
}
)


# pylint: disable=broad-exception-caught, too-many-return-statements
def hex_trace_id(value: int) -> str:
Expand Down Expand Up @@ -137,22 +156,43 @@ def truncate_span(span_dict: dict[str, Any]) -> dict[str, Any]:
return span_dict


def partition_by_identity(
def filter_and_partition_by_identity(
spans: Sequence[ReadableSpan],
) -> dict[tuple[str, str], list[ReadableSpan]]:
"""Group spans by (tenantId, agentId) extracted from span attributes.
"""Filter export-eligible spans and partition them by (tenantId, agentId).

Spans without both tenant and agent identity are silently dropped.
Only spans whose ``gen_ai.operation.name`` is in
``GEN_AI_OPERATION_NAMES`` are included; non-genAI spans (e.g. HTTP, DB)
and spans with other operation names are filtered out. Spans without
both tenant and agent identity are also skipped.
"""
groups: dict[tuple[str, str], list[ReadableSpan]] = {}
non_gen_ai_count = 0
missing_identity_count = 0
for sp in spans:
attrs = sp.attributes or {}
operation_name = _as_str(attrs.get(GEN_AI_OPERATION_NAME_KEY))
if not operation_name or operation_name not in GEN_AI_OPERATION_NAMES:
non_gen_ai_count += 1
continue
tenant = _as_str(attrs.get(TENANT_ID_KEY))
agent = _as_str(attrs.get(GEN_AI_AGENT_ID_KEY))
if not tenant or not agent:
missing_identity_count += 1
continue
key = (tenant, agent)
groups.setdefault(key, []).append(sp)

if non_gen_ai_count > 0:
logger.debug(
"[Agent365Exporter] %d spans without an eligible gen_ai.operation.name filtered out",
non_gen_ai_count,
)
if missing_identity_count > 0:
logger.debug(
"[Agent365Exporter] %d spans skipped due to missing tenant or agent ID",
missing_identity_count,
)
return groups


Expand Down
100 changes: 99 additions & 1 deletion tests/a365/test_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ def _make_span(
name="test_span",
trace_id=0x1234,
span_id=0x5678,
operation_name="invoke_agent",
):
span = MagicMock()
span.name = name
span.attributes = {
attrs = {
"microsoft.tenant.id": tenant_id,
"gen_ai.agent.id": agent_id,
}
if operation_name is not None:
attrs["gen_ai.operation.name"] = operation_name
span.attributes = attrs

ctx = MagicMock()
ctx.trace_id = trace_id
Expand Down Expand Up @@ -222,5 +226,99 @@ def test_s2s_endpoint_url(self, mock_post):
exporter.shutdown()


class TestAgent365ExporterFiltering(unittest.TestCase):
@patch.dict(os.environ, {}, clear=True)
def test_export_no_eligible_spans_logs_info(self):
exporter = _Agent365Exporter(token_resolver=lambda a, t: "token")
span = MagicMock()
span.attributes = {}
with patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter.logger") as mock_logger:
result = exporter.export([span])
self.assertEqual(result, SpanExportResult.SUCCESS)
mock_logger.info.assert_called_with("No eligible genAI spans to export; nothing exported.")
exporter.shutdown()

@patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries")
@patch.dict(os.environ, {}, clear=True)
def test_export_filters_out_non_genai_spans(self, mock_post):
"""Spans without a known gen_ai.operation.name are filtered out."""
mock_post.return_value = True
exporter = _Agent365Exporter(token_resolver=lambda a, t: "token")
genai_span = _make_span(name="genai_span", trace_id=1, span_id=2)
no_op_span = _make_span(name="http_span", trace_id=3, span_id=4, operation_name=None)
unknown_op_span = _make_span(name="db_span", trace_id=5, span_id=6, operation_name="some_random_op")

result = exporter.export([genai_span, no_op_span, unknown_op_span])

self.assertEqual(result, SpanExportResult.SUCCESS)
mock_post.assert_called_once()
exporter.shutdown()

@patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries")
@patch.dict(os.environ, {}, clear=True)
def test_export_filters_out_only_non_genai_spans_returns_success(self, mock_post):
"""When all spans are filtered out, export returns SUCCESS without HTTP call."""
mock_post.return_value = True
exporter = _Agent365Exporter(token_resolver=lambda a, t: "token")
spans = [
_make_span(name="http_span", operation_name=None),
_make_span(name="db_span", operation_name="other"),
]

result = exporter.export(spans)

self.assertEqual(result, SpanExportResult.SUCCESS)
mock_post.assert_not_called()
exporter.shutdown()

@patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries")
@patch.dict(os.environ, {}, clear=True)
def test_export_includes_inference_operation_type_chat_spans(self, mock_post):
"""Spans with InferenceOperationType.CHAT value ('Chat') are kept without normalization."""
mock_post.return_value = True
exporter = _Agent365Exporter(token_resolver=lambda a, t: "token")
chat_span = _make_span(name="chat_span", trace_id=1, span_id=2, operation_name="Chat")

result = exporter.export([chat_span])

self.assertEqual(result, SpanExportResult.SUCCESS)
mock_post.assert_called_once()
exporter.shutdown()

@patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries")
@patch.dict(os.environ, {}, clear=True)
def test_export_filters_out_unsupported_inference_operation_types(self, mock_post):
"""Spans with TextCompletion / GenerateContent are filtered out."""
mock_post.return_value = True
exporter = _Agent365Exporter(token_resolver=lambda a, t: "token")
text_completion_span = _make_span(
name="text_completion_span", trace_id=3, span_id=4, operation_name="TextCompletion"
)
generate_content_span = _make_span(
name="generate_content_span", trace_id=5, span_id=6, operation_name="GenerateContent"
)

result = exporter.export([text_completion_span, generate_content_span])

self.assertEqual(result, SpanExportResult.SUCCESS)
mock_post.assert_not_called()
exporter.shutdown()

@patch("microsoft.opentelemetry.a365.core.exporters.agent365_exporter._Agent365Exporter._post_with_retries")
@patch.dict(os.environ, {}, clear=True)
def test_export_does_not_normalize_canonical_operation_names(self, mock_post):
"""invoke_agent / execute_tool / output_messages / chat are not rewritten."""
mock_post.return_value = True
exporter = _Agent365Exporter(token_resolver=lambda a, t: "token")
for op in ("invoke_agent", "execute_tool", "output_messages", "chat"):
with self.subTest(operation_name=op):
mock_post.reset_mock()
span = _make_span(name=f"{op}_span", trace_id=1, span_id=2, operation_name=op)
result = exporter.export([span])
self.assertEqual(result, SpanExportResult.SUCCESS)
mock_post.assert_called_once()
exporter.shutdown()


if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions tests/a365/test_payload_chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def _make_span(self, span_id: int, attribute_size: int) -> ReadableSpan:
mock_span.attributes = {
"microsoft.tenant.id": "tenant-1",
"gen_ai.agent.id": "agent-1",
"gen_ai.operation.name": "chat",
"payload": "x" * attribute_size,
}
mock_span.events = []
Expand Down
31 changes: 19 additions & 12 deletions tests/a365/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
is_agent365_exporter_enabled,
kind_name,
parse_retry_after,
partition_by_identity,
filter_and_partition_by_identity,
status_name,
truncate_span,
)
Expand Down Expand Up @@ -108,44 +108,51 @@ def test_no_attributes(self):
self.assertEqual(result, span)


class TestPartitionByIdentity(unittest.TestCase):
def _make_span(self, tenant_id, agent_id):
class TestFilterAndPartitionByIdentity(unittest.TestCase):
def _make_span(self, tenant_id, agent_id, operation_name="invoke_agent"):
span = MagicMock()
span.attributes = {
attrs = {
"microsoft.tenant.id": tenant_id,
"gen_ai.agent.id": agent_id,
}
if operation_name is not None:
attrs["gen_ai.operation.name"] = operation_name
span.attributes = attrs
return span

def test_groups_by_identity(self):
s1 = self._make_span("t1", "a1")
s2 = self._make_span("t1", "a1")
s3 = self._make_span("t2", "a2")
result = partition_by_identity([s1, s2, s3])
result = filter_and_partition_by_identity([s1, s2, s3])
self.assertEqual(len(result), 2)
self.assertEqual(len(result[("t1", "a1")]), 2)
self.assertEqual(len(result[("t2", "a2")]), 1)

def test_skips_missing_tenant(self):
span = MagicMock()
span.attributes = {"gen_ai.agent.id": "a1"}
result = partition_by_identity([span])
span.attributes = {"gen_ai.agent.id": "a1", "gen_ai.operation.name": "invoke_agent"}
result = filter_and_partition_by_identity([span])
self.assertEqual(len(result), 0)

def test_skips_missing_agent(self):
span = MagicMock()
span.attributes = {"microsoft.tenant.id": "t1"}
result = partition_by_identity([span])
span.attributes = {"microsoft.tenant.id": "t1", "gen_ai.operation.name": "invoke_agent"}
result = filter_and_partition_by_identity([span])
self.assertEqual(len(result), 0)

def test_skips_empty_values(self):
span = MagicMock()
span.attributes = {"microsoft.tenant.id": "", "gen_ai.agent.id": "a1"}
result = partition_by_identity([span])
span.attributes = {
"microsoft.tenant.id": "",
"gen_ai.agent.id": "a1",
"gen_ai.operation.name": "invoke_agent",
}
result = filter_and_partition_by_identity([span])
self.assertEqual(len(result), 0)

def test_empty_spans(self):
result = partition_by_identity([])
result = filter_and_partition_by_identity([])
self.assertEqual(len(result), 0)


Expand Down
Loading