Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/microsoft/opentelemetry/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,16 @@
A365_EXPORTER_TIMEOUT_MS_ARG = "a365_exporter_timeout_ms"
A365_MAX_EXPORT_BATCH_SIZE_ARG = "a365_max_export_batch_size"

# --- GenAI Main Agent Constants ---

# Target attribute keys written by the GenAI main-agent processors so that
# downstream telemetry (spans + logs) is attributed to the user-facing
# ("main") agent rather than internal sub-agents in a multi-agent system.
GEN_AI_MAIN_AGENT_NAME_KEY = "microsoft.gen_ai.main_agent.name"
GEN_AI_MAIN_AGENT_ID_KEY = "microsoft.gen_ai.main_agent.id"
GEN_AI_MAIN_AGENT_VERSION_KEY = "microsoft.gen_ai.main_agent.version"
GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY = "microsoft.gen_ai.main_agent.conversation_id"
Comment thread
rads-1996 marked this conversation as resolved.
GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX = "microsoft.gen_ai.main_agent."

# --- Version propagation for distro to exporter ---
MICROSOFT_OPENTELEMETRY_VERSION_ENV = "microsoft_opentelemetry_version"
19 changes: 19 additions & 0 deletions src/microsoft/opentelemetry/_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
_SPECTRA_PROTOCOL_ENV,
MICROSOFT_OPENTELEMETRY_VERSION_ENV,
)
from microsoft.opentelemetry._genai.main_agent import (
GenAIMainAgentLogRecordProcessor,
GenAIMainAgentSpanProcessor,
)
from microsoft.opentelemetry._instrumentation import get_dist_dependency_conflicts
from microsoft.opentelemetry._otlp import is_otlp_enabled
from microsoft.opentelemetry._sdkstats._state import (
Expand Down Expand Up @@ -235,6 +239,21 @@ def use_microsoft_opentelemetry(**kwargs: object) -> None: # pylint: disable=to
# ---- SDKStats: record distro feature flag ----
set_sdkstats_feature(SdkStatsFeature.DISTRO)

# ---- GenAI main-agent attribute propagation (always on) ----
# Prepended to the processor lists so on_start/on_emit run BEFORE any
# Batch* export processor appended below; this enriches once per
# span/log and is then visible to every downstream exporter.
if not otel_kwargs.get(DISABLE_TRACING_ARG, False):
otel_kwargs[SPAN_PROCESSORS_ARG] = [
GenAIMainAgentSpanProcessor(),
*list(otel_kwargs.get(SPAN_PROCESSORS_ARG) or []),
]
if not otel_kwargs.get(DISABLE_LOGGING_ARG, False):
otel_kwargs[LOG_RECORD_PROCESSORS_ARG] = [
GenAIMainAgentLogRecordProcessor(),
*list(otel_kwargs.get(LOG_RECORD_PROCESSORS_ARG) or []),
]

# ---- OTLP exporters (append to user-supplied processors/readers) ----
_append_otlp_components(otel_kwargs)
if is_otlp_enabled():
Expand Down
12 changes: 12 additions & 0 deletions src/microsoft/opentelemetry/_genai/main_agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from microsoft.opentelemetry._genai.main_agent._processor import (
GenAIMainAgentLogRecordProcessor,
GenAIMainAgentSpanProcessor,
)

__all__ = [
"GenAIMainAgentLogRecordProcessor",
"GenAIMainAgentSpanProcessor",
]
128 changes: 128 additions & 0 deletions src/microsoft/opentelemetry/_genai/main_agent/_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Span and log-record processors that propagate
``microsoft.gen_ai.main_agent.*`` attributes from the top-level
(user-facing) GenAI agent so that all downstream telemetry is attributed
to the main agent rather than internal sub-agents in a multi-agent system.
"""

from microsoft.opentelemetry._constants import (
GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX,
GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY,
GEN_AI_MAIN_AGENT_ID_KEY,
GEN_AI_MAIN_AGENT_NAME_KEY,
GEN_AI_MAIN_AGENT_VERSION_KEY,
)
from microsoft.opentelemetry.a365.core.constants import (
GEN_AI_AGENT_ID_KEY,
GEN_AI_AGENT_NAME_KEY,
GEN_AI_AGENT_VERSION_KEY,
GEN_AI_CONVERSATION_ID_KEY,
GEN_AI_OPERATION_NAME_KEY,
INVOKE_AGENT_OPERATION_NAME,
)
from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import SpanProcessor

# Each row: (target attribute on current span,
# primary source attribute on parent span,
# fallback source attribute on parent span)
_PROPAGATION_TABLE: tuple[tuple[str, str, str], ...] = (
(GEN_AI_MAIN_AGENT_NAME_KEY, GEN_AI_MAIN_AGENT_NAME_KEY, GEN_AI_AGENT_NAME_KEY),
(GEN_AI_MAIN_AGENT_ID_KEY, GEN_AI_MAIN_AGENT_ID_KEY, GEN_AI_AGENT_ID_KEY),
(GEN_AI_MAIN_AGENT_VERSION_KEY, GEN_AI_MAIN_AGENT_VERSION_KEY, GEN_AI_AGENT_VERSION_KEY),
(
GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY,
GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY,
GEN_AI_CONVERSATION_ID_KEY,
),
)

# Used at on_end to copy the current span's own gen_ai.* attributes onto the
# microsoft.gen_ai.main_agent.* attributes when the span is the top-level
# invoke_agent span and no main_agent.* attribute has been propagated yet.
_SELF_COPY_TABLE: tuple[tuple[str, str], ...] = tuple(
(target, fallback) for target, _primary, fallback in _PROPAGATION_TABLE
)


class GenAIMainAgentSpanProcessor(SpanProcessor):
"""Propagates ``microsoft.gen_ai.main_agent.*`` attributes onto spans.

On ``on_start``: copies main-agent attributes from the parent span (or
falls back to the parent's ``gen_ai.agent.*`` / ``gen_ai.conversation.id``
attributes) onto the new span.

On ``on_end``: when the span is itself an ``invoke_agent`` operation and
has not already been enriched, copies its own ``gen_ai.agent.*`` /
``gen_ai.conversation.id`` attributes onto ``microsoft.gen_ai.main_agent.*``
so the top-level agent identifies itself as the main agent.
"""

def on_start(self, span: Span, parent_context: context_api.Context | None = None) -> None:
parent = trace.get_current_span(parent_context)
if not parent.get_span_context().is_valid:
return

parent_attributes = getattr(parent, "attributes", None) or {}
for target, primary, fallback in _PROPAGATION_TABLE:
value = parent_attributes.get(primary)
if value is None:
value = parent_attributes.get(fallback)
if value is not None:
span.set_attribute(target, value)

def on_end(self, span: ReadableSpan) -> None:
attributes = span.attributes or {}
if attributes.get(GEN_AI_OPERATION_NAME_KEY) != INVOKE_AGENT_OPERATION_NAME:
return

for key in attributes:
if key.startswith(GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX):
return

if not hasattr(span, "set_attribute"):
return
for target, source in _SELF_COPY_TABLE:
value = attributes.get(source)
if value is not None:
span.set_attribute(target, value) # type: ignore[attr-defined]

def shutdown(self) -> None:
pass

def force_flush(self, timeout_millis: int = 30000) -> bool:
return True


class GenAIMainAgentLogRecordProcessor(LogRecordProcessor):
"""Copies any ``microsoft.gen_ai.main_agent.*`` attributes from the
current span onto every emitted log record.
"""

def on_emit(self, log_record: ReadWriteLogRecord) -> None:
span = trace.get_current_span()
Comment thread
rads-1996 marked this conversation as resolved.
if not span.get_span_context().is_valid:
return

span_attributes = getattr(span, "attributes", None) or {}
main_agent_attributes = {
key: value for key, value in span_attributes.items() if key.startswith(GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX)
}
if not main_agent_attributes:
return

if log_record.log_record.attributes is None:
log_record.log_record.attributes = {}
for key, value in main_agent_attributes.items():
log_record.log_record.attributes[key] = value # type: ignore[index]

def shutdown(self) -> None:
pass

def force_flush(self, timeout_millis: int = 30000) -> bool:
return True
95 changes: 95 additions & 0 deletions tests/genai/main_agent/test_log_record_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Tests for GenAIMainAgentLogRecordProcessor."""

import unittest
from unittest.mock import MagicMock, Mock, patch

from microsoft.opentelemetry._constants import (
GEN_AI_MAIN_AGENT_ID_KEY,
GEN_AI_MAIN_AGENT_NAME_KEY,
)
from microsoft.opentelemetry._genai.main_agent._processor import (
GenAIMainAgentLogRecordProcessor,
)


def _mock_span(attributes: dict, valid: bool = True) -> Mock:
span = Mock()
span.attributes = attributes
span_context = Mock()
span_context.is_valid = valid
span.get_span_context.return_value = span_context
return span


def _mock_log_record(attributes):
log_data = MagicMock()
log_data.log_record.attributes = attributes
return log_data


class TestGenAIMainAgentLogRecordProcessorOnEmit(unittest.TestCase):
def setUp(self) -> None:
self.processor = GenAIMainAgentLogRecordProcessor()

def test_no_current_span_does_nothing(self):
log_data = _mock_log_record({"existing": "value"})

with patch(
"microsoft.opentelemetry._genai.main_agent._processor.trace.get_current_span",
return_value=_mock_span({}, valid=False),
):
self.processor.on_emit(log_data)

self.assertEqual(log_data.log_record.attributes, {"existing": "value"})

def test_span_without_main_agent_attrs_does_nothing(self):
log_data = _mock_log_record({"existing": "value"})

with patch(
"microsoft.opentelemetry._genai.main_agent._processor.trace.get_current_span",
return_value=_mock_span({"gen_ai.agent.name": "x"}),
):
self.processor.on_emit(log_data)

self.assertEqual(log_data.log_record.attributes, {"existing": "value"})

def test_copies_main_agent_attrs_to_log_record(self):
log_data = _mock_log_record({"existing": "value"})
span_attrs = {
GEN_AI_MAIN_AGENT_NAME_KEY: "main",
GEN_AI_MAIN_AGENT_ID_KEY: "id-1",
"unrelated": "skip-me",
}

with patch(
"microsoft.opentelemetry._genai.main_agent._processor.trace.get_current_span",
return_value=_mock_span(span_attrs),
):
self.processor.on_emit(log_data)

self.assertEqual(log_data.log_record.attributes[GEN_AI_MAIN_AGENT_NAME_KEY], "main")
self.assertEqual(log_data.log_record.attributes[GEN_AI_MAIN_AGENT_ID_KEY], "id-1")
self.assertEqual(log_data.log_record.attributes["existing"], "value")
self.assertNotIn("unrelated", log_data.log_record.attributes)

def test_initializes_attributes_dict_when_none(self):
log_data = _mock_log_record(None)
span_attrs = {GEN_AI_MAIN_AGENT_NAME_KEY: "main"}

with patch(
"microsoft.opentelemetry._genai.main_agent._processor.trace.get_current_span",
return_value=_mock_span(span_attrs),
):
self.processor.on_emit(log_data)

self.assertEqual(log_data.log_record.attributes, {GEN_AI_MAIN_AGENT_NAME_KEY: "main"})


class TestGenAIMainAgentLogRecordProcessorLifecycle(unittest.TestCase):
def test_shutdown_and_force_flush_are_noops(self):
processor = GenAIMainAgentLogRecordProcessor()
processor.shutdown()
self.assertTrue(processor.force_flush())
Loading
Loading