diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md
index ca2c2d0a0e..0ad42baf5e 100644
--- a/util/opentelemetry-util-genai/CHANGELOG.md
+++ b/util/opentelemetry-util-genai/CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+- Add support for emitting inference events and enrich message types. ([#3994](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3994))
 - Minor change to check LRU cache in Completion Hook before acquiring semaphore/thread ([#3907](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3907)).
 - Add environment variable for genai upload hook queue size ([#3943](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3943))
diff --git a/util/opentelemetry-util-genai/README.rst b/util/opentelemetry-util-genai/README.rst
index a06b3a0fd0..50c869c517 100644
--- a/util/opentelemetry-util-genai/README.rst
+++ b/util/opentelemetry-util-genai/README.rst
@@ -9,7 +9,11 @@ while providing standardization for generating both types of otel, "spans and me
 This package relies on environment variables to configure capturing of message content.
 By default, message content will not be captured.
 Set the environment variable `OTEL_SEMCONV_STABILITY_OPT_IN` to `gen_ai_latest_experimental` to enable experimental features.
-And set the environment variable `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` to `SPAN_ONLY` or `SPAN_AND_EVENT` to capture message content in spans.
+Then set the environment variable `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` to one of:
+- `NO_CONTENT`: Do not capture message content (default).
+- `SPAN_ONLY`: Capture message content in spans only.
+- `EVENT_ONLY`: Capture message content in events only.
+- `SPAN_AND_EVENT`: Capture message content in both spans and events.
 
 This package provides these span attributes:
 
@@ -23,6 +27,11 @@ This package provides these span attributes:
 - `gen_ai.usage.output_tokens`: Int(7)
 - `gen_ai.input.messages`: Str('[{"role": "Human", "parts": [{"content": "hello world", "type": "text"}]}]')
 - `gen_ai.output.messages`: Str('[{"role": "AI", "parts": [{"content": "hello back", "type": "text"}], "finish_reason": "stop"}]')
+- `gen_ai.system_instructions`: Str('[{"content": "You are a helpful assistant.", "type": "text"}]') (when a system instruction is provided)
+
+When `OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT` is set to `true` and a LoggerProvider is configured,
+the package also emits `gen_ai.client.inference.operation.details` events; in `EVENT_ONLY` or
+`SPAN_AND_EVENT` mode these events carry structured message content (dictionaries instead of JSON strings).
 
 Installation
 
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py
index d2b18fd6be..4f0b98e6fd 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py
@@ -172,7 +172,7 @@ def _calculate_ref_path(
     if is_system_instructions_hashable(system_instruction):
         # Get a hash of the text.
system_instruction_hash = hashlib.sha256( - "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType] + "\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType, reportCallIssue, reportArgumentType] "utf-8" ), usedforsecurity=False, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py index 4a8dde216f..a1f5848372 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py @@ -16,6 +16,15 @@ "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT" ) +OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT = "OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT" +""" +.. envvar:: OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT + +Controls whether to emit gen_ai.client.inference.operation.details events. +Must be one of ``true`` or ``false`` (case-insensitive). +Defaults to ``false``. +""" + OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK = ( "OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK" ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index bc2f2fa350..54e626deaa 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -65,10 +65,11 @@ from typing import Iterator from opentelemetry import context as otel_context -from opentelemetry.metrics import MeterProvider, get_meter -from opentelemetry.semconv._incubating.attributes import ( - gen_ai_attributes as GenAI, +from opentelemetry._logs import ( + LoggerProvider, + get_logger, ) +from opentelemetry.metrics import MeterProvider, get_meter from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import ( Span, @@ -80,7 +81,8 @@ from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.span_utils import ( _apply_error_attributes, - _apply_finish_attributes, + _apply_llm_finish_attributes, + _maybe_emit_llm_event, ) from opentelemetry.util.genai.types import Error, LLMInvocation from opentelemetry.util.genai.version import __version__ @@ -96,6 +98,7 @@ def __init__( self, tracer_provider: TracerProvider | None = None, meter_provider: MeterProvider | None = None, + logger_provider: LoggerProvider | None = None, ): self._tracer = get_tracer( __name__, @@ -106,6 +109,12 @@ def __init__( self._metrics_recorder: InvocationMetricsRecorder | None = None meter = get_meter(__name__, meter_provider=meter_provider) self._metrics_recorder = InvocationMetricsRecorder(meter) + self._logger = get_logger( + __name__, + __version__, + logger_provider, + schema_url=Schemas.V1_37_0.value, + ) def _record_llm_metrics( self, @@ -129,7 +138,7 @@ def start_llm( """Start an LLM invocation and create a pending span entry.""" # Create a span and attach it as current; keep the token to detach later span = self._tracer.start_span( - name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {invocation.request_model}", + name=f"{invocation.operation_name} {invocation.request_model}", kind=SpanKind.CLIENT, ) # Record a monotonic start timestamp (seconds) for duration @@ -148,8 +157,9 @@ def stop_llm(self, invocation: 
LLMInvocation) -> LLMInvocation: # pylint: disab return invocation span = invocation.span - _apply_finish_attributes(span, invocation) + _apply_llm_finish_attributes(span, invocation) self._record_llm_metrics(invocation, span) + _maybe_emit_llm_event(self._logger, span, invocation) # Detach context and end span otel_context.detach(invocation.context_token) span.end() @@ -164,10 +174,11 @@ def fail_llm( # pylint: disable=no-self-use return invocation span = invocation.span - _apply_finish_attributes(invocation.span, invocation) - _apply_error_attributes(span, error) + _apply_llm_finish_attributes(invocation.span, invocation) + _apply_error_attributes(invocation.span, error) error_type = getattr(error.type, "__qualname__", None) self._record_llm_metrics(invocation, span, error_type=error_type) + _maybe_emit_llm_event(self._logger, span, invocation, error) # Detach context and end span otel_context.detach(invocation.context_token) span.end() @@ -201,6 +212,7 @@ def llm( def get_telemetry_handler( tracer_provider: TracerProvider | None = None, meter_provider: MeterProvider | None = None, + logger_provider: LoggerProvider | None = None, ) -> TelemetryHandler: """ Returns a singleton TelemetryHandler instance. @@ -212,6 +224,7 @@ def get_telemetry_handler( handler = TelemetryHandler( tracer_provider=tracer_provider, meter_provider=meter_provider, + logger_provider=logger_provider, ) setattr(get_telemetry_handler, "_default_handler", handler) return handler diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index b9b8777ec2..d52be22eed 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -17,6 +17,8 @@ from dataclasses import asdict from typing import Any +from opentelemetry._logs import Logger, LogRecord +from opentelemetry.context import get_current from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) @@ -26,11 +28,13 @@ from opentelemetry.trace import ( Span, ) +from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.trace.status import Status, StatusCode from opentelemetry.util.genai.types import ( Error, InputMessage, LLMInvocation, + MessagePart, OutputMessage, ) from opentelemetry.util.genai.utils import ( @@ -38,66 +42,161 @@ gen_ai_json_dumps, get_content_capturing_mode, is_experimental_mode, + should_emit_event, ) -def _apply_common_span_attributes( - span: Span, invocation: LLMInvocation -) -> None: - """Apply attributes shared by finish() and error() and compute metrics. +def _get_llm_common_attributes( + invocation: LLMInvocation, +) -> dict[str, Any]: + """Get common LLM attributes shared by finish() and error() paths. - Returns (genai_attributes) for use with metrics. + Returns a dictionary of attributes. """ - span.update_name( - f"{GenAI.GenAiOperationNameValues.CHAT.value} {invocation.request_model}".strip() - ) - span.set_attribute( - GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.CHAT.value - ) + attributes: dict[str, Any] = {} + attributes[GenAI.GEN_AI_OPERATION_NAME] = invocation.operation_name if invocation.request_model: - span.set_attribute( - GenAI.GEN_AI_REQUEST_MODEL, invocation.request_model - ) + attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model if invocation.provider is not None: # TODO: clean provider name to match GenAiProviderNameValues? 
- span.set_attribute(GenAI.GEN_AI_PROVIDER_NAME, invocation.provider) + attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider + return attributes - _apply_response_attributes(span, invocation) +def _get_llm_span_name(invocation: LLMInvocation) -> str: + """Get the span name for an LLM invocation.""" + return f"{invocation.operation_name} {invocation.request_model}".strip() -def _maybe_set_span_messages( - span: Span, + +def _get_llm_messages_attributes_for_span( input_messages: list[InputMessage], output_messages: list[OutputMessage], -) -> None: + system_instruction: list[MessagePart] | None = None, +) -> dict[str, Any]: + """Get message attributes formatted for span (JSON string format). + + Returns empty dict if not in experimental mode or content capturing is disabled. + """ + attributes: dict[str, Any] = {} if not is_experimental_mode() or get_content_capturing_mode() not in ( ContentCapturingMode.SPAN_ONLY, ContentCapturingMode.SPAN_AND_EVENT, ): - return + return attributes if input_messages: - span.set_attribute( - GenAI.GEN_AI_INPUT_MESSAGES, - gen_ai_json_dumps([asdict(message) for message in input_messages]), + attributes[GenAI.GEN_AI_INPUT_MESSAGES] = gen_ai_json_dumps( + [asdict(message) for message in input_messages] + ) + if output_messages: + attributes[GenAI.GEN_AI_OUTPUT_MESSAGES] = gen_ai_json_dumps( + [asdict(message) for message in output_messages] + ) + if system_instruction: + attributes[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] = gen_ai_json_dumps( + [asdict(part) for part in system_instruction] ) + return attributes + + +def _get_llm_messages_attributes_for_event( + input_messages: list[InputMessage], + output_messages: list[OutputMessage], + system_instruction: list[MessagePart] | None = None, +) -> dict[str, Any]: + """Get message attributes formatted for event (structured format). + + Returns empty dict if not in experimental mode or content capturing is disabled. + """ + attributes: dict[str, Any] = {} + if not is_experimental_mode() or get_content_capturing_mode() not in ( + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ): + return attributes + if input_messages: + attributes[GenAI.GEN_AI_INPUT_MESSAGES] = [ + asdict(message) for message in input_messages + ] if output_messages: - span.set_attribute( - GenAI.GEN_AI_OUTPUT_MESSAGES, - gen_ai_json_dumps( - [asdict(message) for message in output_messages] - ), + attributes[GenAI.GEN_AI_OUTPUT_MESSAGES] = [ + asdict(message) for message in output_messages + ] + if system_instruction: + attributes[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] = [ + asdict(part) for part in system_instruction + ] + return attributes + + +def _maybe_emit_llm_event( + logger: Logger | None, + span: Span, + invocation: LLMInvocation, + error: Error | None = None, +) -> None: + """Emit a gen_ai.client.inference.operation.details event to the logger. + + This function creates a LogRecord event following the semantic convention + for gen_ai.client.inference.operation.details as specified in the GenAI + event semantic conventions. 
+ + For more details, see the semantic convention documentation: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aiclientinferenceoperationdetails + """ + if not is_experimental_mode() or not should_emit_event() or logger is None: + return + + # Build event attributes by reusing the attribute getter functions + attributes: dict[str, Any] = {} + attributes.update(_get_llm_common_attributes(invocation)) + attributes.update(_get_llm_request_attributes(invocation)) + attributes.update(_get_llm_response_attributes(invocation)) + attributes.update( + _get_llm_messages_attributes_for_event( + invocation.input_messages, + invocation.output_messages, + invocation.system_instruction, ) + ) + + # Add error.type if operation ended in error + if error is not None: + attributes[ErrorAttributes.ERROR_TYPE] = error.type.__qualname__ + + # Create and emit the event + context = set_span_in_context(span, get_current()) + event = LogRecord( + event_name="gen_ai.client.inference.operation.details", + attributes=attributes, + context=context, + ) + logger.emit(event) -def _apply_finish_attributes(span: Span, invocation: LLMInvocation) -> None: +def _apply_llm_finish_attributes( + span: Span, invocation: LLMInvocation +) -> None: """Apply attributes/messages common to finish() paths.""" - _apply_common_span_attributes(span, invocation) - _maybe_set_span_messages( - span, invocation.input_messages, invocation.output_messages + # Update span name + span.update_name(_get_llm_span_name(invocation)) + + # Build all attributes by reusing the attribute getter functions + attributes: dict[str, Any] = {} + attributes.update(_get_llm_common_attributes(invocation)) + attributes.update(_get_llm_request_attributes(invocation)) + attributes.update(_get_llm_response_attributes(invocation)) + attributes.update( + _get_llm_messages_attributes_for_span( + invocation.input_messages, + invocation.output_messages, + invocation.system_instruction, + ) ) - _apply_request_attributes(span, invocation) - _apply_response_attributes(span, invocation) - span.set_attributes(invocation.attributes) + attributes.update(invocation.attributes) + + # Set all attributes on the span + if attributes: + span.set_attributes(attributes) def _apply_error_attributes(span: Span, error: Error) -> None: @@ -107,8 +206,10 @@ def _apply_error_attributes(span: Span, error: Error) -> None: span.set_attribute(ErrorAttributes.ERROR_TYPE, error.type.__qualname__) -def _apply_request_attributes(span: Span, invocation: LLMInvocation) -> None: - """Attach GenAI request semantic convention attributes to the span.""" +def _get_llm_request_attributes( + invocation: LLMInvocation, +) -> dict[str, Any]: + """Get GenAI request semantic convention attributes.""" attributes: dict[str, Any] = {} if invocation.temperature is not None: attributes[GenAI.GEN_AI_REQUEST_TEMPERATURE] = invocation.temperature @@ -130,12 +231,13 @@ def _apply_request_attributes(span: Span, invocation: LLMInvocation) -> None: ) if invocation.seed is not None: attributes[GenAI.GEN_AI_REQUEST_SEED] = invocation.seed - if attributes: - span.set_attributes(attributes) + return attributes -def _apply_response_attributes(span: Span, invocation: LLMInvocation) -> None: - """Attach GenAI response semantic convention attributes to the span.""" +def _get_llm_response_attributes( + invocation: LLMInvocation, +) -> dict[str, Any]: + """Get GenAI response semantic convention attributes.""" attributes: dict[str, Any] = {} finish_reasons: list[str] | None 
@@ -169,13 +271,15 @@ def _apply_response_attributes(span: Span, invocation: LLMInvocation) -> None:
     if invocation.output_tokens is not None:
         attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] = invocation.output_tokens
 
-    if attributes:
-        span.set_attributes(attributes)
+    return attributes
 
 
 __all__ = [
-    "_apply_finish_attributes",
+    "_apply_llm_finish_attributes",
     "_apply_error_attributes",
-    "_apply_request_attributes",
-    "_apply_response_attributes",
+    "_get_llm_common_attributes",
+    "_get_llm_request_attributes",
+    "_get_llm_response_attributes",
+    "_get_llm_span_name",
+    "_maybe_emit_llm_event",
 ]
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
index 4fbb059e73..e8b4148f47 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
@@ -22,6 +22,9 @@
 from typing_extensions import TypeAlias
 
 from opentelemetry.context import Context
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAI,
+)
 from opentelemetry.trace import Span
 
 ContextToken: TypeAlias = Token[Context]
@@ -40,6 +43,12 @@ class ContentCapturingMode(Enum):
 
 @dataclass()
 class ToolCall:
+    """Represents a tool call requested by the model.
+
+    This model is specified as part of semconv in `GenAI messages Python models - ToolCallRequestPart
+    `__.
+    """
+
     arguments: Any
     name: str
     id: str | None
@@ -48,23 +57,94 @@
 
 @dataclass()
 class ToolCallResponse:
+    """Represents a tool call result sent to the model, or a built-in tool call outcome and details.
+
+    This model is specified as part of semconv in `GenAI messages Python models - ToolCallResponsePart
+    `__.
+    """
+
     response: Any
     id: str | None
     type: Literal["tool_call_response"] = "tool_call_response"
 
 
-FinishReason = Literal[
-    "content_filter", "error", "length", "stop", "tool_calls"
-]
-
-
 @dataclass()
 class Text:
+    """Represents text content sent to or received from the model.
+
+    This model is specified as part of semconv in `GenAI messages Python models - TextPart
+    `__.
+    """
+
     content: str
     type: Literal["text"] = "text"
 
 
-MessagePart = Union[Text, ToolCall, ToolCallResponse, Any]
+@dataclass()
+class Reasoning:
+    """Represents reasoning/thinking content received from the model.
+
+    This model is specified as part of semconv in `GenAI messages Python models - ReasoningPart
+    `__.
+    """
+
+    content: str
+    type: Literal["reasoning"] = "reasoning"
+
+
+Modality = Literal["image", "video", "audio"]
+
+
+@dataclass()
+class Blob:
+    """Represents binary blob data sent inline to the model.
+
+    This model is specified as part of semconv in `GenAI messages Python models - BlobPart
+    `__.
+    """
+
+    mime_type: str | None
+    modality: Union[Modality, str]
+    content: bytes
+    type: Literal["blob"] = "blob"
+
+
+@dataclass()
+class File:
+    """Represents an externally referenced file sent to the model by file ID.
+
+    This model is specified as part of semconv in `GenAI messages Python models - FilePart
+    `__.
+    """
+
+    mime_type: str | None
+    modality: Union[Modality, str]
+    file_id: str
+    type: Literal["file"] = "file"
+
+
+@dataclass()
+class Uri:
+    """Represents an externally referenced file sent to the model by URI.
+
+    This model is specified as part of semconv in `GenAI messages Python models - UriPart
+    `__.
+ """ + + mime_type: str | None + modality: Union[Modality, str] + uri: str + type: Literal["uri"] = "uri" + + +MessagePart = Union[ + Text, ToolCall, ToolCallResponse, Blob, File, Uri, Reasoning, Any +] + + +FinishReason = Literal[ + "content_filter", "error", "length", "stop", "tool_calls" +] @dataclass() @@ -88,6 +168,10 @@ def _new_output_messages() -> list[OutputMessage]: return [] +def _new_system_instruction() -> list[MessagePart]: + return [] + + def _new_str_any_dict() -> dict[str, Any]: return {} @@ -101,6 +185,8 @@ class LLMInvocation: """ request_model: str + # Chat by default + operation_name: str = GenAI.GenAiOperationNameValues.CHAT.value context_token: ContextToken | None = None span: Span | None = None input_messages: list[InputMessage] = field( @@ -109,6 +195,9 @@ class LLMInvocation: output_messages: list[OutputMessage] = field( default_factory=_new_output_messages ) + system_instruction: list[MessagePart] = field( + default_factory=_new_system_instruction + ) provider: str | None = None response_model_name: str | None = None response_id: str | None = None diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py index e9dd43cea6..dc47508d6f 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py @@ -26,6 +26,7 @@ ) from opentelemetry.util.genai.environment_variables import ( OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, + OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT, ) from opentelemetry.util.genai.types import ContentCapturingMode @@ -64,6 +65,28 @@ def get_content_capturing_mode() -> ContentCapturingMode: return ContentCapturingMode.NO_CONTENT +def should_emit_event() -> bool: + """Check if event emission is enabled. + + Returns True if event emission is enabled, False otherwise. + Defaults to False if the environment variable is not set. + """ + envvar = os.environ.get(OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT) + if not envvar: + return False + envvar_lower = envvar.lower() + if envvar_lower == "true": + return True + if envvar_lower == "false": + return False + logger.warning( + "%s is not a valid option for `%s` environment variable. Must be one of true or false (case-insensitive). 
Defaulting to `false`.", + envvar, + OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT, + ) + return False + + class _GenAiJsonEncoder(json.JSONEncoder): def default(self, o: Any) -> Any: if isinstance(o, bytes): diff --git a/util/opentelemetry-util-genai/tests/test_utils.py b/util/opentelemetry-util-genai/tests/test_utils.py index aecb16c541..969f17cbb4 100644 --- a/util/opentelemetry-util-genai/tests/test_utils.py +++ b/util/opentelemetry-util-genai/tests/test_utils.py @@ -23,6 +23,11 @@ OTEL_SEMCONV_STABILITY_OPT_IN, _OpenTelemetrySemanticConventionStability, ) +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + InMemoryLogRecordExporter, + SimpleLogRecordProcessor, +) from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( @@ -38,25 +43,32 @@ from opentelemetry.trace.status import StatusCode from opentelemetry.util.genai.environment_variables import ( OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, + OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT, ) from opentelemetry.util.genai.handler import get_telemetry_handler from opentelemetry.util.genai.types import ( ContentCapturingMode, + Error, InputMessage, LLMInvocation, + MessagePart, OutputMessage, Text, ) -from opentelemetry.util.genai.utils import get_content_capturing_mode +from opentelemetry.util.genai.utils import ( + get_content_capturing_mode, + should_emit_event, +) -def patch_env_vars(stability_mode, content_capturing): +def patch_env_vars(stability_mode, content_capturing, emit_event): def decorator(test_case): @patch.dict( os.environ, { OTEL_SEMCONV_STABILITY_OPT_IN: stability_mode, OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: content_capturing, + OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT: emit_event, }, ) def wrapper(*args, **kwargs): @@ -84,6 +96,12 @@ def _create_output_message( ) +def _create_system_instruction( + content: str = "You are a helpful assistant.", +) -> list[MessagePart]: + return [Text(content=content)] + + def _get_single_span(span_exporter: InMemorySpanExporter) -> ReadableSpan: spans = span_exporter.get_finished_spans() assert len(spans) == 1 @@ -140,21 +158,38 @@ def _assert_text_message( assert message.get("finish_reason") == finish_reason +def _normalize_to_list(value: Any) -> list[Any]: + """Normalize tuple or list to list for OpenTelemetry compatibility.""" + return list(value) if isinstance(value, tuple) else value + + +def _normalize_to_dict(value: Any) -> dict[str, Any]: + """Normalize tuple or dict to dict for OpenTelemetry compatibility.""" + return dict(value) if isinstance(value, tuple) else value + + class TestVersion(unittest.TestCase): @patch_env_vars( stability_mode="gen_ai_latest_experimental", content_capturing="SPAN_ONLY", + emit_event="", ) def test_get_content_capturing_mode_parses_valid_envvar(self): # pylint: disable=no-self-use assert get_content_capturing_mode() == ContentCapturingMode.SPAN_ONLY @patch_env_vars( - stability_mode="gen_ai_latest_experimental", content_capturing="" + stability_mode="gen_ai_latest_experimental", + content_capturing="", + emit_event="", ) def test_empty_content_capturing_envvar(self): # pylint: disable=no-self-use assert get_content_capturing_mode() == ContentCapturingMode.NO_CONTENT - @patch_env_vars(stability_mode="default", content_capturing="True") + @patch_env_vars( + stability_mode="default", + content_capturing="True", + emit_event="", + ) def 
test_get_content_capturing_mode_raises_exception_when_semconv_stability_default( self, ): # pylint: disable=no-self-use @@ -164,6 +199,7 @@ def test_get_content_capturing_mode_raises_exception_when_semconv_stability_defa @patch_env_vars( stability_mode="gen_ai_latest_experimental", content_capturing="INVALID_VALUE", + emit_event="", ) def test_get_content_capturing_mode_raises_exception_on_invalid_envvar( self, @@ -176,6 +212,75 @@ def test_get_content_capturing_mode_raises_exception_on_invalid_envvar( self.assertIn("INVALID_VALUE is not a valid option for ", cm.output[0]) +class TestShouldEmitEvent(unittest.TestCase): + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="true", + ) + def test_should_emit_event_returns_true_when_set_to_true( + self, + ): # pylint: disable=no-self-use + assert should_emit_event() is True + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="True", + ) + def test_should_emit_event_case_insensitive_true( + self, + ): # pylint: disable=no-self-use + assert should_emit_event() is True + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="false", + ) + def test_should_emit_event_returns_false_when_set_to_false( + self, + ): # pylint: disable=no-self-use + assert should_emit_event() is False + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="False", + ) + def test_should_emit_event_case_insensitive_false( + self, + ): # pylint: disable=no-self-use + assert should_emit_event() is False + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="", + ) + def test_should_emit_event_by_defaults( + self, + ): # pylint: disable=no-self-use + assert should_emit_event() is False + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="INVALID_VALUE", + ) + def test_should_emit_event_with_invalid_value( + self, + ): # pylint: disable=no-self-use + with self.assertLogs(level="WARNING") as cm: + result = should_emit_event() + assert result is False, f"Expected False but got {result}" + self.assertEqual(len(cm.output), 1) + self.assertIn("INVALID_VALUE is not a valid option for", cm.output[0]) + self.assertIn( + "Must be one of true or false (case-insensitive)", cm.output[0] + ) + + class TestTelemetryHandler(unittest.TestCase): def setUp(self): self.span_exporter = InMemorySpanExporter() @@ -183,28 +288,37 @@ def setUp(self): tracer_provider.add_span_processor( SimpleSpanProcessor(self.span_exporter) ) + self.log_exporter = InMemoryLogRecordExporter() + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(self.log_exporter) + ) self.telemetry_handler = get_telemetry_handler( - tracer_provider=tracer_provider + tracer_provider=tracer_provider, logger_provider=logger_provider ) def tearDown(self): # Clear spans and reset the singleton telemetry handler so each test starts clean self.span_exporter.clear() + self.log_exporter.clear() if hasattr(get_telemetry_handler, "_default_handler"): delattr(get_telemetry_handler, "_default_handler") @patch_env_vars( stability_mode="gen_ai_latest_experimental", content_capturing="SPAN_ONLY", + emit_event="", ) def test_llm_start_and_stop_creates_span(self): # pylint: disable=no-self-use message = _create_input_message("hello 
world") chat_generation = _create_output_message("hello back") + system_instruction = _create_system_instruction() with self.telemetry_handler.llm() as invocation: for attr, value in { "request_model": "test-model", "input_messages": [message], + "system_instruction": system_instruction, "provider": "test-provider", "attributes": {"custom_attr": "value"}, "temperature": 0.5, @@ -256,9 +370,19 @@ def test_llm_start_and_stop_creates_span(self): # pylint: disable=no-self-use self.assertEqual(invocation.attributes.get("custom_attr"), "value") self.assertEqual(invocation.attributes.get("extra"), "info") + # Verify system instruction is present in span as JSON string + self.assertIn(GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, span_attrs) + span_system = json.loads(span_attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS]) + self.assertIsInstance(span_system, list) + self.assertEqual( + span_system[0]["content"], "You are a helpful assistant." + ) + self.assertEqual(span_system[0]["type"], "text") + @patch_env_vars( stability_mode="gen_ai_latest_experimental", content_capturing="SPAN_ONLY", + emit_event="", ) def test_llm_manual_start_and_stop_creates_span(self): message = _create_input_message("hi") @@ -384,6 +508,7 @@ def test_llm_span_uses_expected_schema_url(self): @patch_env_vars( stability_mode="gen_ai_latest_experimental", content_capturing="SPAN_ONLY", + emit_event="", ) def test_parent_child_span_relationship(self): message = _create_input_message("hi") @@ -467,3 +592,230 @@ class BoomError(RuntimeError): GenAI.GEN_AI_USAGE_OUTPUT_TOKENS: 22, }, ) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="true", + ) + def test_emits_llm_event(self): + invocation = LLMInvocation( + request_model="event-model", + input_messages=[_create_input_message("test query")], + system_instruction=_create_system_instruction(), + provider="test-provider", + temperature=0.7, + max_tokens=100, + response_model_name="response-model", + response_id="event-response-id", + input_tokens=10, + output_tokens=20, + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [_create_output_message("test response")] + self.telemetry_handler.stop_llm(invocation) + + # Check that event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + + # Verify event name + self.assertEqual( + log_record.event_name, "gen_ai.client.inference.operation.details" + ) + + # Verify event attributes + attrs = log_record.attributes + self.assertIsNotNone(attrs) + self.assertEqual(attrs[GenAI.GEN_AI_OPERATION_NAME], "chat") + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MODEL], "event-model") + self.assertEqual(attrs[GenAI.GEN_AI_PROVIDER_NAME], "test-provider") + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_TEMPERATURE], 0.7) + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MAX_TOKENS], 100) + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_MODEL], "response-model") + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_ID], "event-response-id") + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS], 10) + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS], 20) + + # Verify messages are in structured format (not JSON string) + # OpenTelemetry may convert lists to tuples, so we normalize + input_msg = _normalize_to_dict( + _normalize_to_list(attrs[GenAI.GEN_AI_INPUT_MESSAGES])[0] + ) + self.assertEqual(input_msg["role"], "Human") + self.assertEqual( + _normalize_to_list(input_msg["parts"])[0]["content"], "test 
query" + ) + + output_msg = _normalize_to_dict( + _normalize_to_list(attrs[GenAI.GEN_AI_OUTPUT_MESSAGES])[0] + ) + self.assertEqual(output_msg["role"], "AI") + self.assertEqual( + _normalize_to_list(output_msg["parts"])[0]["content"], + "test response", + ) + self.assertEqual(output_msg["finish_reason"], "stop") + + # Verify system instruction is present in event in structured format + sys_instr = _normalize_to_dict( + _normalize_to_list(attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS])[0] + ) + self.assertEqual(sys_instr["content"], "You are a helpful assistant.") + self.assertEqual(sys_instr["type"], "text") + + # Verify event context matches span context + span = _get_single_span(self.span_exporter) + self.assertIsNotNone(log_record.trace_id) + self.assertIsNotNone(log_record.span_id) + self.assertIsNotNone(span.context) + self.assertEqual(log_record.trace_id, span.context.trace_id) + self.assertEqual(log_record.span_id, span.context.span_id) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_AND_EVENT", + emit_event="true", + ) + def test_emits_llm_event_and_span(self): + message = _create_input_message("combined test") + chat_generation = _create_output_message("combined response") + system_instruction = _create_system_instruction("System prompt here") + + invocation = LLMInvocation( + request_model="combined-model", + input_messages=[message], + system_instruction=system_instruction, + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [chat_generation] + self.telemetry_handler.stop_llm(invocation) + + # Check span was created + span = _get_single_span(self.span_exporter) + span_attrs = _get_span_attributes(span) + self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, span_attrs) + + # Check event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + self.assertEqual( + log_record.event_name, "gen_ai.client.inference.operation.details" + ) + self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, log_record.attributes) + # Verify system instruction in both span and event + self.assertIn(GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, span_attrs) + span_system = json.loads(span_attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS]) + self.assertEqual(span_system[0]["content"], "System prompt here") + event_attrs = log_record.attributes + self.assertIn(GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, event_attrs) + event_system = event_attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] + event_system_list = ( + list(event_system) + if isinstance(event_system, tuple) + else event_system + ) + event_sys_instr = ( + dict(event_system_list[0]) + if isinstance(event_system_list[0], tuple) + else event_system_list[0] + ) + self.assertEqual(event_sys_instr["content"], "System prompt here") + # Verify event context matches span context + span = _get_single_span(self.span_exporter) + self.assertIsNotNone(log_record.trace_id) + self.assertIsNotNone(log_record.span_id) + self.assertIsNotNone(span.context) + self.assertEqual(log_record.trace_id, span.context.trace_id) + self.assertEqual(log_record.span_id, span.context.span_id) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="true", + ) + def test_emits_llm_event_with_error(self): + class TestError(RuntimeError): + pass + + message = _create_input_message("error test") + invocation = LLMInvocation( + request_model="error-model", + input_messages=[message], + provider="test-provider", + ) + + 
self.telemetry_handler.start_llm(invocation) + error = Error(message="Test error occurred", type=TestError) + self.telemetry_handler.fail_llm(invocation, error) + + # Check event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + attrs = log_record.attributes + + # Verify error attribute is present + self.assertEqual( + attrs[ErrorAttributes.ERROR_TYPE], TestError.__qualname__ + ) + self.assertEqual(attrs[GenAI.GEN_AI_OPERATION_NAME], "chat") + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MODEL], "error-model") + # Verify event context matches span context + span = _get_single_span(self.span_exporter) + self.assertIsNotNone(log_record.trace_id) + self.assertIsNotNone(log_record.span_id) + self.assertIsNotNone(span.context) + self.assertEqual(log_record.trace_id, span.context.trace_id) + self.assertEqual(log_record.span_id, span.context.span_id) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="false", + ) + def test_does_not_emit_llm_event_when_emit_event_false(self): + message = _create_input_message("emit false test") + chat_generation = _create_output_message("emit false response") + + invocation = LLMInvocation( + request_model="emit-false-model", + input_messages=[message], + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [chat_generation] + self.telemetry_handler.stop_llm(invocation) + + # Check no event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 0) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="", + ) + def test_does_not_emit_llm_event_by_default(self): + """Test that event is not emitted by default when OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" + invocation = LLMInvocation( + request_model="default-model", + input_messages=[_create_input_message("default test")], + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [ + _create_output_message("default response") + ] + self.telemetry_handler.stop_llm(invocation) + + # Check that no event was emitted (default behavior is false) + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 0)
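
---

Usage sketch (reviewer note, not part of the diff): the pieces above fit together as follows — the three environment variables gate experimental mode, content capture, and event emission, and `get_telemetry_handler` now accepts a `logger_provider` so `stop_llm` can emit the `gen_ai.client.inference.operation.details` event. All names below come from this patch or the SDK imports used in its tests; `"example-model"` and `"example-provider"` are placeholder values, and the `InputMessage`/`OutputMessage` keyword arguments follow what the tests imply.

```python
# Sketch only: set the env vars before the genai util modules read them, and
# attach span/log processors with exporters to the providers as needed.
import os

os.environ["OTEL_SEMCONV_STABILITY_OPT_IN"] = "gen_ai_latest_experimental"
os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "SPAN_AND_EVENT"
os.environ["OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT"] = "true"

from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.util.genai.handler import get_telemetry_handler
from opentelemetry.util.genai.types import (
    InputMessage,
    LLMInvocation,
    OutputMessage,
    Text,
)

handler = get_telemetry_handler(
    tracer_provider=TracerProvider(),
    logger_provider=LoggerProvider(),
)

invocation = LLMInvocation(
    request_model="example-model",  # operation_name defaults to "chat"
    input_messages=[InputMessage(role="user", parts=[Text(content="hello")])],
    system_instruction=[Text(content="You are a helpful assistant.")],
    provider="example-provider",
)

handler.start_llm(invocation)  # opens the "chat example-model" client span
invocation.output_messages = [
    OutputMessage(
        role="assistant",
        parts=[Text(content="hello back")],
        finish_reason="stop",
    )
]
# Ends the span with gen_ai.* attributes (messages as JSON strings) and, since
# OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is true, emits the
# gen_ai.client.inference.operation.details event with structured (dict)
# message content, sharing the span's trace/span IDs.
handler.stop_llm(invocation)
```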