From 3353aa05004175ba7542cb4402524125c2fd51a9 Mon Sep 17 00:00:00 2001 From: "Nikhil Chitlur Navakiran (from Dev Box)" Date: Mon, 26 Jan 2026 14:09:18 +0530 Subject: [PATCH 1/5] add sk attribute enrichment --- .../observability/core/__init__.py | 10 +- .../observability/core/config.py | 71 +++++++- .../observability/core/enriched_span.py | 160 ++++++++++++++++++ .../semantickernel/span_enricher.py | 83 +++++++++ .../semantickernel/span_processor.py | 50 +++++- .../semantickernel/trace_instrumentor.py | 50 ++++-- .../extensions/semantickernel/utils.py | 37 ++++ 7 files changed, 437 insertions(+), 24 deletions(-) create mode 100644 libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/enriched_span.py create mode 100644 libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py create mode 100644 libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/utils.py diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py index 07a1a9f8..63b2613d 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py @@ -1,4 +1,5 @@ -# Copyright (c) Microsoft. All rights reserved. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. # Microsoft Agent 365 Python SDK for OpenTelemetry tracing. @@ -8,7 +9,10 @@ get_tracer, get_tracer_provider, is_configured, + register_span_enricher, + unregister_span_enricher, ) +from .enriched_span import EnrichedReadableSpan from .execute_tool_scope import ExecuteToolScope from .execution_type import ExecutionType from .inference_call_details import InferenceCallDetails @@ -31,6 +35,10 @@ "is_configured", "get_tracer", "get_tracer_provider", + # Span enrichment + "register_span_enricher", + "unregister_span_enricher", + "EnrichedReadableSpan", # Span processor "SpanProcessor", # Base scope class diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py index 7a8b9658..e4f1dcb3 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py @@ -1,4 +1,5 @@ -# Copyright (c) Microsoft. All rights reserved. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. import logging import threading @@ -7,7 +8,7 @@ from opentelemetry import trace from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_NAMESPACE, Resource -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from .exporters.agent365_exporter import _Agent365Exporter @@ -17,6 +18,67 @@ DEFAULT_LOGGER_NAME = __name__ +# Registry for span enrichers - allows extensions to add attributes to spans before export +_span_enrichers: list[Callable[[ReadableSpan], ReadableSpan]] = [] +_enrichers_lock = threading.Lock() + + +def register_span_enricher(enricher: Callable[[ReadableSpan], ReadableSpan]) -> None: + """ + Register a function that enriches spans before export. + + Extensions (like Semantic Kernel, LangChain, etc.) can register enrichers + that modify spans before they are exported by the BatchSpanProcessor. + + Args: + enricher: A function that takes a ReadableSpan and returns an + enriched ReadableSpan (or the same span if no changes). + """ + with _enrichers_lock: + if enricher not in _span_enrichers: + _span_enrichers.append(enricher) + + +def unregister_span_enricher(enricher: Callable[[ReadableSpan], ReadableSpan]) -> None: + """ + Remove a previously registered enricher. + + Args: + enricher: The enricher function to remove. + """ + with _enrichers_lock: + if enricher in _span_enrichers: + _span_enrichers.remove(enricher) + + +class _EnrichingBatchSpanProcessor(BatchSpanProcessor): + """ + BatchSpanProcessor that applies registered enrichers before export. + + This allows extensions to modify spans after they end but before + they are batched and exported. + """ + + def on_end(self, span: ReadableSpan) -> None: + """ + Apply all registered enrichers to the span before batching. + + Args: + span: The ReadableSpan that has ended. + """ + enriched_span = span + with _enrichers_lock: + enrichers = list(_span_enrichers) # Copy to avoid holding lock during enrichment + + for enricher in enrichers: + try: + enriched_span = enricher(enriched_span) + except Exception: + # Don't let enrichment failures break the pipeline + pass + + super().on_end(enriched_span) + class TelemetryManager: """ @@ -165,8 +227,9 @@ def _configure_internal( # Add span processors - # Create BatchSpanProcessor with optimized settings - batch_processor = BatchSpanProcessor(exporter, **batch_processor_kwargs) + # Create _EnrichingBatchSpanProcessor with optimized settings + # This allows extensions to enrich spans before export + batch_processor = _EnrichingBatchSpanProcessor(exporter, **batch_processor_kwargs) agent_processor = SpanProcessor() tracer_provider.add_span_processor(batch_processor) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/enriched_span.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/enriched_span.py new file mode 100644 index 00000000..b57bd4e7 --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/enriched_span.py @@ -0,0 +1,160 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Enriched ReadableSpan wrapper for adding attributes to immutable spans.""" + +import json +from typing import Any + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.util import types + + +class EnrichedReadableSpan(ReadableSpan): + """ + Wrapper to add attributes to an immutable ReadableSpan. + + Since ReadableSpan is immutable after a span ends, this wrapper allows + extensions to add additional attributes before export without modifying + the original span. + """ + + def __init__(self, span: ReadableSpan, extra_attributes: dict): + """ + Initialize the enriched span wrapper. + + Args: + span: The original ReadableSpan to wrap. + extra_attributes: Additional attributes to merge with the original. + """ + self._span = span + self._extra_attributes = extra_attributes + + @property + def attributes(self) -> types.Attributes: + """Return merged attributes from original span and extra attributes.""" + original = dict(self._span.attributes or {}) + original.update(self._extra_attributes) + return original + + @property + def name(self): + """Return the span name.""" + return self._span.name + + @property + def context(self): + """Return the span context.""" + return self._span.context + + @property + def parent(self): + """Return the parent span context.""" + return self._span.parent + + @property + def start_time(self): + """Return the span start time.""" + return self._span.start_time + + @property + def end_time(self): + """Return the span end time.""" + return self._span.end_time + + @property + def status(self): + """Return the span status.""" + return self._span.status + + @property + def kind(self): + """Return the span kind.""" + return self._span.kind + + @property + def events(self): + """Return the span events.""" + return self._span.events + + @property + def links(self): + """Return the span links.""" + return self._span.links + + @property + def resource(self): + """Return the span resource.""" + return self._span.resource + + @property + def instrumentation_scope(self): + """Return the instrumentation scope.""" + return self._span.instrumentation_scope + + def to_json(self, indent: int | None = 4) -> str: + """ + Convert span to JSON string with enriched attributes. + + Args: + indent: JSON indentation level. + + Returns: + JSON string representation of the span. + """ + # Build the JSON dict manually to include enriched attributes + return json.dumps( + { + "name": self.name, + "context": { + "trace_id": f"0x{self.context.trace_id:032x}", + "span_id": f"0x{self.context.span_id:016x}", + "trace_state": str(self.context.trace_state), + } + if self.context + else None, + "kind": str(self.kind), + "parent_id": f"0x{self.parent.span_id:016x}" if self.parent else None, + "start_time": self._format_time(self.start_time), + "end_time": self._format_time(self.end_time), + "status": { + "status_code": str(self.status.status_code), + "description": self.status.description, + } + if self.status + else None, + "attributes": dict(self.attributes) if self.attributes else None, + "events": [self._format_event(e) for e in self.events] if self.events else None, + "links": [self._format_link(lnk) for lnk in self.links] if self.links else None, + "resource": dict(self.resource.attributes) if self.resource else None, + }, + indent=indent, + ) + + def _format_time(self, time_ns: int | None) -> str | None: + """Format nanosecond timestamp to ISO string.""" + if time_ns is None: + return None + from datetime import datetime, timezone + + return datetime.fromtimestamp(time_ns / 1e9, tz=timezone.utc).isoformat() + + def _format_event(self, event: Any) -> dict: + """Format a span event.""" + return { + "name": event.name, + "timestamp": self._format_time(event.timestamp), + "attributes": dict(event.attributes) if event.attributes else None, + } + + def _format_link(self, link: Any) -> dict: + """Format a span link.""" + return { + "context": { + "trace_id": f"0x{link.context.trace_id:032x}", + "span_id": f"0x{link.context.span_id:016x}", + } + if link.context + else None, + "attributes": dict(link.attributes) if link.attributes else None, + } diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py new file mode 100644 index 00000000..107e0b0f --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py @@ -0,0 +1,83 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Span enricher for Semantic Kernel.""" + +from microsoft_agents_a365.observability.core.constants import ( + EXECUTE_TOOL_OPERATION_NAME, + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, + GEN_AI_TOOL_ARGS_KEY, + GEN_AI_TOOL_CALL_RESULT_KEY, + INVOKE_AGENT_OPERATION_NAME, +) +from microsoft_agents_a365.observability.core.enriched_span import EnrichedReadableSpan +from opentelemetry.sdk.trace import ReadableSpan + +from .utils import extract_content_as_string_list + +# Semantic Kernel specific attribute keys +SK_TOOL_CALL_ARGUMENTS_KEY = "gen_ai.tool.call.arguments" +SK_TOOL_CALL_RESULT_KEY = "gen_ai.tool.call.result" + + +def enrich_semantic_kernel_span(span: ReadableSpan) -> ReadableSpan: + """ + Enricher function for Semantic Kernel spans. + + Transforms SK-specific attributes to standard gen_ai attributes + before the span is exported. Enrichment is applied based on span type: + - invoke_agent spans: Extract only content from input/output messages + - execute_tool spans: Map tool arguments and results to standard keys + + Args: + span: The ReadableSpan to enrich. + + Returns: + The enriched span (wrapped if attributes were added), or the + original span if no enrichment was needed. + """ + extra_attributes = {} + attributes = span.attributes or {} + + # Only extract content for invoke_agent spans + if span.name.startswith(INVOKE_AGENT_OPERATION_NAME): + # Transform SK-specific agent invocation attributes to standard gen_ai attributes + # Extract only the content from the full message objects + # Support both gen_ai.agent.invocation_input and gen_ai.input_messages as sources + input_messages = attributes.get("gen_ai.agent.invocation_input") or attributes.get( + GEN_AI_INPUT_MESSAGES_KEY + ) + if input_messages: + extra_attributes[GEN_AI_INPUT_MESSAGES_KEY] = extract_content_as_string_list( + input_messages + ) + + output_messages = attributes.get("gen_ai.agent.invocation_output") or attributes.get( + GEN_AI_OUTPUT_MESSAGES_KEY + ) + if output_messages: + extra_attributes[GEN_AI_OUTPUT_MESSAGES_KEY] = extract_content_as_string_list( + output_messages + ) + + # Map tool attributes for execute_tool spans + elif span.name.startswith(EXECUTE_TOOL_OPERATION_NAME): + # Map SK's gen_ai.tool.arguments to standard gen_ai.tool.call.arguments + tool_arguments = attributes.get(GEN_AI_TOOL_ARGS_KEY) or attributes.get( + SK_TOOL_CALL_ARGUMENTS_KEY + ) + if tool_arguments: + extra_attributes[GEN_AI_TOOL_ARGS_KEY] = tool_arguments + + # Map SK's tool result to standard gen_ai.tool.call.result + tool_result = attributes.get(GEN_AI_TOOL_CALL_RESULT_KEY) or attributes.get( + SK_TOOL_CALL_RESULT_KEY + ) + if tool_result: + extra_attributes[GEN_AI_TOOL_CALL_RESULT_KEY] = tool_result + + if extra_attributes: + return EnrichedReadableSpan(span, extra_attributes) + + return span diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py index d7a78133..330f1d85 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py @@ -1,10 +1,15 @@ -# Copyright (c) Microsoft. All rights reserved. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. -# Custom Span Processor - -from microsoft_agents_a365.observability.core.constants import GEN_AI_OPERATION_NAME_KEY +from microsoft_agents_a365.observability.core.constants import ( + GEN_AI_EXECUTION_TYPE_KEY, + GEN_AI_OPERATION_NAME_KEY, + INVOKE_AGENT_OPERATION_NAME, +) +from microsoft_agents_a365.observability.core.execution_type import ExecutionType from microsoft_agents_a365.observability.core.inference_operation_type import InferenceOperationType from microsoft_agents_a365.observability.core.utils import extract_model_name +from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import SpanProcessor @@ -14,13 +19,46 @@ class SemanticKernelSpanProcessor(SpanProcessor): """ def __init__(self, service_name: str | None = None): + """ + Initialize the Semantic Kernel span processor. + + Args: + service_name: Optional service name for span enrichment. + """ self.service_name = service_name - def on_start(self, span, parent_context): + def on_start(self, span: Span, parent_context) -> None: + """ + Modify span while it's still writable. + + Args: + span: The span that is starting (writable). + parent_context: The parent context of the span. + """ if span.name.startswith("chat."): span.set_attribute(GEN_AI_OPERATION_NAME_KEY, InferenceOperationType.CHAT.value.lower()) model_name = extract_model_name(span.name) span.update_name(f"{InferenceOperationType.CHAT.value.lower()} {model_name}") - def on_end(self, span): + if span.name.startswith(INVOKE_AGENT_OPERATION_NAME): + span.set_attribute( + GEN_AI_EXECUTION_TYPE_KEY, ExecutionType.HUMAN_TO_AGENT.value.lower() + ) + + def on_end(self, span: ReadableSpan) -> None: + """ + Called when a span ends. + + Note: For on_end modifications, use the span enricher pattern + (enrich_semantic_kernel_span) which is registered with the core SDK. + This ensures enriched attributes propagate to the exporter. + """ + pass + + def shutdown(self) -> None: + """Shutdown the processor.""" pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush any pending spans.""" + return True diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py index 1d1b75c6..19f049c2 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py @@ -1,34 +1,39 @@ -# Copyright (c) Microsoft. All rights reserved. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + from __future__ import annotations from collections.abc import Collection from typing import Any -from microsoft_agents_a365.observability.core.config import get_tracer_provider, is_configured +from microsoft_agents_a365.observability.core.config import ( + get_tracer_provider, + is_configured, + register_span_enricher, + unregister_span_enricher, +) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from microsoft_agents_a365.observability.extensions.semantickernel.span_enricher import ( + enrich_semantic_kernel_span, +) from microsoft_agents_a365.observability.extensions.semantickernel.span_processor import ( SemanticKernelSpanProcessor, ) -# ----------------------------- -# 3) The Instrumentor class -# ----------------------------- _instruments = ("semantic-kernel >= 1.0.0",) class SemanticKernelInstrumentor(BaseInstrumentor): """ - Instruments Semantic Kernel: - • Installs your custom OTel SpanProcessor - • (Optionally) attaches an SK function-invocation filter to enrich spans + Instruments Semantic Kernel with Agent365 observability. """ def __init__(self): if not is_configured(): raise RuntimeError( - "Microsoft Agent 365 (or your telemetry config) is not initialized. Configure it before instrumenting." + "Microsoft Agent 365 is not initialized. Call configure() before instrumenting." ) super().__init__() @@ -37,13 +42,32 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs: Any) -> None: """ - kwargs (all optional): - """ + Instrument Semantic Kernel. - # Ensure we have an SDK TracerProvider + Args: + **kwargs: Optional configuration parameters. + """ provider = get_tracer_provider() + + # Add processor for on_start modifications (rename spans, add attributes) self._processor = SemanticKernelSpanProcessor() provider.add_span_processor(self._processor) + # Register enricher for on_end modifications + # This enricher runs before the span is exported, allowing us to + # transform SK-specific attributes to standard gen_ai attributes + register_span_enricher(enrich_semantic_kernel_span) + def _uninstrument(self, **kwargs: Any) -> None: - pass + """ + Remove Semantic Kernel instrumentation. + + Args: + **kwargs: Optional configuration parameters. + """ + # Unregister the enricher + unregister_span_enricher(enrich_semantic_kernel_span) + + # Shutdown the processor + if hasattr(self, "_processor"): + self._processor.shutdown() diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/utils.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/utils.py new file mode 100644 index 00000000..dd1515b7 --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/utils.py @@ -0,0 +1,37 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Utility functions for Semantic Kernel observability extensions.""" + +from __future__ import annotations + +import json + + +def extract_content_as_string_list(messages_json: str) -> str: + """Extract content values from messages JSON and return as JSON string list. + + Transforms from: [{"role": "user", "content": "Hello"}] + To: ["Hello"] + + Args: + messages_json: JSON string like '[{"role": "user", "content": "Hello"}]' + + Returns: + JSON string containing only the content values as an array, + or the original string if parsing fails. + """ + try: + messages = json.loads(messages_json) + if isinstance(messages, list): + contents = [] + for msg in messages: + if isinstance(msg, dict) and "content" in msg: + contents.append(msg["content"]) + elif isinstance(msg, str): + contents.append(msg) + return json.dumps(contents) + return messages_json + except (json.JSONDecodeError, TypeError): + # If parsing fails, return as-is + return messages_json From 250afc05d31062ec880718356f6fb44150e6c239 Mon Sep 17 00:00:00 2001 From: "Nikhil Chitlur Navakiran (from Dev Box)" Date: Mon, 26 Jan 2026 19:19:07 +0530 Subject: [PATCH 2/5] organize code and fix test --- .../observability/core/__init__.py | 8 ++- .../observability/core/config.py | 68 ++---------------- .../core/{ => exporters}/enriched_span.py | 0 .../exporters/enriching_span_processor.py | 71 +++++++++++++++++++ .../semantickernel/span_enricher.py | 2 +- .../semantickernel/trace_instrumentor.py | 2 + tests/observability/core/test_agent365.py | 4 +- 7 files changed, 86 insertions(+), 69 deletions(-) rename libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/{ => exporters}/enriched_span.py (100%) create mode 100644 libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriching_span_processor.py diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py index 63b2613d..130d59ca 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py @@ -9,12 +9,14 @@ get_tracer, get_tracer_provider, is_configured, - register_span_enricher, - unregister_span_enricher, ) -from .enriched_span import EnrichedReadableSpan from .execute_tool_scope import ExecuteToolScope from .execution_type import ExecutionType +from .exporters.enriched_span import EnrichedReadableSpan +from .exporters.enriching_span_processor import ( + register_span_enricher, + unregister_span_enricher, +) from .inference_call_details import InferenceCallDetails from .inference_operation_type import InferenceOperationType from .inference_scope import InferenceScope diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py index e4f1dcb3..6c3befdf 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py @@ -8,77 +8,19 @@ from opentelemetry import trace from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_NAMESPACE, Resource -from opentelemetry.sdk.trace import ReadableSpan, TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter from .exporters.agent365_exporter import _Agent365Exporter from .exporters.agent365_exporter_options import Agent365ExporterOptions +from .exporters.enriching_span_processor import ( + _EnrichingBatchSpanProcessor, +) from .exporters.utils import is_agent365_exporter_enabled from .trace_processor.span_processor import SpanProcessor DEFAULT_LOGGER_NAME = __name__ -# Registry for span enrichers - allows extensions to add attributes to spans before export -_span_enrichers: list[Callable[[ReadableSpan], ReadableSpan]] = [] -_enrichers_lock = threading.Lock() - - -def register_span_enricher(enricher: Callable[[ReadableSpan], ReadableSpan]) -> None: - """ - Register a function that enriches spans before export. - - Extensions (like Semantic Kernel, LangChain, etc.) can register enrichers - that modify spans before they are exported by the BatchSpanProcessor. - - Args: - enricher: A function that takes a ReadableSpan and returns an - enriched ReadableSpan (or the same span if no changes). - """ - with _enrichers_lock: - if enricher not in _span_enrichers: - _span_enrichers.append(enricher) - - -def unregister_span_enricher(enricher: Callable[[ReadableSpan], ReadableSpan]) -> None: - """ - Remove a previously registered enricher. - - Args: - enricher: The enricher function to remove. - """ - with _enrichers_lock: - if enricher in _span_enrichers: - _span_enrichers.remove(enricher) - - -class _EnrichingBatchSpanProcessor(BatchSpanProcessor): - """ - BatchSpanProcessor that applies registered enrichers before export. - - This allows extensions to modify spans after they end but before - they are batched and exported. - """ - - def on_end(self, span: ReadableSpan) -> None: - """ - Apply all registered enrichers to the span before batching. - - Args: - span: The ReadableSpan that has ended. - """ - enriched_span = span - with _enrichers_lock: - enrichers = list(_span_enrichers) # Copy to avoid holding lock during enrichment - - for enricher in enrichers: - try: - enriched_span = enricher(enriched_span) - except Exception: - # Don't let enrichment failures break the pipeline - pass - - super().on_end(enriched_span) - class TelemetryManager: """ diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/enriched_span.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriched_span.py similarity index 100% rename from libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/enriched_span.py rename to libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriched_span.py diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriching_span_processor.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriching_span_processor.py new file mode 100644 index 00000000..93d60a83 --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriching_span_processor.py @@ -0,0 +1,71 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Span enrichment support for the Agent365 exporter pipeline.""" + +import threading +from collections.abc import Callable + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +# Registry for span enrichers - allows extensions to add attributes to spans before export +_span_enrichers: list[Callable[[ReadableSpan], ReadableSpan]] = [] +_enrichers_lock = threading.Lock() + + +def register_span_enricher(enricher: Callable[[ReadableSpan], ReadableSpan]) -> None: + """ + Register a function that enriches spans before export. + + Extensions (like Semantic Kernel, LangChain, etc.) can register enrichers + that modify spans before they are exported by the BatchSpanProcessor. + + Args: + enricher: A function that takes a ReadableSpan and returns an + enriched ReadableSpan (or the same span if no changes). + """ + with _enrichers_lock: + if enricher not in _span_enrichers: + _span_enrichers.append(enricher) + + +def unregister_span_enricher(enricher: Callable[[ReadableSpan], ReadableSpan]) -> None: + """ + Remove a previously registered enricher. + + Args: + enricher: The enricher function to remove. + """ + with _enrichers_lock: + if enricher in _span_enrichers: + _span_enrichers.remove(enricher) + + +class _EnrichingBatchSpanProcessor(BatchSpanProcessor): + """ + BatchSpanProcessor that applies registered enrichers before export. + + This allows extensions to modify spans after they end but before + they are batched and exported. + """ + + def on_end(self, span: ReadableSpan) -> None: + """ + Apply all registered enrichers to the span before batching. + + Args: + span: The ReadableSpan that has ended. + """ + enriched_span = span + with _enrichers_lock: + enrichers = list(_span_enrichers) # Copy to avoid holding lock during enrichment + + for enricher in enrichers: + try: + enriched_span = enricher(enriched_span) + except Exception: + # Don't let enrichment failures break the pipeline + pass + + super().on_end(enriched_span) diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py index 107e0b0f..06721a75 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py @@ -11,7 +11,7 @@ GEN_AI_TOOL_CALL_RESULT_KEY, INVOKE_AGENT_OPERATION_NAME, ) -from microsoft_agents_a365.observability.core.enriched_span import EnrichedReadableSpan +from microsoft_agents_a365.observability.core.exporters.enriched_span import EnrichedReadableSpan from opentelemetry.sdk.trace import ReadableSpan from .utils import extract_content_as_string_list diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py index bb8e9ea9..5330cc25 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/trace_instrumentor.py @@ -9,6 +9,8 @@ from microsoft_agents_a365.observability.core.config import ( get_tracer_provider, is_configured, +) +from microsoft_agents_a365.observability.core.exporters.enriching_span_processor import ( register_span_enricher, unregister_span_enricher, ) diff --git a/tests/observability/core/test_agent365.py b/tests/observability/core/test_agent365.py index 7ae5086e..358b9351 100644 --- a/tests/observability/core/test_agent365.py +++ b/tests/observability/core/test_agent365.py @@ -84,7 +84,7 @@ def test_configure_with_exporter_options_and_parameter_precedence(self, mock_is_ self.assertTrue(result, "configure() should return True with exporter_options") @patch("microsoft_agents_a365.observability.core.config._Agent365Exporter") - @patch("microsoft_agents_a365.observability.core.config.BatchSpanProcessor") + @patch("microsoft_agents_a365.observability.core.config._EnrichingBatchSpanProcessor") @patch("microsoft_agents_a365.observability.core.config.is_agent365_exporter_enabled") def test_batch_span_processor_and_exporter_called_with_correct_values( self, mock_is_enabled, mock_batch_processor, mock_exporter @@ -198,7 +198,7 @@ def test_configure_uses_existing_tracer_provider(self, mock_get_provider, mock_i # Verify types of processors processor_types = [type(p).__name__ for p in processors] - self.assertIn("BatchSpanProcessor", processor_types) + self.assertIn("_EnrichingBatchSpanProcessor", processor_types) self.assertIn("SpanProcessor", processor_types) From f59708b5577278e59d20c7ea6ee7acb050c88ab7 Mon Sep 17 00:00:00 2001 From: "Nikhil Chitlur Navakiran (from Dev Box)" Date: Mon, 26 Jan 2026 19:27:27 +0530 Subject: [PATCH 3/5] add tests --- .../core/exporters/test_enriched_span.py | 65 ++++++++ .../test_enriching_span_processor.py | 142 ++++++++++++++++++ .../semantickernel/test_span_enricher.py | 58 +++++++ 3 files changed, 265 insertions(+) create mode 100644 tests/observability/core/exporters/test_enriched_span.py create mode 100644 tests/observability/core/exporters/test_enriching_span_processor.py create mode 100644 tests/observability/extensions/semantickernel/test_span_enricher.py diff --git a/tests/observability/core/exporters/test_enriched_span.py b/tests/observability/core/exporters/test_enriched_span.py new file mode 100644 index 00000000..1c4e1f48 --- /dev/null +++ b/tests/observability/core/exporters/test_enriched_span.py @@ -0,0 +1,65 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for EnrichedReadableSpan.""" + +import unittest +from unittest.mock import Mock + +from microsoft_agents_a365.observability.core.exporters.enriched_span import EnrichedReadableSpan + + +class TestEnrichedReadableSpan(unittest.TestCase): + """Test suite for EnrichedReadableSpan.""" + + def test_attributes_merges_original_and_extra(self): + """Test that attributes property merges original span attributes with extra attributes.""" + # Create mock span with original attributes + mock_span = Mock() + mock_span.attributes = {"original_key": "original_value", "shared_key": "original"} + + # Create enriched span with extra attributes + extra_attributes = {"extra_key": "extra_value", "shared_key": "overwritten"} + enriched_span = EnrichedReadableSpan(mock_span, extra_attributes) + + # Verify merged attributes + attributes = enriched_span.attributes + self.assertEqual(attributes["original_key"], "original_value") + self.assertEqual(attributes["extra_key"], "extra_value") + self.assertEqual(attributes["shared_key"], "overwritten") # Extra should overwrite original + + def test_delegates_all_properties_to_wrapped_span(self): + """Test that all span properties are delegated to the wrapped span.""" + # Create mock span with all properties + mock_span = Mock() + mock_span.name = "test-span" + mock_span.context = Mock(trace_id=123, span_id=456) + mock_span.parent = Mock(span_id=789) + mock_span.start_time = 1000000000 + mock_span.end_time = 2000000000 + mock_span.status = Mock(status_code="OK", description=None) + mock_span.kind = "INTERNAL" + mock_span.events = [] + mock_span.links = [] + mock_span.resource = Mock(attributes={"service.name": "test"}) + mock_span.instrumentation_scope = Mock(name="test-scope") + mock_span.attributes = {} + + enriched_span = EnrichedReadableSpan(mock_span, {}) + + # Verify all properties delegate correctly + self.assertEqual(enriched_span.name, "test-span") + self.assertEqual(enriched_span.context, mock_span.context) + self.assertEqual(enriched_span.parent, mock_span.parent) + self.assertEqual(enriched_span.start_time, 1000000000) + self.assertEqual(enriched_span.end_time, 2000000000) + self.assertEqual(enriched_span.status, mock_span.status) + self.assertEqual(enriched_span.kind, "INTERNAL") + self.assertEqual(enriched_span.events, []) + self.assertEqual(enriched_span.links, []) + self.assertEqual(enriched_span.resource, mock_span.resource) + self.assertEqual(enriched_span.instrumentation_scope, mock_span.instrumentation_scope) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/observability/core/exporters/test_enriching_span_processor.py b/tests/observability/core/exporters/test_enriching_span_processor.py new file mode 100644 index 00000000..20aa4f5e --- /dev/null +++ b/tests/observability/core/exporters/test_enriching_span_processor.py @@ -0,0 +1,142 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for enriching_span_processor module.""" + +import unittest +from unittest.mock import Mock + +from microsoft_agents_a365.observability.core.exporters.enriching_span_processor import ( + _EnrichingBatchSpanProcessor, + _span_enrichers, + register_span_enricher, + unregister_span_enricher, +) + + +class TestSpanEnricherRegistry(unittest.TestCase): + """Test suite for span enricher registration functions.""" + + def setUp(self): + """Clear enrichers before each test.""" + _span_enrichers.clear() + + def tearDown(self): + """Clear enrichers after each test.""" + _span_enrichers.clear() + + def test_register_and_unregister_enricher(self): + """Test that enrichers can be registered and unregistered.""" + + # Define a simple enricher + def my_enricher(span): + return span + + # Register + register_span_enricher(my_enricher) + self.assertIn(my_enricher, _span_enrichers) + self.assertEqual(len(_span_enrichers), 1) + + # Duplicate registration should not add again + register_span_enricher(my_enricher) + self.assertEqual(len(_span_enrichers), 1) + + # Unregister + unregister_span_enricher(my_enricher) + self.assertNotIn(my_enricher, _span_enrichers) + self.assertEqual(len(_span_enrichers), 0) + + def test_unregister_nonexistent_enricher_does_not_raise(self): + """Test that unregistering a non-existent enricher doesn't raise an error.""" + + def my_enricher(span): + return span + + # Should not raise + unregister_span_enricher(my_enricher) + self.assertEqual(len(_span_enrichers), 0) + + +class TestEnrichingBatchSpanProcessor(unittest.TestCase): + """Test suite for _EnrichingBatchSpanProcessor.""" + + def setUp(self): + """Clear enrichers before each test.""" + _span_enrichers.clear() + + def tearDown(self): + """Clear enrichers after each test.""" + _span_enrichers.clear() + + def test_on_end_applies_enrichers_to_span(self): + """Test that on_end applies all registered enrichers to the span.""" + # Create processor with a mock exporter + mock_exporter = Mock() + processor = _EnrichingBatchSpanProcessor(mock_exporter) + + # Register an enricher that tracks what it receives and returns + received_spans = [] + + def enricher(span): + received_spans.append(span) + # Return a mock enriched span + enriched = Mock(name="enriched_span") + enriched.context = span.context + return enriched + + register_span_enricher(enricher) + + # Create a mock span + original_span = Mock(name="original_span") + original_span.context = Mock() + original_span.context.trace_id = 123 + original_span.context.span_id = 456 + + # Call on_end + processor.on_end(original_span) + + # Verify enricher was called with the original span + self.assertEqual(len(received_spans), 1) + self.assertEqual(received_spans[0], original_span) + + # Cleanup + processor.shutdown() + + def test_on_end_continues_if_enricher_raises_exception(self): + """Test that on_end continues processing even if an enricher raises an exception.""" + mock_exporter = Mock() + processor = _EnrichingBatchSpanProcessor(mock_exporter) + + # Track which enrichers were called + called_enrichers = [] + + def failing_enricher(span): + called_enrichers.append("failing") + raise ValueError("Enricher failed!") + + def succeeding_enricher(span): + called_enrichers.append("succeeding") + return span + + register_span_enricher(failing_enricher) + register_span_enricher(succeeding_enricher) + + # Create a mock span + original_span = Mock(name="original_span") + original_span.context = Mock() + original_span.context.trace_id = 123 + original_span.context.span_id = 456 + + # Should not raise despite failing enricher + processor.on_end(original_span) + + # Verify both enrichers were called (failing one didn't stop the chain) + self.assertIn("failing", called_enrichers) + self.assertIn("succeeding", called_enrichers) + + # Cleanup + processor.shutdown() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/observability/extensions/semantickernel/test_span_enricher.py b/tests/observability/extensions/semantickernel/test_span_enricher.py new file mode 100644 index 00000000..c8817838 --- /dev/null +++ b/tests/observability/extensions/semantickernel/test_span_enricher.py @@ -0,0 +1,58 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for Semantic Kernel span enricher.""" + +import unittest +from unittest.mock import Mock + +from microsoft_agents_a365.observability.core.constants import ( + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, +) +from microsoft_agents_a365.observability.extensions.semantickernel.span_enricher import ( + enrich_semantic_kernel_span, +) + + +class TestSemanticKernelSpanEnricher(unittest.TestCase): + """Test suite for enrich_semantic_kernel_span function.""" + + def test_invoke_agent_span_extracts_content_from_messages(self): + """Test that invoke_agent spans have content extracted from input/output messages.""" + # Create a mock span with invoke_agent name and message attributes + mock_span = Mock() + mock_span.name = "invoke_agent test-agent" + mock_span.attributes = { + "gen_ai.agent.invocation_input": '[{"role": "user", "content": "Hello"}]', + "gen_ai.agent.invocation_output": '[{"role": "assistant", "content": "Hi there!"}]', + } + + # Enrich the span + enriched = enrich_semantic_kernel_span(mock_span) + + # Verify it returns an EnrichedReadableSpan with extracted content + self.assertNotEqual(enriched, mock_span) + attributes = enriched.attributes + # extract_content_as_string_list returns a JSON string + self.assertEqual(attributes[GEN_AI_INPUT_MESSAGES_KEY], '["Hello"]') + self.assertEqual(attributes[GEN_AI_OUTPUT_MESSAGES_KEY], '["Hi there!"]') + + def test_non_matching_span_returns_original(self): + """Test that spans not matching invoke_agent or execute_tool are returned unchanged.""" + # Create a mock span with a different operation name + mock_span = Mock() + mock_span.name = "some_other_operation" + mock_span.attributes = { + "some_key": "some_value", + } + + # Enrich the span + result = enrich_semantic_kernel_span(mock_span) + + # Verify it returns the original span unchanged + self.assertEqual(result, mock_span) + + +if __name__ == "__main__": + unittest.main() From e6721c7ce8129fd0e5b7fd72ca985e438f172bb0 Mon Sep 17 00:00:00 2001 From: "Nikhil Chitlur Navakiran (from Dev Box)" Date: Mon, 26 Jan 2026 19:37:39 +0530 Subject: [PATCH 4/5] address copilot review --- .../extensions/semantickernel/span_enricher.py | 16 ++++------------ .../extensions/semantickernel/span_processor.py | 7 ++----- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py index 06721a75..83f14213 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_enricher.py @@ -63,19 +63,11 @@ def enrich_semantic_kernel_span(span: ReadableSpan) -> ReadableSpan: # Map tool attributes for execute_tool spans elif span.name.startswith(EXECUTE_TOOL_OPERATION_NAME): - # Map SK's gen_ai.tool.arguments to standard gen_ai.tool.call.arguments - tool_arguments = attributes.get(GEN_AI_TOOL_ARGS_KEY) or attributes.get( - SK_TOOL_CALL_ARGUMENTS_KEY - ) - if tool_arguments: - extra_attributes[GEN_AI_TOOL_ARGS_KEY] = tool_arguments + if SK_TOOL_CALL_ARGUMENTS_KEY in attributes: + extra_attributes[GEN_AI_TOOL_ARGS_KEY] = attributes[SK_TOOL_CALL_ARGUMENTS_KEY] - # Map SK's tool result to standard gen_ai.tool.call.result - tool_result = attributes.get(GEN_AI_TOOL_CALL_RESULT_KEY) or attributes.get( - SK_TOOL_CALL_RESULT_KEY - ) - if tool_result: - extra_attributes[GEN_AI_TOOL_CALL_RESULT_KEY] = tool_result + if SK_TOOL_CALL_RESULT_KEY in attributes: + extra_attributes[GEN_AI_TOOL_CALL_RESULT_KEY] = attributes[SK_TOOL_CALL_RESULT_KEY] if extra_attributes: return EnrichedReadableSpan(span, extra_attributes) diff --git a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py index 330f1d85..6763f71d 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-semantickernel/microsoft_agents_a365/observability/extensions/semantickernel/span_processor.py @@ -9,6 +9,7 @@ from microsoft_agents_a365.observability.core.execution_type import ExecutionType from microsoft_agents_a365.observability.core.inference_operation_type import InferenceOperationType from microsoft_agents_a365.observability.core.utils import extract_model_name +from opentelemetry import context as context_api from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import SpanProcessor @@ -27,7 +28,7 @@ def __init__(self, service_name: str | None = None): """ self.service_name = service_name - def on_start(self, span: Span, parent_context) -> None: + def on_start(self, span: Span, parent_context: context_api.Context | None) -> None: """ Modify span while it's still writable. @@ -48,10 +49,6 @@ def on_start(self, span: Span, parent_context) -> None: def on_end(self, span: ReadableSpan) -> None: """ Called when a span ends. - - Note: For on_end modifications, use the span enricher pattern - (enrich_semantic_kernel_span) which is registered with the core SDK. - This ensures enriched attributes propagate to the exporter. """ pass From f720abc3a2b6936f10caaa187a0132098233d5d8 Mon Sep 17 00:00:00 2001 From: "Nikhil Chitlur Navakiran (from Dev Box)" Date: Mon, 2 Feb 2026 23:14:20 +0530 Subject: [PATCH 5/5] update tracer and utility --- .../extensions/langchain/tracer.py | 81 +++++-- .../langchain/tracer_instrumentor.py | 2 +- .../extensions/langchain/utils.py | 228 +++++++++++++++--- 3 files changed, 262 insertions(+), 49 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py index bb8eb122..3204912b 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py @@ -15,6 +15,10 @@ from langchain_core.tracers import BaseTracer, LangChainTracer from langchain_core.tracers.schemas import Run +from microsoft_agents_a365.observability.core.constants import ( + GEN_AI_AGENT_NAME_KEY, + INVOKE_AGENT_OPERATION_NAME, +) from microsoft_agents_a365.observability.core.inference_operation_type import InferenceOperationType from microsoft_agents_a365.observability.core.utils import ( DictWithLock, @@ -37,11 +41,14 @@ function_calls, input_messages, invocation_parameters, + invoke_agent_input_message, + invoke_agent_output_message, llm_provider, metadata, model_name, output_messages, prompts, + set_execution_type, token_counts, tools, ) @@ -111,8 +118,17 @@ def _start_trace(self, run: Run) -> None: # We can't use real time because the handler may be # called in a background thread. start_time_utc_nano = as_utc_nano(run.start_time) + + # Determine span name based on run type + if run.run_type == "chain" and run.name == "LangGraph": + span_name = f"invoke_agent {run.name}" + elif run.run_type.lower() == "tool": + span_name = f"execute_tool {run.name}" + else: + span_name = run.name + span = self._tracer.start_span( - name=run.name, + name=span_name, context=parent_context, start_time=start_time_utc_nano, ) @@ -197,27 +213,52 @@ def _update_span(span: Span, run: Run) -> None: else: span.set_status(trace_api.Status(trace_api.StatusCode.ERROR, run.error)) + span.set_attributes(dict(get_attributes_from_context())) + if run.run_type == "llm" and run.outputs.get("llm_output").get("id").startswith("chat"): span.update_name(f"{InferenceOperationType.CHAT.value.lower()} {span.name}") - elif run.run_type.lower() == "tool": - span.update_name(f"execute_tool {span.name}") - span.set_attributes(dict(get_attributes_from_context())) - span.set_attributes( - dict( - flatten( - chain( - add_operation_type(run), - prompts(run.inputs), - input_messages(run.inputs), - output_messages(run.outputs), - invocation_parameters(run), - llm_provider(run.extra), - model_name(run.outputs, run.extra), - token_counts(run.outputs), - function_calls(run.outputs), - tools(run), - metadata(run), + is_invoke_agent = span.name.startswith(INVOKE_AGENT_OPERATION_NAME) + + # If this is an invoke_agent span, update span name with agent name + if is_invoke_agent: + agent_name = None + if hasattr(span, "_attributes") and span._attributes: + agent_name = span._attributes.get(GEN_AI_AGENT_NAME_KEY) + if agent_name: + span.update_name(f"{INVOKE_AGENT_OPERATION_NAME} {agent_name}") + + # For invoke_agent spans, add input/output messages + if is_invoke_agent: + span.set_attributes( + dict( + flatten( + chain( + set_execution_type(), + add_operation_type(run), + invoke_agent_input_message(run.inputs), + invoke_agent_output_message(run.outputs), + metadata(run), + ) + ) + ) + ) + else: + span.set_attributes( + dict( + flatten( + chain( + add_operation_type(run), + prompts(run.inputs), + input_messages(run.inputs), + output_messages(run.outputs), + invocation_parameters(run), + llm_provider(run.extra), + model_name(run.outputs, run.extra), + token_counts(run.outputs), + function_calls(run.outputs), + tools(run), + metadata(run), + ) ) ) ) - ) diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py index efb245ba..0b723bba 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py @@ -21,7 +21,7 @@ from microsoft_agents_a365.observability.extensions.langchain.tracer import CustomLangChainTracer -_INSTRUMENTS: str = "langchain_core >= 0.1.0" +_INSTRUMENTS: str = "langchain_core >= 1.2.0" class CustomLangChainInstrumentor(BaseInstrumentor): diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py index b7dfe638..b081e44e 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py @@ -9,6 +9,8 @@ from langchain_core.messages import BaseMessage from langchain_core.tracers.schemas import Run from microsoft_agents_a365.observability.core.constants import ( + EXECUTE_TOOL_OPERATION_NAME, + GEN_AI_EXECUTION_TYPE_KEY, GEN_AI_INPUT_MESSAGES_KEY, GEN_AI_OPERATION_NAME_KEY, GEN_AI_OUTPUT_MESSAGES_KEY, @@ -25,8 +27,10 @@ GEN_AI_TOOL_TYPE_KEY, GEN_AI_USAGE_INPUT_TOKENS_KEY, GEN_AI_USAGE_OUTPUT_TOKENS_KEY, + INVOKE_AGENT_OPERATION_NAME, SESSION_ID_KEY, ) +from microsoft_agents_a365.observability.core.execution_type import ExecutionType from microsoft_agents_a365.observability.core.inference_operation_type import InferenceOperationType from microsoft_agents_a365.observability.core.utils import ( get_first_value, @@ -199,8 +203,8 @@ def _parse_message_data(message_data: Mapping[str, Any] | None) -> Iterator[tupl @stop_on_exception def input_messages( inputs: Mapping[str, Any] | None, -) -> Iterator[tuple[str, list[dict[str, Any]]]]: - """Yields chat messages if present.""" +) -> Iterator[tuple[str, str]]: + """Yields chat messages as a JSON array of content strings.""" if not inputs: return assert hasattr(inputs, "get"), f"expected Mapping, found {type(inputs)}" @@ -213,27 +217,29 @@ def input_messages( # This will only get the first set of messages. if not (first_messages := next(iter(multiple_messages), None)): return - parsed_messages = [] + contents: list[str] = [] if isinstance(first_messages, list): for message_data in first_messages: if isinstance(message_data, BaseMessage): - parsed_messages.append(dict(_parse_message_data(message_data.to_json()))) + if hasattr(message_data, "content") and message_data.content: + contents.append(str(message_data.content)) elif hasattr(message_data, "get"): - parsed_messages.append(dict(_parse_message_data(message_data))) - else: - raise ValueError(f"failed to parse message of type {type(message_data)}") + if content := message_data.get("content"): + contents.append(str(content)) + elif kwargs := message_data.get("kwargs"): + if hasattr(kwargs, "get") and (content := kwargs.get("content")): + contents.append(str(content)) elif isinstance(first_messages, BaseMessage): - parsed_messages.append(dict(_parse_message_data(first_messages.to_json()))) + if hasattr(first_messages, "content") and first_messages.content: + contents.append(str(first_messages.content)) elif hasattr(first_messages, "get"): - parsed_messages.append(dict(_parse_message_data(first_messages))) + if content := first_messages.get("content"): + contents.append(str(content)) elif isinstance(first_messages, Sequence) and len(first_messages) == 2: - # See e.g. https://github.com/langchain-ai/langchain/blob/18cf457eec106d99e0098b42712299f5d0daa798/libs/core/langchain_core/messages/utils.py#L317 # noqa: E501 role, content = first_messages - parsed_messages.append({"MESSAGE_ROLE": role, "MESSAGE_CONTENT": content}) - else: - raise ValueError(f"failed to parse messages of type {type(first_messages)}") - if parsed_messages: - yield GEN_AI_INPUT_MESSAGES_KEY, parsed_messages + contents.append(str(content)) + if contents: + yield GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(contents) @stop_on_exception @@ -255,8 +261,8 @@ def metadata(run: Run) -> Iterator[tuple[str, str]]: @stop_on_exception def output_messages( outputs: Mapping[str, Any] | None, -) -> Iterator[tuple[str, list[dict[str, Any]]]]: - """Yields chat messages if present.""" +) -> Iterator[tuple[str, str]]: + """Yields chat messages as a JSON array of content strings.""" if not outputs: return assert hasattr(outputs, "get"), f"expected Mapping, found {type(outputs)}" @@ -279,18 +285,21 @@ def output_messages( assert isinstance(first_generations, Iterable), ( f"expected Iterable, found {type(first_generations)}" ) - parsed_messages = [] + contents: list[str] = [] for generation in first_generations: assert hasattr(generation, "get"), f"expected Mapping, found {type(generation)}" if message_data := generation.get("message"): if isinstance(message_data, BaseMessage): - parsed_messages.append(dict(_parse_message_data(message_data.to_json()))) + if hasattr(message_data, "content") and message_data.content: + contents.append(str(message_data.content)) elif hasattr(message_data, "get"): - parsed_messages.append(dict(_parse_message_data(message_data))) - else: - raise ValueError(f"fail to parse message of type {type(message_data)}") - if parsed_messages: - yield GEN_AI_OUTPUT_MESSAGES_KEY, parsed_messages + if content := message_data.get("content"): + contents.append(str(content)) + elif kwargs := message_data.get("kwargs"): + if hasattr(kwargs, "get") and (content := kwargs.get("content")): + contents.append(str(content)) + if contents: + yield GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(contents) @stop_on_exception @@ -305,7 +314,6 @@ def invocation_parameters(run: Run) -> Iterator[tuple[str, str]]: assert isinstance(invocation_parameters, Mapping), ( f"expected Mapping, found {type(invocation_parameters)}" ) - yield GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(invocation_parameters) tools = invocation_parameters.get("tools", []) for idx, tool in enumerate(tools): yield f"{GEN_AI_TOOL_ARGS_KEY}.{idx}", safe_json_dumps(tool) @@ -458,7 +466,7 @@ def function_calls(outputs: Mapping[str, Any] | None) -> Iterator[tuple[str, str return # Tool type (explicit) - yield GEN_AI_OPERATION_NAME_KEY, "execute_tool" + yield GEN_AI_OPERATION_NAME_KEY, EXECUTE_TOOL_OPERATION_NAME yield GEN_AI_TOOL_TYPE_KEY, "function" name = fc.get("name") @@ -505,6 +513,34 @@ def tools(run: Run) -> Iterator[tuple[str, str]]: if description := serialized.get("description"): yield GEN_AI_TOOL_DESCRIPTION_KEY, description + # Extract tool call ID from run.extra (LangGraph stores it there) + if run.extra and hasattr(run.extra, "get"): + if tool_call_id := run.extra.get("tool_call_id"): + yield GEN_AI_TOOL_CALL_ID_KEY, tool_call_id + + # Extract tool arguments from inputs + if run.inputs and hasattr(run.inputs, "get"): + # LangGraph wraps args in 'input' key as a string + if input_val := run.inputs.get("input"): + if isinstance(input_val, str): + yield GEN_AI_TOOL_ARGS_KEY, input_val + else: + yield GEN_AI_TOOL_ARGS_KEY, safe_json_dumps(input_val) + + # Extract tool result from outputs + if run.outputs and hasattr(run.outputs, "get"): + if result := run.outputs.get("output"): + # Handle ToolMessage or BaseMessage objects + if isinstance(result, BaseMessage): + result_content = result.content if hasattr(result, "content") else str(result) + elif hasattr(result, "content"): + result_content = result.content + elif isinstance(result, str): + result_content = result + else: + result_content = safe_json_dumps(result) + yield GEN_AI_TOOL_CALL_RESULT_KEY, result_content + def add_operation_type(run: Run) -> Iterator[tuple[str, str]]: """Yields operation type based on run type.""" @@ -512,6 +548,142 @@ def add_operation_type(run: Run) -> Iterator[tuple[str, str]]: if run_type == "llm": yield GEN_AI_OPERATION_NAME_KEY, InferenceOperationType.CHAT.value.lower() elif run_type == "chat_model": - yield GEN_AI_OPERATION_NAME_KEY, "chat" + yield GEN_AI_OPERATION_NAME_KEY, InferenceOperationType.CHAT.value.lower() elif run_type == "tool": - yield GEN_AI_OPERATION_NAME_KEY, "execute_tool" + yield GEN_AI_OPERATION_NAME_KEY, EXECUTE_TOOL_OPERATION_NAME + elif run_type == "chain" and run.name.startswith(INVOKE_AGENT_OPERATION_NAME): + yield GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME + + +def _extract_content_from_message(message: Any) -> str | None: + """Extract content from a LangChain message object or dict.""" + if message is None: + return None + + # Handle BaseMessage objects + if isinstance(message, BaseMessage): + return message.content if hasattr(message, "content") else None + + # Handle dict-like messages + if hasattr(message, "get"): + # Direct content field + if content := message.get("content"): + return content + # Nested in kwargs + if kwargs := message.get("kwargs"): + if hasattr(kwargs, "get") and (content := kwargs.get("content")): + return content + + return None + + +def _get_message_role(message: Any) -> str | None: + """Extract role from a LangChain message object or dict.""" + if message is None: + return None + + # Handle BaseMessage objects + if isinstance(message, BaseMessage): + return message.type if hasattr(message, "type") else None + + # Handle dict-like messages + if hasattr(message, "get"): + # Check various role indicators + if role := message.get("role"): + return role + if msg_type := message.get("type"): + return msg_type + # Check id field for type info (e.g., "HumanMessage", "AIMessage") + if id_field := message.get("id"): + if isinstance(id_field, list) and len(id_field) > 0: + type_name = id_field[-1] + if "Human" in type_name: + return "human" + elif "AI" in type_name or "Assistant" in type_name: + return "ai" + elif "System" in type_name: + return "system" + + return None + + +@stop_on_exception +def invoke_agent_input_message( + inputs: Mapping[str, Any] | None, +) -> Iterator[tuple[str, str]]: + """ + Extract the user input message for invoke_agent spans (LangGraph root). + We want to find the first user/human message content. + """ + if not inputs: + return + + assert hasattr(inputs, "get"), f"expected Mapping, found {type(inputs)}" + + messages = inputs.get("messages") + if not messages: + return + + # Handle nested list structure: [[msg1, msg2, ...]] + if isinstance(messages, list) and len(messages) > 0: + first_item = messages[0] + # If first item is also a list, unwrap it + if isinstance(first_item, list): + messages = first_item + + # Find the first user/human message + if isinstance(messages, list): + for message in messages: + role = _get_message_role(message) + if role and role.lower() in ("human", "user"): + content = _extract_content_from_message(message) + if content: + yield GEN_AI_INPUT_MESSAGES_KEY, content + return + + # If no human message found, just get first message content + if len(messages) > 0: + content = _extract_content_from_message(messages[0]) + if content: + yield GEN_AI_INPUT_MESSAGES_KEY, content + + +@stop_on_exception +def invoke_agent_output_message( + outputs: Mapping[str, Any] | None, +) -> Iterator[tuple[str, str]]: + """ + Extract the final output message for invoke_agent spans (LangGraph root). + We want the last AI/assistant message content. + """ + if not outputs: + return + + assert hasattr(outputs, "get"), f"expected Mapping, found {type(outputs)}" + + messages = outputs.get("messages") + if not messages: + return + + # Handle nested list structure if present + if isinstance(messages, list) and len(messages) > 0: + first_item = messages[0] + if isinstance(first_item, list): + messages = first_item + + # Find the last AI/assistant message with content (not tool calls) + if isinstance(messages, list): + # Iterate in reverse to find the last AI message + for message in reversed(messages): + role = _get_message_role(message) + if role and role.lower() in ("ai", "assistant"): + content = _extract_content_from_message(message) + # Make sure it has actual content, not just tool calls + if content and isinstance(content, str) and content.strip(): + yield GEN_AI_OUTPUT_MESSAGES_KEY, content + return + + +def set_execution_type() -> Iterator[tuple[str, str]]: + """Yields the execution type as human_to_agent.""" + yield GEN_AI_EXECUTION_TYPE_KEY, ExecutionType.HUMAN_TO_AGENT.value