diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py
index bb8eb122..3204912b 100644
--- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py
+++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py
@@ -15,6 +15,10 @@
 from langchain_core.tracers import BaseTracer, LangChainTracer
 from langchain_core.tracers.schemas import Run
 
+from microsoft_agents_a365.observability.core.constants import (
+    GEN_AI_AGENT_NAME_KEY,
+    INVOKE_AGENT_OPERATION_NAME,
+)
 from microsoft_agents_a365.observability.core.inference_operation_type import InferenceOperationType
 from microsoft_agents_a365.observability.core.utils import (
     DictWithLock,
@@ -37,11 +41,14 @@
     function_calls,
     input_messages,
     invocation_parameters,
+    invoke_agent_input_message,
+    invoke_agent_output_message,
     llm_provider,
     metadata,
     model_name,
     output_messages,
     prompts,
+    set_execution_type,
     token_counts,
     tools,
 )
@@ -111,8 +118,17 @@ def _start_trace(self, run: Run) -> None:
         # We can't use real time because the handler may be
         # called in a background thread.
         start_time_utc_nano = as_utc_nano(run.start_time)
+
+        # Determine span name based on run type
+        if run.run_type == "chain" and run.name == "LangGraph":
+            span_name = f"invoke_agent {run.name}"
+        elif run.run_type.lower() == "tool":
+            span_name = f"execute_tool {run.name}"
+        else:
+            span_name = run.name
+
         span = self._tracer.start_span(
-            name=run.name,
+            name=span_name,
             context=parent_context,
             start_time=start_time_utc_nano,
         )
@@ -197,27 +213,52 @@ def _update_span(span: Span, run: Run) -> None:
         else:
             span.set_status(trace_api.Status(trace_api.StatusCode.ERROR, run.error))
 
+        span.set_attributes(dict(get_attributes_from_context()))
+
         if run.run_type == "llm" and run.outputs.get("llm_output").get("id").startswith("chat"):
             span.update_name(f"{InferenceOperationType.CHAT.value.lower()} {span.name}")
-        elif run.run_type.lower() == "tool":
-            span.update_name(f"execute_tool {span.name}")
-        span.set_attributes(dict(get_attributes_from_context()))
-        span.set_attributes(
-            dict(
-                flatten(
-                    chain(
-                        add_operation_type(run),
-                        prompts(run.inputs),
-                        input_messages(run.inputs),
-                        output_messages(run.outputs),
-                        invocation_parameters(run),
-                        llm_provider(run.extra),
-                        model_name(run.outputs, run.extra),
-                        token_counts(run.outputs),
-                        function_calls(run.outputs),
-                        tools(run),
-                        metadata(run),
+        is_invoke_agent = span.name.startswith(INVOKE_AGENT_OPERATION_NAME)
+
+        # If this is an invoke_agent span, update span name with agent name
+        if is_invoke_agent:
+            agent_name = None
+            if hasattr(span, "_attributes") and span._attributes:
+                agent_name = span._attributes.get(GEN_AI_AGENT_NAME_KEY)
+            if agent_name:
+                span.update_name(f"{INVOKE_AGENT_OPERATION_NAME} {agent_name}")
+
+        # For invoke_agent spans, add input/output messages
+        if is_invoke_agent:
+            span.set_attributes(
+                dict(
+                    flatten(
+                        chain(
+                            set_execution_type(),
+                            add_operation_type(run),
+                            invoke_agent_input_message(run.inputs),
+                            invoke_agent_output_message(run.outputs),
+                            metadata(run),
+                        )
+                    )
+                )
+            )
+        else:
+            span.set_attributes(
+                dict(
+                    flatten(
+                        chain(
+                            add_operation_type(run),
+                            prompts(run.inputs),
+                            input_messages(run.inputs),
+                            output_messages(run.outputs),
+                            invocation_parameters(run),
+                            llm_provider(run.extra),
+                            model_name(run.outputs, run.extra),
+                            token_counts(run.outputs),
+                            function_calls(run.outputs),
+                            tools(run),
+                            metadata(run),
+                        )
                     )
                 )
             )
-        )
diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py
index efb245ba..0b723bba 100644
--- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py
+++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py
@@ -21,7 +21,7 @@
 from microsoft_agents_a365.observability.extensions.langchain.tracer import CustomLangChainTracer
 
-_INSTRUMENTS: str = "langchain_core >= 0.1.0"
+_INSTRUMENTS: str = "langchain_core >= 1.2.0"
 
 
 class CustomLangChainInstrumentor(BaseInstrumentor):
diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py
index b7dfe638..b081e44e 100644
--- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py
+++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py
@@ -9,6 +9,8 @@
 from langchain_core.messages import BaseMessage
 from langchain_core.tracers.schemas import Run
 from microsoft_agents_a365.observability.core.constants import (
+    EXECUTE_TOOL_OPERATION_NAME,
+    GEN_AI_EXECUTION_TYPE_KEY,
     GEN_AI_INPUT_MESSAGES_KEY,
     GEN_AI_OPERATION_NAME_KEY,
     GEN_AI_OUTPUT_MESSAGES_KEY,
@@ -25,8 +27,10 @@
     GEN_AI_TOOL_TYPE_KEY,
     GEN_AI_USAGE_INPUT_TOKENS_KEY,
     GEN_AI_USAGE_OUTPUT_TOKENS_KEY,
+    INVOKE_AGENT_OPERATION_NAME,
     SESSION_ID_KEY,
 )
+from microsoft_agents_a365.observability.core.execution_type import ExecutionType
 from microsoft_agents_a365.observability.core.inference_operation_type import InferenceOperationType
 from microsoft_agents_a365.observability.core.utils import (
     get_first_value,
@@ -199,8 +203,8 @@ def _parse_message_data(message_data: Mapping[str, Any] | None) -> Iterator[tupl
 @stop_on_exception
 def input_messages(
     inputs: Mapping[str, Any] | None,
-) -> Iterator[tuple[str, list[dict[str, Any]]]]:
-    """Yields chat messages if present."""
+) -> Iterator[tuple[str, str]]:
+    """Yields chat messages as a JSON array of content strings."""
     if not inputs:
         return
     assert hasattr(inputs, "get"), f"expected Mapping, found {type(inputs)}"
@@ -213,27 +217,29 @@
     # This will only get the first set of messages.
     if not (first_messages := next(iter(multiple_messages), None)):
         return
-    parsed_messages = []
+    contents: list[str] = []
     if isinstance(first_messages, list):
         for message_data in first_messages:
             if isinstance(message_data, BaseMessage):
-                parsed_messages.append(dict(_parse_message_data(message_data.to_json())))
+                if hasattr(message_data, "content") and message_data.content:
+                    contents.append(str(message_data.content))
             elif hasattr(message_data, "get"):
-                parsed_messages.append(dict(_parse_message_data(message_data)))
-            else:
-                raise ValueError(f"failed to parse message of type {type(message_data)}")
+                if content := message_data.get("content"):
+                    contents.append(str(content))
+                elif kwargs := message_data.get("kwargs"):
+                    if hasattr(kwargs, "get") and (content := kwargs.get("content")):
+                        contents.append(str(content))
     elif isinstance(first_messages, BaseMessage):
-        parsed_messages.append(dict(_parse_message_data(first_messages.to_json())))
+        if hasattr(first_messages, "content") and first_messages.content:
+            contents.append(str(first_messages.content))
     elif hasattr(first_messages, "get"):
-        parsed_messages.append(dict(_parse_message_data(first_messages)))
+        if content := first_messages.get("content"):
+            contents.append(str(content))
     elif isinstance(first_messages, Sequence) and len(first_messages) == 2:
-        # See e.g. https://github.com/langchain-ai/langchain/blob/18cf457eec106d99e0098b42712299f5d0daa798/libs/core/langchain_core/messages/utils.py#L317 # noqa: E501
         role, content = first_messages
-        parsed_messages.append({"MESSAGE_ROLE": role, "MESSAGE_CONTENT": content})
-    else:
-        raise ValueError(f"failed to parse messages of type {type(first_messages)}")
-    if parsed_messages:
-        yield GEN_AI_INPUT_MESSAGES_KEY, parsed_messages
+        contents.append(str(content))
+    if contents:
+        yield GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(contents)
 
 
 @stop_on_exception
@@ -255,8 +261,8 @@ def metadata(run: Run) -> Iterator[tuple[str, str]]:
 @stop_on_exception
 def output_messages(
     outputs: Mapping[str, Any] | None,
-) -> Iterator[tuple[str, list[dict[str, Any]]]]:
-    """Yields chat messages if present."""
+) -> Iterator[tuple[str, str]]:
+    """Yields chat messages as a JSON array of content strings."""
     if not outputs:
         return
     assert hasattr(outputs, "get"), f"expected Mapping, found {type(outputs)}"
@@ -279,18 +285,21 @@
     assert isinstance(first_generations, Iterable), (
         f"expected Iterable, found {type(first_generations)}"
     )
-    parsed_messages = []
+    contents: list[str] = []
     for generation in first_generations:
         assert hasattr(generation, "get"), f"expected Mapping, found {type(generation)}"
         if message_data := generation.get("message"):
             if isinstance(message_data, BaseMessage):
-                parsed_messages.append(dict(_parse_message_data(message_data.to_json())))
+                if hasattr(message_data, "content") and message_data.content:
+                    contents.append(str(message_data.content))
             elif hasattr(message_data, "get"):
-                parsed_messages.append(dict(_parse_message_data(message_data)))
-            else:
-                raise ValueError(f"fail to parse message of type {type(message_data)}")
-    if parsed_messages:
-        yield GEN_AI_OUTPUT_MESSAGES_KEY, parsed_messages
+                if content := message_data.get("content"):
+                    contents.append(str(content))
+                elif kwargs := message_data.get("kwargs"):
+                    if hasattr(kwargs, "get") and (content := kwargs.get("content")):
+                        contents.append(str(content))
+    if contents:
+        yield GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(contents)
 
 
 @stop_on_exception
@@ -305,7 +314,6 @@ def invocation_parameters(run: Run) -> Iterator[tuple[str, str]]:
     assert isinstance(invocation_parameters, Mapping), (
         f"expected Mapping, found {type(invocation_parameters)}"
     )
-    yield GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(invocation_parameters)
     tools = invocation_parameters.get("tools", [])
     for idx, tool in enumerate(tools):
         yield f"{GEN_AI_TOOL_ARGS_KEY}.{idx}", safe_json_dumps(tool)
@@ -458,7 +466,7 @@ def function_calls(outputs: Mapping[str, Any] | None) -> Iterator[tuple[str, str
         return
 
     # Tool type (explicit)
-    yield GEN_AI_OPERATION_NAME_KEY, "execute_tool"
+    yield GEN_AI_OPERATION_NAME_KEY, EXECUTE_TOOL_OPERATION_NAME
     yield GEN_AI_TOOL_TYPE_KEY, "function"
 
     name = fc.get("name")
@@ -505,6 +513,34 @@ def tools(run: Run) -> Iterator[tuple[str, str]]:
     if description := serialized.get("description"):
         yield GEN_AI_TOOL_DESCRIPTION_KEY, description
 
+    # Extract tool call ID from run.extra (LangGraph stores it there)
+    if run.extra and hasattr(run.extra, "get"):
+        if tool_call_id := run.extra.get("tool_call_id"):
+            yield GEN_AI_TOOL_CALL_ID_KEY, tool_call_id
+
+    # Extract tool arguments from inputs
+    if run.inputs and hasattr(run.inputs, "get"):
+        # LangGraph wraps args in 'input' key as a string
+        if input_val := run.inputs.get("input"):
+            if isinstance(input_val, str):
+                yield GEN_AI_TOOL_ARGS_KEY, input_val
+            else:
+                yield GEN_AI_TOOL_ARGS_KEY, safe_json_dumps(input_val)
+
+    # Extract tool result from outputs
+    if run.outputs and hasattr(run.outputs, "get"):
+        if result := run.outputs.get("output"):
+            # Handle ToolMessage or BaseMessage objects
+            if isinstance(result, BaseMessage):
+                result_content = result.content if hasattr(result, "content") else str(result)
+            elif hasattr(result, "content"):
+                result_content = result.content
+            elif isinstance(result, str):
+                result_content = result
+            else:
+                result_content = safe_json_dumps(result)
+            yield GEN_AI_TOOL_CALL_RESULT_KEY, result_content
+
 
 def add_operation_type(run: Run) -> Iterator[tuple[str, str]]:
     """Yields operation type based on run type."""
@@ -512,6 +548,142 @@ def add_operation_type(run: Run) -> Iterator[tuple[str, str]]:
     if run_type == "llm":
         yield GEN_AI_OPERATION_NAME_KEY, InferenceOperationType.CHAT.value.lower()
     elif run_type == "chat_model":
-        yield GEN_AI_OPERATION_NAME_KEY, "chat"
+        yield GEN_AI_OPERATION_NAME_KEY, InferenceOperationType.CHAT.value.lower()
     elif run_type == "tool":
-        yield GEN_AI_OPERATION_NAME_KEY, "execute_tool"
+        yield GEN_AI_OPERATION_NAME_KEY, EXECUTE_TOOL_OPERATION_NAME
+    elif run_type == "chain" and run.name.startswith(INVOKE_AGENT_OPERATION_NAME):
+        yield GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME
+
+
+def _extract_content_from_message(message: Any) -> str | None:
+    """Extract content from a LangChain message object or dict."""
+    if message is None:
+        return None
+
+    # Handle BaseMessage objects
+    if isinstance(message, BaseMessage):
+        return message.content if hasattr(message, "content") else None
+
+    # Handle dict-like messages
+    if hasattr(message, "get"):
+        # Direct content field
+        if content := message.get("content"):
+            return content
+        # Nested in kwargs
+        if kwargs := message.get("kwargs"):
+            if hasattr(kwargs, "get") and (content := kwargs.get("content")):
+                return content
+
+    return None
+
+
+def _get_message_role(message: Any) -> str | None:
+    """Extract role from a LangChain message object or dict."""
+    if message is None:
+        return None
+
+    # Handle BaseMessage objects
+    if isinstance(message, BaseMessage):
+        return message.type if hasattr(message, "type") else None
+
+    # Handle dict-like messages
+    if hasattr(message, "get"):
+        # Check various role indicators
+        if role := message.get("role"):
+            return role
+        if msg_type := message.get("type"):
+            return msg_type
+        # Check id field for type info (e.g., "HumanMessage", "AIMessage")
+        if id_field := message.get("id"):
+            if isinstance(id_field, list) and len(id_field) > 0:
+                type_name = id_field[-1]
+                if "Human" in type_name:
+                    return "human"
+                elif "AI" in type_name or "Assistant" in type_name:
+                    return "ai"
+                elif "System" in type_name:
+                    return "system"
+
+    return None
+
+
+@stop_on_exception
+def invoke_agent_input_message(
+    inputs: Mapping[str, Any] | None,
+) -> Iterator[tuple[str, str]]:
+    """
+    Extract the user input message for invoke_agent spans (LangGraph root).
+    We want to find the first user/human message content.
+    """
+    if not inputs:
+        return
+
+    assert hasattr(inputs, "get"), f"expected Mapping, found {type(inputs)}"
+
+    messages = inputs.get("messages")
+    if not messages:
+        return
+
+    # Handle nested list structure: [[msg1, msg2, ...]]
+    if isinstance(messages, list) and len(messages) > 0:
+        first_item = messages[0]
+        # If first item is also a list, unwrap it
+        if isinstance(first_item, list):
+            messages = first_item
+
+    # Find the first user/human message
+    if isinstance(messages, list):
+        for message in messages:
+            role = _get_message_role(message)
+            if role and role.lower() in ("human", "user"):
+                content = _extract_content_from_message(message)
+                if content:
+                    yield GEN_AI_INPUT_MESSAGES_KEY, content
+                    return
+
+        # If no human message found, just get first message content
+        if len(messages) > 0:
+            content = _extract_content_from_message(messages[0])
+            if content:
+                yield GEN_AI_INPUT_MESSAGES_KEY, content
+
+
+@stop_on_exception
+def invoke_agent_output_message(
+    outputs: Mapping[str, Any] | None,
+) -> Iterator[tuple[str, str]]:
+    """
+    Extract the final output message for invoke_agent spans (LangGraph root).
+    We want the last AI/assistant message content.
+    """
+    if not outputs:
+        return
+
+    assert hasattr(outputs, "get"), f"expected Mapping, found {type(outputs)}"
+
+    messages = outputs.get("messages")
+    if not messages:
+        return
+
+    # Handle nested list structure if present
+    if isinstance(messages, list) and len(messages) > 0:
+        first_item = messages[0]
+        if isinstance(first_item, list):
+            messages = first_item
+
+    # Find the last AI/assistant message with content (not tool calls)
+    if isinstance(messages, list):
+        # Iterate in reverse to find the last AI message
+        for message in reversed(messages):
+            role = _get_message_role(message)
+            if role and role.lower() in ("ai", "assistant"):
+                content = _extract_content_from_message(message)
+                # Make sure it has actual content, not just tool calls
+                if content and isinstance(content, str) and content.strip():
+                    yield GEN_AI_OUTPUT_MESSAGES_KEY, content
+                    return
+
+
+def set_execution_type() -> Iterator[tuple[str, str]]:
+    """Yields the execution type as human_to_agent."""
+    yield GEN_AI_EXECUTION_TYPE_KEY, ExecutionType.HUMAN_TO_AGENT.value