From 2ba646fa155dc2201cebcddd673cd6618878a5c5 Mon Sep 17 00:00:00 2001 From: Sri Laasya Nutheti Date: Tue, 15 Jul 2025 16:46:59 -0700 Subject: [PATCH 1/7] xpander instrumentation --- agentops/instrumentation/__init__.py | 6 + .../agentic/xpander/__init__.py | 15 + .../agentic/xpander/instrumentor.py | 978 ++++++++++++++++++ .../agentic/xpander/trace_probe.py | 76 ++ .../agentic/xpander/version.py | 3 + 5 files changed, 1078 insertions(+) create mode 100644 agentops/instrumentation/agentic/xpander/__init__.py create mode 100644 agentops/instrumentation/agentic/xpander/instrumentor.py create mode 100644 agentops/instrumentation/agentic/xpander/trace_probe.py create mode 100644 agentops/instrumentation/agentic/xpander/version.py diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 1eb30d00b..b3cf1ad6d 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -112,6 +112,12 @@ class InstrumentorConfig(TypedDict): "class_name": "LanggraphInstrumentor", "min_version": "0.2.0", }, + "xpander_sdk": { + "module_name": "agentops.instrumentation.agentic.xpander", + "class_name": "XpanderInstrumentor", + "min_version": "1.0.0", + "package_name": "xpander-sdk", + }, } # Combine all target packages for monitoring diff --git a/agentops/instrumentation/agentic/xpander/__init__.py b/agentops/instrumentation/agentic/xpander/__init__.py new file mode 100644 index 000000000..8f1ab8cc0 --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/__init__.py @@ -0,0 +1,15 @@ +"""Xpander SDK instrumentation for AgentOps.""" + +from agentops.instrumentation.agentic.xpander.instrumentor import ( + XpanderInstrumentor, + wrap_openai_call_for_xpander, + is_xpander_session_active, + get_active_xpander_session +) + +__all__ = [ + "XpanderInstrumentor", + "wrap_openai_call_for_xpander", + "is_xpander_session_active", + "get_active_xpander_session" +] \ No newline at end of file diff --git a/agentops/instrumentation/agentic/xpander/instrumentor.py b/agentops/instrumentation/agentic/xpander/instrumentor.py new file mode 100644 index 000000000..bbe58a350 --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/instrumentor.py @@ -0,0 +1,978 @@ +"""Xpander SDK instrumentation for AgentOps. + +This module provides instrumentation for the Xpander SDK, which uses JSII to convert +TypeScript code to Python at runtime. The instrumentation tracks agent sessions, +tool executions, and LLM interactions. + +MODIFIED VERSION: Using existing AgentOps utilities where possible while keeping +runtime-specific instrumentation logic that cannot be replaced. 
+ +REPLACEMENTS MADE: +✅ Span creation: Using tracer.make_span() instead of manual span creation +✅ Error handling: Using _finish_span_success/_finish_span_error utilities +✅ Attribute management: Using existing SpanAttributeManager +✅ Serialization: Using safe_serialize and model_to_dict utilities +✅ Attribute setting: Using _update_span utility + +RUNTIME-SPECIFIC LOGIC KEPT (Cannot be replaced): +❌ Method wrapping: Runtime method creation requires custom hooks +❌ Context persistence: XpanderContext must handle runtime object lifecycle +❌ Agent detection: Custom logic for dynamically created agents +""" + +import logging +import time +import threading +import json +from typing import Any, Dict, Optional +from opentelemetry.metrics import Meter +from opentelemetry.trace import SpanKind as OTelSpanKind +from opentelemetry import trace + +# Use existing AgentOps utilities +from agentops.instrumentation.common import ( + CommonInstrumentor, + InstrumentorConfig, + StandardMetrics, +) +from agentops.instrumentation.common.span_management import ( + SpanAttributeManager, + create_span +) +from agentops.instrumentation.common.wrappers import ( + _finish_span_success, + _finish_span_error, + _update_span +) +from agentops.instrumentation.common.attributes import ( + _extract_attributes_from_mapping, + get_common_attributes +) +from agentops.helpers.serialization import safe_serialize, model_to_dict +from agentops.sdk.core import tracer +from agentops.instrumentation.agentic.xpander.version import __version__ +from agentops.semconv import SpanAttributes, SpanKind, ToolAttributes +from agentops.semconv.message import MessageAttributes + +# Use existing OpenAI attribute extraction patterns (lazy import to avoid circular imports) +# from agentops.instrumentation.providers.openai.attributes.common import ( +# get_response_attributes, +# ) + +logger = logging.getLogger(__name__) + +_instruments = ("xpander-sdk >= 1.0.0",) + +# Use existing AgentOps utility instead of custom implementation +def safe_set_attribute(span, key: str, value: Any) -> None: + """Set attribute on span using existing AgentOps utility.""" + try: + _update_span(span, {key: value}) + except Exception as e: + logger.warning(f"Failed to set attribute {key}: {e}") + + +class XpanderContext: + """Context manager for Xpander sessions.""" + + def __init__(self): + self._sessions = {} # session_id -> session_data + self._workflow_spans = {} # session_id -> active workflow span + self._agent_spans = {} # session_id -> active agent span + self._lock = threading.Lock() + + def start_session(self, session_id: str, agent_info: Dict[str, Any], workflow_span=None, agent_span=None) -> None: + """Start a new session with agent info.""" + with self._lock: + self._sessions[session_id] = { + "agent_name": agent_info.get("agent_name", "unknown"), + "agent_id": agent_info.get("agent_id", "unknown"), + "task_input": agent_info.get("task_input"), + "phase": "planning", + "step_count": 0, + "total_tokens": 0, + "tools_executed": [], + "start_time": time.time(), + } + if workflow_span: + self._workflow_spans[session_id] = workflow_span + if agent_span: + self._agent_spans[session_id] = agent_span + + def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: + """Get session data.""" + with self._lock: + return self._sessions.get(session_id) + + def update_session(self, session_id: str, updates: Dict[str, Any]) -> None: + """Update session data.""" + with self._lock: + if session_id in self._sessions: + self._sessions[session_id].update(updates) + 
+ def end_session(self, session_id: str) -> None: + """End a session.""" + with self._lock: + if session_id in self._sessions: + del self._sessions[session_id] + if session_id in self._workflow_spans: + del self._workflow_spans[session_id] + if session_id in self._agent_spans: + del self._agent_spans[session_id] + + def get_workflow_phase(self, session_id: str) -> str: + """Detect current workflow phase based on state.""" + with self._lock: + session = self._sessions.get(session_id, {}) + + if session.get('tools_executed', []): + return "executing" + elif session.get('step_count', 0) > 0: + return "executing" + else: + return "planning" + + def get_workflow_span(self, session_id: str): + """Get the active workflow span for a session.""" + with self._lock: + return self._workflow_spans.get(session_id) + + def get_agent_span(self, session_id: str): + """Get the active agent span for a session.""" + with self._lock: + return self._agent_spans.get(session_id) + + +def extract_current_message_content(messages) -> str: + """Extract current response content using existing AgentOps serialization utilities.""" + if not messages: + return "" + + try: + # Use existing AgentOps serialization for consistent handling + if hasattr(messages, '__dict__'): + # Convert Pydantic/object to dict using existing utility + messages_dict = model_to_dict(messages) + else: + messages_dict = messages + + # Use safe_serialize for consistent string conversion + content = safe_serialize(messages_dict) + + # Apply consistent truncation following AgentOps patterns + max_length = 1500 + if len(content) > max_length: + content = content[:max_length - 3] + "..." + + return content + + except Exception as e: + logger.warning(f"Failed to extract message content: {e}") + # Fallback to safe string conversion + try: + return safe_serialize(messages)[:1000] + except Exception: + return str(messages)[:1000] if messages else "" + + +def clean_llm_content(content: str) -> str: + """Clean LLM content for display using robust, pattern-agnostic approach.""" + if not isinstance(content, str): + return str(content)[:1000] + + if not content or not content.strip(): + return "" + + # Basic content cleaning without hardcoded patterns + cleaned_content = content.strip() + + # Remove excessive whitespace and normalize line breaks + lines = [] + for line in cleaned_content.split('\n'): + line = line.strip() + if line: # Skip empty lines + lines.append(line) + + # Join with single newlines + cleaned_content = '\n'.join(lines) + + # General heuristic: if content is very long and contains structured data patterns, + # try to extract the main response by taking the first substantial paragraph + if len(cleaned_content) > 3000: + paragraphs = cleaned_content.split('\n\n') + if len(paragraphs) > 1: + # Find the first substantial paragraph (more than 50 chars) + for paragraph in paragraphs: + if len(paragraph.strip()) > 50: + cleaned_content = paragraph.strip() + break + + # Apply consistent truncation with ellipsis (following AgentOps patterns) + max_length = 1500 + if len(cleaned_content) > max_length: + cleaned_content = cleaned_content[:max_length - 3] + "..." 
+ + return cleaned_content + + + + +class XpanderInstrumentor(CommonInstrumentor): + """Instrumentor for Xpander SDK interactions.""" + + def __init__(self, config: Optional[InstrumentorConfig] = None): + if config is None: + config = InstrumentorConfig( + library_name="xpander-sdk", + library_version="1.0.0", + dependencies=_instruments, + metrics_enabled=True + ) + super().__init__(config) + self._context = XpanderContext() + self._tracer = None + # Use existing AgentOps attribute manager + self._attribute_manager = SpanAttributeManager("xpander-service", "production") + + def _get_session_id_from_agent(self, agent) -> str: + """Generate consistent session ID from agent.""" + agent_name = getattr(agent, 'name', 'unknown') + agent_id = getattr(agent, 'id', 'unknown') + return f"agent_{agent_name}_{agent_id}" + + def _extract_session_id(self, execution, agent=None) -> str: + """Extract session ID from execution data.""" + if isinstance(execution, dict): + if 'thread_id' in execution: + return f"session_{execution['thread_id']}" + elif 'session_id' in execution: + return f"session_{execution['session_id']}" + + # Fallback to timestamp + return f"session_{int(time.time())}" + + def _extract_tool_name(self, tool_call) -> str: + """Extract tool name from tool call.""" + # Handle different tool call formats + if hasattr(tool_call, 'function_name'): + return tool_call.function_name + elif hasattr(tool_call, 'function') and hasattr(tool_call.function, 'name'): + return tool_call.function.name + elif hasattr(tool_call, 'name'): + return tool_call.name + elif isinstance(tool_call, dict): + if 'function' in tool_call: + return tool_call['function'].get('name', 'unknown') + elif 'function_name' in tool_call: + return tool_call['function_name'] + elif 'name' in tool_call: + return tool_call['name'] + + # Try to extract from string representation + import re + patterns = [ + r'function[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', + r'name[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', + r'([a-zA-Z_][a-zA-Z0-9_]*)\.tool', + r'function_name[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]' + ] + + tool_str = str(tool_call) + for pattern in patterns: + match = re.search(pattern, tool_str, re.IGNORECASE) + if match: + return match.group(1) + + return 'unknown' + + def _extract_tool_params(self, tool_call) -> dict: + """Extract tool parameters from tool call.""" + # Handle different parameter formats + if hasattr(tool_call, 'function') and hasattr(tool_call.function, 'arguments'): + try: + args = tool_call.function.arguments + if isinstance(args, str): + return json.loads(args) + elif isinstance(args, dict): + return args + except (json.JSONDecodeError, AttributeError): + pass + elif hasattr(tool_call, 'arguments'): + try: + args = tool_call.arguments + if isinstance(args, str): + return json.loads(args) + elif isinstance(args, dict): + return args + except (json.JSONDecodeError, AttributeError): + pass + elif isinstance(tool_call, dict): + if 'function' in tool_call: + args = tool_call['function'].get('arguments', '{}') + try: + return json.loads(args) if isinstance(args, str) else args + except json.JSONDecodeError: + pass + elif 'arguments' in tool_call: + args = tool_call['arguments'] + try: + return json.loads(args) if isinstance(args, str) else args + except json.JSONDecodeError: + pass + + return {} + + def _extract_llm_data_from_messages(self, messages) -> dict: + """Extract LLM metadata from messages.""" + data = {} + + if isinstance(messages, dict): + # Direct model and usage fields + if 'model' in messages: + data['model'] = 
messages['model'] + if 'usage' in messages: + data['usage'] = messages['usage'] + + # Check in choices array (OpenAI format) + if 'choices' in messages and messages['choices']: + choice = messages['choices'][0] + if 'message' in choice: + message = choice['message'] + if 'model' in message: + data['model'] = message['model'] + + elif isinstance(messages, list): + # Look for assistant messages with metadata + for msg in messages: + if isinstance(msg, dict) and msg.get('role') == 'assistant': + if 'model' in msg: + data['model'] = msg['model'] + if 'usage' in msg: + data['usage'] = msg['usage'] + break + + # Try to extract from any nested structures + if not data and hasattr(messages, '__dict__'): + msg_dict = messages.__dict__ + if 'model' in msg_dict: + data['model'] = msg_dict['model'] + if 'usage' in msg_dict: + data['usage'] = msg_dict['usage'] + + return data + + def _extract_and_set_openai_message_attributes(self, span, messages, result, agent=None): + """Extract and set OpenAI message attributes from messages and response.""" + try: + # Lazy import to avoid circular imports + try: + from agentops.instrumentation.providers.openai.attributes.common import get_response_attributes + except ImportError: + logger.debug("OpenAI attributes module not available, using fallback") + # Continue with manual extraction as fallback + # Try to get the agent's current message history for prompts + agent_messages = [] + if agent and hasattr(agent, 'messages'): + agent_messages = getattr(agent, 'messages', []) + elif agent and hasattr(agent, 'conversation_history'): + agent_messages = getattr(agent, 'conversation_history', []) + elif agent and hasattr(agent, 'history'): + agent_messages = getattr(agent, 'history', []) + + # Also try to extract messages from the messages parameter itself + if isinstance(messages, list): + # If messages is a list of messages, use it directly + agent_messages.extend(messages) + elif isinstance(messages, dict) and 'messages' in messages: + # If messages contains a messages key + agent_messages.extend(messages.get('messages', [])) + + # Set prompt messages (input to LLM) + prompt_index = 0 + for msg in agent_messages[-10:]: # Get last 10 messages to avoid huge context + if isinstance(msg, dict): + role = msg.get('role', 'user') + content = msg.get('content', '') + + # Handle different content formats + if content and isinstance(content, str) and content.strip(): + safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) + safe_set_attribute(span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), content[:2000]) + prompt_index += 1 + elif content and isinstance(content, list): + # Handle multi-modal content + content_str = str(content)[:2000] + safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) + safe_set_attribute(span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), content_str) + prompt_index += 1 + elif hasattr(msg, 'content'): + # Handle object with content attribute + content = getattr(msg, 'content', '') + role = getattr(msg, 'role', 'user') + if content and isinstance(content, str) and content.strip(): + safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) + safe_set_attribute(span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), str(content)[:2000]) + prompt_index += 1 + + # Set completion messages (response from LLM) + completion_index = 0 + response_data = result if result else messages + + # Handle different response formats + if 
isinstance(response_data, dict): + choices = response_data.get('choices', []) + for choice in choices: + message = choice.get('message', {}) + role = message.get('role', 'assistant') + content = message.get('content', '') + + if content: + safe_set_attribute(span, MessageAttributes.COMPLETION_ROLE.format(i=completion_index), role) + safe_set_attribute(span, MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), content[:2000]) + + # Handle tool calls in the response + tool_calls = message.get('tool_calls', []) + for j, tool_call in enumerate(tool_calls): + tool_id = tool_call.get('id', '') + tool_name = tool_call.get('function', {}).get('name', '') + tool_args = tool_call.get('function', {}).get('arguments', '') + + if tool_id: + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), tool_id) + if tool_name: + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), tool_name) + if tool_args: + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=completion_index, j=j), tool_args[:500]) + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), "function") + + completion_index += 1 + elif hasattr(response_data, 'choices'): + # Handle response object with choices attribute + choices = getattr(response_data, 'choices', []) + for choice in choices: + message = getattr(choice, 'message', None) + if message: + role = getattr(message, 'role', 'assistant') + content = getattr(message, 'content', '') + + if content: + safe_set_attribute(span, MessageAttributes.COMPLETION_ROLE.format(i=completion_index), role) + safe_set_attribute(span, MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), str(content)[:2000]) + + # Handle tool calls + tool_calls = getattr(message, 'tool_calls', []) + for j, tool_call in enumerate(tool_calls): + tool_id = getattr(tool_call, 'id', '') + function = getattr(tool_call, 'function', None) + if function: + tool_name = getattr(function, 'name', '') + tool_args = getattr(function, 'arguments', '') + + if tool_id: + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), tool_id) + if tool_name: + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), tool_name) + if tool_args: + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=completion_index, j=j), str(tool_args)[:500]) + safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), "function") + + completion_index += 1 + + + except Exception as e: + logger.error(f"Error extracting OpenAI message attributes: {e}") + + def _wrap_init_task(self, original_method): + """Wrap init_task to create agent span hierarchy.""" + instrumentor = self + def wrapper(self, execution): + + # Extract session ID and agent info + session_id = instrumentor._extract_session_id(execution) + agent_name = getattr(self, 'name', 'unknown') + agent_id = getattr(self, 'id', 'unknown') + + # Extract task input + task_input = None + if isinstance(execution, dict): + if 'input' in execution: + input_data = execution['input'] + if isinstance(input_data, dict) and 'text' in input_data: + task_input = input_data['text'] + elif isinstance(input_data, str): + task_input = input_data + + # Create top-level agent span using existing AgentOps utility + agent_span_attributes = { + SpanAttributes.AGENTOPS_ENTITY_NAME: 
agent_name, + "xpander.span.type": "agent", + "xpander.agent.name": agent_name, + "xpander.agent.id": agent_id, + "xpander.session.id": session_id, + "agent.name": agent_name, + "agent.id": agent_id, + } + agent_span, agent_ctx, agent_token = tracer.make_span( + operation_name=agent_name, + span_kind=SpanKind.AGENT, + attributes=agent_span_attributes + ) + + + # Set task input on agent span + if task_input: + safe_set_attribute(agent_span, SpanAttributes.AGENTOPS_ENTITY_INPUT, task_input[:1000]) + safe_set_attribute(agent_span, "xpander.agent.task_input", task_input[:500]) + + # Create workflow span as child of agent span using existing utility + workflow_span_attributes = { + "xpander.span.type": "workflow", + "xpander.workflow.phase": "planning", + "xpander.agent.name": agent_name, + "xpander.agent.id": agent_id, + "xpander.session.id": session_id, + } + # Set agent span as current context for proper nesting + trace.set_span_in_context(agent_span) + workflow_span, workflow_ctx, workflow_token = tracer.make_span( + operation_name=f"workflow.{agent_name}", + span_kind=SpanKind.WORKFLOW, + attributes=workflow_span_attributes + ) + + # Initialize workflow state with persistent spans + agent_info = { + "agent_name": agent_name, + "agent_id": agent_id, + "task_input": task_input, + "thread_id": execution.get('thread_id') if isinstance(execution, dict) else None + } + instrumentor._context.start_session(session_id, agent_info, workflow_span, agent_span) + + + try: + # Execute original method - don't end agent span here, it will be ended in retrieve_execution_result + result = original_method(self, execution) + return result + except Exception as e: + # Use existing AgentOps error handling utilities + _finish_span_error(workflow_span, e) + _finish_span_error(agent_span, e) + raise + + return wrapper + + def _wrap_run_tools(self, original_method): + """Wrap run_tools to create execution phase tool spans.""" + instrumentor = self + def wrapper(self, tool_calls, payload_extension=None): + + session_id = instrumentor._get_session_id_from_agent(self) + current_session = instrumentor._context.get_session(session_id) + + # Update workflow state + step_num = (current_session.get('step_count', 0) + 1) if current_session else 1 + instrumentor._context.update_session(session_id, { + 'step_count': step_num, + 'phase': 'executing', + 'tools_executed': (current_session.get('tools_executed', []) if current_session else []) + + [instrumentor._extract_tool_name(tc) for tc in tool_calls] + }) + + # Get current span context (should be the LLM span) + current_span = trace.get_current_span() + + # Create execution phase span as child of current LLM span + execution_span_context = trace.set_span_in_context(current_span) if current_span else None + + with instrumentor._tracer.start_as_current_span( + f"xpander.execution", + kind=OTelSpanKind.INTERNAL, + context=execution_span_context, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.TASK, + "xpander.span.type": "execution", + "xpander.workflow.phase": "executing", + "xpander.step.number": step_num, + "xpander.step.tool_count": len(tool_calls), + "xpander.session.id": session_id, + } + ) as execution_span: + + # Execute tools and create individual tool spans + results = [] + for i, tool_call in enumerate(tool_calls): + tool_name = instrumentor._extract_tool_name(tool_call) + tool_params = instrumentor._extract_tool_params(tool_call) + + start_time = time.time() + + # Create tool span as child of execution span + tool_span_context = 
trace.set_span_in_context(execution_span) + + with instrumentor._tracer.start_as_current_span( + f"tool.{tool_name}", + kind=OTelSpanKind.CLIENT, + context=tool_span_context, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.TOOL, + ToolAttributes.TOOL_NAME: tool_name, + ToolAttributes.TOOL_PARAMETERS: str(tool_params)[:500], + "xpander.span.type": "tool", + "xpander.workflow.phase": "executing", + "xpander.tool.step": step_num, + "xpander.tool.index": i, + } + ) as tool_span: + + + # Execute single tool + single_result = original_method(self, [tool_call], payload_extension) + results.extend(single_result) + + # Record tool execution details + execution_time = time.time() - start_time + safe_set_attribute(tool_span, "xpander.tool.execution_time", execution_time) + + # Add tool result if available + if single_result: + result_summary = f"Executed successfully with {len(single_result)} results" + safe_set_attribute(tool_span, "xpander.tool.result_summary", result_summary) + + # Store actual result data using existing AgentOps utilities + try: + result_content = "" + + for i, result_item in enumerate(single_result): + # Handle xpander_sdk.ToolCallResult objects specifically + if hasattr(result_item, '__class__') and 'ToolCallResult' in str(type(result_item)): + # Extract the actual result content from ToolCallResult + try: + if hasattr(result_item, 'result') and result_item.result is not None: + actual_result = result_item.result + if isinstance(actual_result, str): + result_content += actual_result[:1000] + "\n" + else: + result_content += safe_serialize(actual_result)[:1000] + "\n" + elif hasattr(result_item, 'data') and result_item.data is not None: + result_content += safe_serialize(result_item.data)[:1000] + "\n" + else: + # Fallback: try to find any content attribute + for attr_name in ['content', 'output', 'value', 'response']: + if hasattr(result_item, attr_name): + attr_value = getattr(result_item, attr_name) + if attr_value is not None: + result_content += safe_serialize(attr_value)[:1000] + "\n" + break + else: + # If no content attributes found, indicate this + result_content += f"ToolCallResult object (no extractable content)\n" + except Exception as attr_e: + logger.debug(f"Error extracting from ToolCallResult: {attr_e}") + result_content += f"ToolCallResult object (extraction failed)\n" + + # Handle regular objects and primitives + elif isinstance(result_item, (str, int, float, bool)): + result_content += str(result_item)[:1000] + "\n" + elif hasattr(result_item, '__dict__'): + # Convert objects to dict using existing utility + result_dict = model_to_dict(result_item) + result_content += safe_serialize(result_dict)[:1000] + "\n" + else: + # Use safe_serialize for consistent conversion + result_content += safe_serialize(result_item)[:1000] + "\n" + + if result_content.strip(): + final_content = result_content.strip()[:2000] + safe_set_attribute(tool_span, ToolAttributes.TOOL_RESULT, final_content) + else: + safe_set_attribute(tool_span, ToolAttributes.TOOL_RESULT, "No extractable content found") + + except Exception as e: + logger.error(f"Error setting tool result: {e}") + safe_set_attribute(tool_span, ToolAttributes.TOOL_RESULT, f"Error capturing result: {e}") + else: + safe_set_attribute(tool_span, "xpander.tool.result_summary", "No results returned") + + return results + + return wrapper + + def _wrap_add_messages(self, original_method): + """Wrap add_messages to create LLM spans with proper parent-child relationship.""" + instrumentor = self + def wrapper(self, 
messages): + + session_id = instrumentor._get_session_id_from_agent(self) + current_session = instrumentor._context.get_session(session_id) + current_phase = instrumentor._context.get_workflow_phase(session_id) + workflow_span = instrumentor._context.get_workflow_span(session_id) + + # Extract and clean message content + message_content = extract_current_message_content(messages) + + + # Create LLM span as child of workflow span + llm_span_context = trace.set_span_in_context(workflow_span) if workflow_span else None + + # Call original method first to get the actual OpenAI response + result = original_method(self, messages) + + # Now create a span that captures the LLM interaction with the actual response data + with instrumentor._tracer.start_as_current_span( + f"llm.{current_phase}", + kind=OTelSpanKind.CLIENT, + context=llm_span_context, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.LLM, + "xpander.span.type": "llm", + "xpander.workflow.phase": current_phase, + "xpander.session.id": session_id, + }, + ) as llm_span: + + # Extract and set OpenAI message data from the messages and response + instrumentor._extract_and_set_openai_message_attributes(llm_span, messages, result, self) + + # Set cleaned content for legacy compatibility + if message_content: + cleaned_content = clean_llm_content(message_content) + safe_set_attribute(llm_span, "xpander.llm.content", cleaned_content) + + # Extract and set LLM metadata from the result if possible + llm_data = instrumentor._extract_llm_data_from_messages(result if result else messages) + if llm_data: + if 'model' in llm_data: + safe_set_attribute(llm_span, SpanAttributes.LLM_REQUEST_MODEL, llm_data['model']) + safe_set_attribute(llm_span, SpanAttributes.LLM_RESPONSE_MODEL, llm_data['model']) + + if 'usage' in llm_data: + usage = llm_data['usage'] + if 'prompt_tokens' in usage: + safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage['prompt_tokens']) + if 'completion_tokens' in usage: + safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage['completion_tokens']) + if 'total_tokens' in usage: + safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage['total_tokens']) + # Update workflow state + instrumentor._context.update_session(session_id, { + "total_tokens": (current_session.get("total_tokens", 0) if current_session else 0) + usage['total_tokens'] + }) + + return result + + return wrapper + + def _wrap_is_finished(self, original_method): + """Wrap is_finished to track workflow completion.""" + instrumentor = self + def wrapper(self): + result = original_method(self) + + if result: + session_id = instrumentor._get_session_id_from_agent(self) + + # Update session to finished state + instrumentor._context.update_session(session_id, { + "phase": "finished", + "end_time": time.time() + }) + + return result + + return wrapper + + def _wrap_extract_tool_calls(self, original_method): + """Wrap extract_tool_calls to track tool planning.""" + instrumentor = self + def wrapper(self, messages): + result = original_method(self, messages) + + if result: + session_id = instrumentor._get_session_id_from_agent(self) + + return result + + return wrapper + + def _wrap_report_execution_metrics(self, original_method): + """Wrap report_execution_metrics to track metrics.""" + instrumentor = self + def wrapper(self, llm_tokens=None, ai_model=None): + result = original_method(self, llm_tokens, ai_model) + + session_id = instrumentor._get_session_id_from_agent(self) + + return result + + return 
wrapper + + def _wrap_retrieve_execution_result(self, original_method): + """Wrap retrieve_execution_result to finalize agent and workflow spans.""" + instrumentor = self + def wrapper(self): + session_id = instrumentor._get_session_id_from_agent(self) + current_session = instrumentor._context.get_session(session_id) + workflow_span = instrumentor._context.get_workflow_span(session_id) + agent_span = instrumentor._context.get_agent_span(session_id) + + try: + # Execute and capture result + result = original_method(self) + + # Add workflow summary to the persistent workflow span + if workflow_span and current_session: + safe_set_attribute(workflow_span, "xpander.workflow.total_steps", current_session.get('step_count', 0)) + safe_set_attribute(workflow_span, "xpander.workflow.total_tokens", current_session.get('total_tokens', 0)) + safe_set_attribute(workflow_span, "xpander.workflow.tools_used", len(current_session.get('tools_executed', []))) + + # Calculate total execution time + start_time = current_session.get('start_time', time.time()) + execution_time = time.time() - start_time + safe_set_attribute(workflow_span, "xpander.workflow.execution_time", execution_time) + safe_set_attribute(workflow_span, "xpander.workflow.phase", "completed") + + # Set result details on both workflow and agent spans + if result: + result_content = "" + if hasattr(result, 'result'): + result_content = str(result.result)[:1000] + + if workflow_span: + if result_content: + safe_set_attribute(workflow_span, "xpander.result.content", result_content) + if hasattr(result, 'memory_thread_id'): + safe_set_attribute(workflow_span, "xpander.result.thread_id", result.memory_thread_id) + + if agent_span: + if result_content: + safe_set_attribute(agent_span, SpanAttributes.AGENTOPS_ENTITY_OUTPUT, result_content) + safe_set_attribute(agent_span, "xpander.agent.final_result", result_content) + if hasattr(result, 'memory_thread_id'): + safe_set_attribute(agent_span, "xpander.agent.thread_id", result.memory_thread_id) + + # Add agent summary + if current_session: + safe_set_attribute(agent_span, "xpander.agent.total_steps", current_session.get('step_count', 0)) + safe_set_attribute(agent_span, "xpander.agent.total_tokens", current_session.get('total_tokens', 0)) + safe_set_attribute(agent_span, "xpander.agent.tools_used", len(current_session.get('tools_executed', []))) + + start_time = current_session.get('start_time', time.time()) + execution_time = time.time() - start_time + safe_set_attribute(agent_span, "xpander.agent.execution_time", execution_time) + + + + # Mark spans as successful and close them using existing utilities + if workflow_span: + _finish_span_success(workflow_span) + workflow_span.end() + + if agent_span: + _finish_span_success(agent_span) + agent_span.end() + + return result + + except Exception as e: + # Mark spans as failed and close them using existing utilities + if workflow_span: + _finish_span_error(workflow_span, e) + workflow_span.end() + + if agent_span: + _finish_span_error(agent_span, e) + agent_span.end() + + raise + finally: + # Clean up session + instrumentor._context.end_session(session_id) + + return wrapper + + def _instrument(self, **kwargs): + """Instrument the Xpander SDK.""" + try: + # Import xpander modules + from xpander_sdk import Agent + + # Set up tracing using existing AgentOps tracer + self._tracer = tracer.get_tracer() + # Attribute manager already initialized in __init__ + + # Wrap Agent methods + Agent.add_task = self._wrap_init_task(Agent.add_task) + Agent.init_task = 
self._wrap_init_task(Agent.init_task) # Also wrap init_task for completeness + Agent.run_tools = self._wrap_run_tools(Agent.run_tools) + Agent.add_messages = self._wrap_add_messages(Agent.add_messages) + Agent.is_finished = self._wrap_is_finished(Agent.is_finished) + Agent.extract_tool_calls = self._wrap_extract_tool_calls(Agent.extract_tool_calls) + Agent.report_execution_metrics = self._wrap_report_execution_metrics(Agent.report_execution_metrics) + Agent.retrieve_execution_result = self._wrap_retrieve_execution_result(Agent.retrieve_execution_result) + + logger.info("Xpander SDK instrumentation activated") + + except ImportError: + logger.debug("Xpander SDK not available") + except Exception as e: + logger.error(f"Failed to instrument Xpander SDK: {e}") + + def _uninstrument(self, **kwargs): + """Uninstrument the Xpander SDK.""" + logger.info("Xpander SDK instrumentation deactivated") + + def _create_metrics(self, meter: Meter) -> StandardMetrics: + """Create metrics for Xpander instrumentation.""" + return StandardMetrics( + requests_active=meter.create_up_down_counter( + name="xpander_requests_active", + description="Number of active Xpander requests", + ), + requests_duration=meter.create_histogram( + name="xpander_requests_duration", + description="Duration of Xpander requests", + unit="s", + ), + requests_total=meter.create_counter( + name="xpander_requests_total", + description="Total number of Xpander requests", + ), + requests_error=meter.create_counter( + name="xpander_requests_error", + description="Number of Xpander request errors", + ), + ) + + +def create_xpander_llm_span(tracer, model_name, purpose, session_id, attribute_manager=None): + """Create a standardized LLM span for Xpander operations.""" + span = tracer.start_span( + f"xpander.llm.{purpose}", + kind=OTelSpanKind.CLIENT, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.LLM, + SpanAttributes.LLM_REQUEST_MODEL: model_name, + SpanAttributes.LLM_RESPONSE_MODEL: model_name, + "xpander.span.type": "llm", + "xpander.llm.purpose": purpose, + "xpander.session.id": session_id, + } + ) + if attribute_manager: + attribute_manager.set_common_attributes(span) + return span + + +# Backward compatibility functions +def wrap_openai_call_for_xpander(openai_call_func, purpose="general"): + """Backward compatibility stub - functionality now handled by auto-instrumentation.""" + return openai_call_func + +def is_xpander_session_active(): + """Check if xpander session is active.""" + return True # Always return True since auto-instrumentation is active + +def get_active_xpander_session(): + """Get active xpander session.""" + return None # Return None since we don't expose internal context diff --git a/agentops/instrumentation/agentic/xpander/trace_probe.py b/agentops/instrumentation/agentic/xpander/trace_probe.py new file mode 100644 index 000000000..d2f6b408b --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/trace_probe.py @@ -0,0 +1,76 @@ +"""Xpander trace probe for automatic instrumentation activation. + +This module provides automatic instrumentation for Xpander SDK when imported. +It should be imported early in the application lifecycle to ensure all +Xpander interactions are captured. 
+""" + +import logging +from agentops.instrumentation.agentic.xpander.instrumentor import XpanderInstrumentor + +logger = logging.getLogger(__name__) + +# Global instrumentor instance +_instrumentor = None + +def activate_xpander_instrumentation(): + """Activate Xpander instrumentation.""" + global _instrumentor + + if _instrumentor is None: + try: + _instrumentor = XpanderInstrumentor() + _instrumentor.instrument() + logger.info("Xpander instrumentation activated successfully") + except Exception as e: + logger.error(f"Failed to activate Xpander instrumentation: {e}") + _instrumentor = None + + return _instrumentor + +def deactivate_xpander_instrumentation(): + """Deactivate Xpander instrumentation.""" + global _instrumentor + + if _instrumentor is not None: + try: + _instrumentor.uninstrument() + logger.info("Xpander instrumentation deactivated successfully") + except Exception as e: + logger.error(f"Failed to deactivate Xpander instrumentation: {e}") + finally: + _instrumentor = None + +def get_instrumentor(): + """Get the active instrumentor instance.""" + return _instrumentor + +# Stub functions for backward compatibility +def wrap_openai_call_for_xpander(openai_call_func, purpose="general"): + """Backward compatibility stub - functionality now handled by auto-instrumentation.""" + logger.debug(f"wrap_openai_call_for_xpander called with purpose: {purpose}") + return openai_call_func + +def is_xpander_session_active(): + """Check if xpander session is active.""" + return _instrumentor is not None + +def get_active_xpander_session(): + """Get active xpander session.""" + return _instrumentor._context if _instrumentor else None + +# Convenience functions for cleaner OpenAI integration +def wrap_openai_analysis(openai_call_func): + """Wrap OpenAI calls for analysis/reasoning steps.""" + return wrap_openai_call_for_xpander(openai_call_func, "analysis") + +def wrap_openai_planning(openai_call_func): + """Wrap OpenAI calls for planning steps.""" + return wrap_openai_call_for_xpander(openai_call_func, "planning") + +def wrap_openai_synthesis(openai_call_func): + """Wrap OpenAI calls for synthesis/summary steps.""" + return wrap_openai_call_for_xpander(openai_call_func, "synthesis") + +# Note: Auto-activation is now handled by the main AgentOps instrumentation system +# activate_xpander_instrumentation() \ No newline at end of file diff --git a/agentops/instrumentation/agentic/xpander/version.py b/agentops/instrumentation/agentic/xpander/version.py new file mode 100644 index 000000000..acc10fa75 --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/version.py @@ -0,0 +1,3 @@ +"""Version information for xpander instrumentation.""" + +__version__ = "1.0.0" \ No newline at end of file From 4877a60c64438dbabedb6eea1084b0cb92f73fc9 Mon Sep 17 00:00:00 2001 From: Sri Laasya Nutheti Date: Tue, 15 Jul 2025 17:33:59 -0700 Subject: [PATCH 2/7] add xpander docs --- docs/mint.json | 39 ++++-- docs/v2/examples/xpander.mdx | 14 ++ docs/v2/integrations/xpander.mdx | 191 ++++++++++++++++++++++++++++ examples/xpander/coding_agent.ipynb | 1 + 4 files changed, 236 insertions(+), 9 deletions(-) create mode 100644 docs/v2/examples/xpander.mdx create mode 100644 docs/v2/integrations/xpander.mdx create mode 100644 examples/xpander/coding_agent.ipynb diff --git a/docs/mint.json b/docs/mint.json index e26bc058c..37b98eb8f 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -28,16 +28,25 @@ "icon": "lines-leaning" }, "anchors": [], - "versions": ["v2", "v1", "v0"], + "versions": [ + "v2", + "v1", + "v0" + ], 
"navigation": [ { "group": "", - "pages": ["v1/introduction"], + "pages": [ + "v1/introduction" + ], "version": "v1" }, { "group": "Getting Started", - "pages": ["v1/quickstart", "v1/examples/examples"], + "pages": [ + "v1/quickstart", + "v1/examples/examples" + ], "version": "v1" }, { @@ -94,17 +103,26 @@ }, { "group": "Other Info", - "pages": ["v1/concepts/host-env"], + "pages": [ + "v1/concepts/host-env" + ], "version": "v1" }, { "group": "", - "pages": ["v2/introduction"], + "pages": [ + "v2/introduction" + ], "version": "v2" }, { "group": "Getting Started", - "pages": ["v2/quickstart", "v2/examples/examples", "v2/usage/mcp-docs", "v2/usage/mcp-server"], + "pages": [ + "v2/quickstart", + "v2/examples/examples", + "v2/usage/mcp-docs", + "v2/usage/mcp-server" + ], "version": "v2" }, { @@ -138,7 +156,8 @@ "v2/integrations/openai_agents_js", "v2/integrations/smolagents", "v2/integrations/ibm_watsonx_ai", - "v2/integrations/xai" + "v2/integrations/xai", + "v2/integrations/xpander" ], "version": "v2" }, @@ -161,7 +180,9 @@ }, { "group": "Other Info", - "pages": ["v2/concepts/host-env"], + "pages": [ + "v2/concepts/host-env" + ], "version": "v2" } ], @@ -176,4 +197,4 @@ "apiHost": "https://us.i.posthog.com" } } -} +} \ No newline at end of file diff --git a/docs/v2/examples/xpander.mdx b/docs/v2/examples/xpander.mdx new file mode 100644 index 000000000..e1034688b --- /dev/null +++ b/docs/v2/examples/xpander.mdx @@ -0,0 +1,14 @@ +--- +title: 'Xpander' +description: 'Xpander example using AgentOps' +--- +{/* SOURCE_FILE: examples/xpander/coding_agent.ipynb */} + +_View Notebook on Github_ + + + + + + + \ No newline at end of file diff --git a/docs/v2/integrations/xpander.mdx b/docs/v2/integrations/xpander.mdx new file mode 100644 index 000000000..9c478b85a --- /dev/null +++ b/docs/v2/integrations/xpander.mdx @@ -0,0 +1,191 @@ +--- +title: 'Xpander' +description: 'Monitor and analyze your Xpander agent workflows with automatic AgentOps instrumentation' +--- + +[Xpander](https://xpander.ai/) is a powerful platform for building and deploying AI agents with sophisticated workflow management capabilities. AgentOps provides seamless integration with the Xpander SDK, automatically instrumenting all agent activities, tool executions, and LLM interactions without any manual setup. 
+ +## Installation + +Install AgentOps and the Xpander SDK, along with the required dependencies: + + + ```bash pip + pip install agentops xpander-sdk openai python-dotenv + ``` + ```bash poetry + poetry add agentops xpander-sdk openai python-dotenv + ``` + ```bash uv + uv add agentops xpander-sdk openai python-dotenv + ``` + + +## Setting Up API Keys + +You'll need API keys for AgentOps, Xpander, and OpenAI: +- **AGENTOPS_API_KEY**: From your [AgentOps Dashboard](https://app.agentops.ai/) +- **XPANDER_API_KEY**: From your [Xpander Dashboard](https://app.xpander.ai/) +- **XPANDER_AGENT_ID**: The ID of your Xpander agent +- **OPENAI_API_KEY**: From the [OpenAI Platform](https://platform.openai.com/api-keys) + +Set these as environment variables or in a `.env` file: + + + ```bash Export to CLI + export AGENTOPS_API_KEY="your_agentops_api_key_here" + export XPANDER_API_KEY="your_xpander_api_key_here" + export XPANDER_AGENT_ID="your_xpander_agent_id_here" + export OPENAI_API_KEY="your_openai_api_key_here" + ``` + ```txt Set in .env file + AGENTOPS_API_KEY="your_agentops_api_key_here" + XPANDER_API_KEY="your_xpander_api_key_here" + XPANDER_AGENT_ID="your_xpander_agent_id_here" + OPENAI_API_KEY="your_openai_api_key_here" + ``` + + +## Quick Start + +The key to AgentOps + Xpander integration is **initialization order**: Initialize AgentOps **before** importing the Xpander SDK to enable automatic instrumentation. + +```python +import os +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +# 1. Initialize AgentOps FIRST (this enables auto-instrumentation) +import agentops +agentops.init( + api_key=os.getenv("AGENTOPS_API_KEY"), + default_tags=["xpander", "production"] +) + +# 2. Now import Xpander SDK (instrumentation will automatically activate) +from xpander_sdk import XpanderClient, LLMProvider +from openai import OpenAI + +# 3. Set up your agent +xpander_client = XpanderClient(api_key=os.getenv("XPANDER_API_KEY")) +agent = xpander_client.agents.get(agent_id=os.getenv("XPANDER_AGENT_ID")) +agent.select_llm_provider(LLMProvider.OPEN_AI) + +openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + +# 4. Execute a task (automatically instrumented!) 
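+#    Under the hood, the wrapped add_task call opens the session and workflow spans,
+#    add_messages records each LLM call, run_tools records tool-execution spans, and
+#    retrieve_execution_result closes the spans when the task completes.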
+execution = agent.add_task(input="What is the weather like today?") + +while not agent.is_finished(): + # Call LLM + response = openai_client.chat.completions.create( + model="gpt-4o-mini", + messages=agent.messages, + tools=agent.get_tools(), + tool_choice=agent.tool_choice + ) + + # Add response to agent context + agent.add_messages(response.model_dump()) + + # Execute any tool calls + tool_calls = agent.extract_tool_calls(response.model_dump()) + if tool_calls: + agent.run_tools(tool_calls) + +# Get final result +result = agent.retrieve_execution_result() +print(f"Task completed: {result.result}") +``` + +## What's Automatically Tracked + +AgentOps automatically captures comprehensive telemetry from your Xpander agents: + +### 🤖 Agent Activities +- Agent initialization and configuration +- Task lifecycle (start, execution steps, completion) +- Workflow phase transitions (planning → executing → finished) +- Session management and context persistence + +### 🧠 LLM Interactions +- All OpenAI API calls with full request/response data +- Token usage and cost tracking across models +- Conversation history and context management +- Model parameters and settings + +### 🛠️ Tool Executions +- Tool call detection with parameters and arguments +- Tool execution results and success/failure status +- Tool performance metrics and timing +- Tool call hierarchies and dependencies + +### 📊 Performance Metrics +- End-to-end execution duration and timing +- Step-by-step workflow progression +- Resource utilization and efficiency metrics +- Error handling and exception tracking + +## Key Features + +### ✅ Zero-Configuration Setup +No manual trace creation or span management required. Simply initialize AgentOps before importing Xpander SDK. + +### ✅ Complete Workflow Visibility +Track the entire agent execution flow from task initiation to completion, including all intermediate steps. + +### ✅ Real-time Monitoring +View your agent activities in real-time on the AgentOps dashboard as they execute. + +### ✅ Tool Execution Insights +Monitor which tools are being called, their parameters, execution time, and results. + +### ✅ Cost Tracking +Automatic token usage tracking for all LLM interactions with cost analysis. + +## Runtime-Specific Instrumentation + +Xpander SDK uses JSII to create methods at runtime, which requires specialized instrumentation. AgentOps handles this automatically by: + +- **Method Wrapping**: Dynamically wrapping agent methods as they're created +- **Context Persistence**: Maintaining session context across runtime object lifecycle +- **Agent Detection**: Automatically detecting and instrumenting new agent instances +- **Tool Result Extraction**: Properly extracting results from JSII object references + +## Troubleshooting + +### Import Order Issues +If you're not seeing traces, ensure AgentOps is initialized before importing Xpander SDK: + +```python +# ✅ Correct order +import agentops +agentops.init() +from xpander_sdk import XpanderClient + +# ❌ Incorrect order +from xpander_sdk import XpanderClient +import agentops +agentops.init() # Too late - instrumentation won't activate +``` + +### Missing Tool Results +If tool results show `{"__jsii_ref__": "..."}` instead of actual content, ensure you're using the latest version of AgentOps, which includes improved JSII object handling. 
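+
+As a quick sanity check, you can confirm the installed versions of both packages (a minimal sketch; upgrade with `pip install -U agentops xpander-sdk` or your package manager's equivalent):
+
+```python
+from importlib.metadata import version
+
+print(version("agentops"), version("xpander-sdk"))
+```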
+ +## Examples + + + + Complete example of a Xpander agent with automatic AgentOps instrumentation + + + Demonstrate complex workflows with multiple tool executions + + + + + + + \ No newline at end of file diff --git a/examples/xpander/coding_agent.ipynb b/examples/xpander/coding_agent.ipynb new file mode 100644 index 000000000..3b5981a28 --- /dev/null +++ b/examples/xpander/coding_agent.ipynb @@ -0,0 +1 @@ +{"cells":[],"metadata":{"kernelspec":{"display_name":"Python 3","language":"python","name":"python3"},"language_info":{"codemirror_mode":{"name":"ipython","version":3},"file_extension":".py","mimetype":"text/x-python","name":"python","nbconvert_exporter":"python","pygments_lexer":"ipython3","version":"3.9.0"}},"nbformat":4,"nbformat_minor":4} From 749d32e4e89f192733daa7b761e830a9d116bbc7 Mon Sep 17 00:00:00 2001 From: Sri Laasya Nutheti Date: Mon, 21 Jul 2025 11:20:52 -0700 Subject: [PATCH 3/7] update graph view --- .../agentic/xpander/instrumentor.py | 179 ++++++++++++------ 1 file changed, 125 insertions(+), 54 deletions(-) diff --git a/agentops/instrumentation/agentic/xpander/instrumentor.py b/agentops/instrumentation/agentic/xpander/instrumentor.py index bbe58a350..48c6299f0 100644 --- a/agentops/instrumentation/agentic/xpander/instrumentor.py +++ b/agentops/instrumentation/agentic/xpander/instrumentor.py @@ -73,12 +73,14 @@ def safe_set_attribute(span, key: str, value: Any) -> None: class XpanderContext: - """Context manager for Xpander sessions.""" + """Context manager for Xpander sessions with nested conversation spans.""" def __init__(self): self._sessions = {} # session_id -> session_data self._workflow_spans = {} # session_id -> active workflow span self._agent_spans = {} # session_id -> active agent span + self._conversation_spans = {} # session_id -> active conversation span + self._conversation_counters = {} # session_id -> conversation counter self._lock = threading.Lock() def start_session(self, session_id: str, agent_info: Dict[str, Any], workflow_span=None, agent_span=None) -> None: @@ -98,6 +100,31 @@ def start_session(self, session_id: str, agent_info: Dict[str, Any], workflow_sp self._workflow_spans[session_id] = workflow_span if agent_span: self._agent_spans[session_id] = agent_span + + # Initialize conversation counter + self._conversation_counters[session_id] = 0 + + def start_conversation(self, session_id: str, conversation_span) -> None: + """Start a new conversation within the session.""" + with self._lock: + self._conversation_spans[session_id] = conversation_span + self._conversation_counters[session_id] = self._conversation_counters.get(session_id, 0) + 1 + + def end_conversation(self, session_id: str) -> None: + """End the current conversation.""" + with self._lock: + if session_id in self._conversation_spans: + del self._conversation_spans[session_id] + + def has_active_conversation(self, session_id: str) -> bool: + """Check if there's an active conversation for this session.""" + with self._lock: + return session_id in self._conversation_spans + + def get_conversation_counter(self, session_id: str) -> int: + """Get the current conversation counter.""" + with self._lock: + return self._conversation_counters.get(session_id, 0) def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Get session data.""" @@ -119,6 +146,10 @@ def end_session(self, session_id: str) -> None: del self._workflow_spans[session_id] if session_id in self._agent_spans: del self._agent_spans[session_id] + if session_id in self._conversation_spans: + del 
self._conversation_spans[session_id] + if session_id in self._conversation_counters: + del self._conversation_counters[session_id] def get_workflow_phase(self, session_id: str) -> str: """Detect current workflow phase based on state.""" @@ -141,6 +172,11 @@ def get_agent_span(self, session_id: str): """Get the active agent span for a session.""" with self._lock: return self._agent_spans.get(session_id) + + def get_conversation_span(self, session_id: str): + """Get the active conversation span for a session.""" + with self._lock: + return self._conversation_spans.get(session_id) def extract_current_message_content(messages) -> str: @@ -236,6 +272,15 @@ def __init__(self, config: Optional[InstrumentorConfig] = None): def _get_session_id_from_agent(self, agent) -> str: """Generate consistent session ID from agent.""" + # First try to get memory_thread_id from agent context if available + if hasattr(agent, 'memory_thread_id'): + return f"session_{agent.memory_thread_id}" + + # Check for execution context + if hasattr(agent, 'execution') and hasattr(agent.execution, 'memory_thread_id'): + return f"session_{agent.execution.memory_thread_id}" + + # Fallback to agent-based ID agent_name = getattr(agent, 'name', 'unknown') agent_id = getattr(agent, 'id', 'unknown') return f"agent_{agent_name}_{agent_id}" @@ -243,12 +288,18 @@ def _get_session_id_from_agent(self, agent) -> str: def _extract_session_id(self, execution, agent=None) -> str: """Extract session ID from execution data.""" if isinstance(execution, dict): - if 'thread_id' in execution: + if 'memory_thread_id' in execution: + return f"session_{execution['memory_thread_id']}" + elif 'thread_id' in execution: return f"session_{execution['thread_id']}" elif 'session_id' in execution: return f"session_{execution['session_id']}" - # Fallback to timestamp + # Fallback to agent-based ID if available + if agent: + return self._get_session_id_from_agent(agent) + + # Last resort fallback return f"session_{int(time.time())}" def _extract_tool_name(self, tool_call) -> str: @@ -492,6 +543,13 @@ def wrapper(self, execution): agent_name = getattr(self, 'name', 'unknown') agent_id = getattr(self, 'id', 'unknown') + # Check if session already exists + existing_session = instrumentor._context.get_session(session_id) + if existing_session: + # Session already exists, just continue + result = original_method(self, execution) + return result + # Extract task input task_input = None if isinstance(execution, dict): @@ -502,53 +560,56 @@ def wrapper(self, execution): elif isinstance(input_data, str): task_input = input_data - # Create top-level agent span using existing AgentOps utility - agent_span_attributes = { - SpanAttributes.AGENTOPS_ENTITY_NAME: agent_name, - "xpander.span.type": "agent", + + # Create top-level conversation/session span - this is the ROOT span + conversation_span_attributes = { + SpanAttributes.AGENTOPS_ENTITY_NAME: f"Session - {agent_name}", + "xpander.span.type": "session", + "xpander.session.name": f"Session - {agent_name}", "xpander.agent.name": agent_name, "xpander.agent.id": agent_id, "xpander.session.id": session_id, - "agent.name": agent_name, - "agent.id": agent_id, } - agent_span, agent_ctx, agent_token = tracer.make_span( - operation_name=agent_name, - span_kind=SpanKind.AGENT, - attributes=agent_span_attributes + session_span, session_ctx, session_token = tracer.make_span( + operation_name=f"session.{agent_name}", + span_kind=SpanKind.AGENT, # Use AGENT kind for the root session span + attributes=conversation_span_attributes ) - 
- # Set task input on agent span + # Set task input on session span if task_input: - safe_set_attribute(agent_span, SpanAttributes.AGENTOPS_ENTITY_INPUT, task_input[:1000]) - safe_set_attribute(agent_span, "xpander.agent.task_input", task_input[:500]) + safe_set_attribute(session_span, SpanAttributes.AGENTOPS_ENTITY_INPUT, task_input[:1000]) + safe_set_attribute(session_span, "xpander.session.initial_input", task_input[:500]) - # Create workflow span as child of agent span using existing utility + # Create workflow span as child of session span (this will be the main execution span) + trace.set_span_in_context(session_span) workflow_span_attributes = { "xpander.span.type": "workflow", "xpander.workflow.phase": "planning", "xpander.agent.name": agent_name, "xpander.agent.id": agent_id, "xpander.session.id": session_id, + "agent.name": agent_name, + "agent.id": agent_id, } - # Set agent span as current context for proper nesting - trace.set_span_in_context(agent_span) workflow_span, workflow_ctx, workflow_token = tracer.make_span( operation_name=f"workflow.{agent_name}", span_kind=SpanKind.WORKFLOW, attributes=workflow_span_attributes ) + # No separate agent span - workflow span contains all agent info + # Initialize workflow state with persistent spans agent_info = { "agent_name": agent_name, "agent_id": agent_id, "task_input": task_input, - "thread_id": execution.get('thread_id') if isinstance(execution, dict) else None + "thread_id": execution.get('memory_thread_id') if isinstance(execution, dict) else None } - instrumentor._context.start_session(session_id, agent_info, workflow_span, agent_span) - + instrumentor._context.start_session(session_id, agent_info, workflow_span, None) # No agent span + # Store the session span as well + instrumentor._context.start_conversation(session_id, session_span) try: # Execute original method - don't end agent span here, it will be ended in retrieve_execution_result @@ -601,10 +662,16 @@ def wrapper(self, tool_calls, payload_extension=None): # Execute tools and create individual tool spans results = [] + conversation_finished = False + for i, tool_call in enumerate(tool_calls): tool_name = instrumentor._extract_tool_name(tool_call) tool_params = instrumentor._extract_tool_params(tool_call) + # Check if this is the conversation finish tool + if tool_name == "xpfinish-agent-execution-finished": + conversation_finished = True + start_time = time.time() # Create tool span as child of execution span @@ -694,6 +761,12 @@ def wrapper(self, tool_calls, payload_extension=None): else: safe_set_attribute(tool_span, "xpander.tool.result_summary", "No results returned") + # If conversation is finished, mark for session closure + if conversation_finished: + # Since session span is now the conversation span, we need to close all spans + # when the conversation finishes + logger.info(f"Conversation finished for session {session_id} - marking for closure") + return results return wrapper @@ -707,12 +780,13 @@ def wrapper(self, messages): current_session = instrumentor._context.get_session(session_id) current_phase = instrumentor._context.get_workflow_phase(session_id) workflow_span = instrumentor._context.get_workflow_span(session_id) + conversation_span = instrumentor._context.get_conversation_span(session_id) # Extract and clean message content message_content = extract_current_message_content(messages) - - # Create LLM span as child of workflow span + # Create LLM span as child of workflow span (not conversation span) + # The hierarchy should be: session -> 
agent/workflow -> LLM -> execution -> tools llm_span_context = trace.set_span_in_context(workflow_span) if workflow_span else None # Call original method first to get the actual OpenAI response @@ -814,7 +888,7 @@ def wrapper(self): session_id = instrumentor._get_session_id_from_agent(self) current_session = instrumentor._context.get_session(session_id) workflow_span = instrumentor._context.get_workflow_span(session_id) - agent_span = instrumentor._context.get_agent_span(session_id) + session_span = instrumentor._context.get_conversation_span(session_id) # This is now the root session span try: # Execute and capture result @@ -832,58 +906,55 @@ def wrapper(self): safe_set_attribute(workflow_span, "xpander.workflow.execution_time", execution_time) safe_set_attribute(workflow_span, "xpander.workflow.phase", "completed") - # Set result details on both workflow and agent spans + # Set result details on session and workflow spans if result: result_content = "" if hasattr(result, 'result'): result_content = str(result.result)[:1000] + # Set on session span (root span) + if session_span and result_content: + safe_set_attribute(session_span, SpanAttributes.AGENTOPS_ENTITY_OUTPUT, result_content) + safe_set_attribute(session_span, "xpander.session.final_result", result_content) + if hasattr(result, 'memory_thread_id'): + safe_set_attribute(session_span, "xpander.session.thread_id", result.memory_thread_id) + if workflow_span: if result_content: safe_set_attribute(workflow_span, "xpander.result.content", result_content) if hasattr(result, 'memory_thread_id'): safe_set_attribute(workflow_span, "xpander.result.thread_id", result.memory_thread_id) - - if agent_span: - if result_content: - safe_set_attribute(agent_span, SpanAttributes.AGENTOPS_ENTITY_OUTPUT, result_content) - safe_set_attribute(agent_span, "xpander.agent.final_result", result_content) - if hasattr(result, 'memory_thread_id'): - safe_set_attribute(agent_span, "xpander.agent.thread_id", result.memory_thread_id) - - # Add agent summary - if current_session: - safe_set_attribute(agent_span, "xpander.agent.total_steps", current_session.get('step_count', 0)) - safe_set_attribute(agent_span, "xpander.agent.total_tokens", current_session.get('total_tokens', 0)) - safe_set_attribute(agent_span, "xpander.agent.tools_used", len(current_session.get('tools_executed', []))) - - start_time = current_session.get('start_time', time.time()) - execution_time = time.time() - start_time - safe_set_attribute(agent_span, "xpander.agent.execution_time", execution_time) - + # Add session summary to session span + if session_span and current_session: + safe_set_attribute(session_span, "xpander.session.total_steps", current_session.get('step_count', 0)) + safe_set_attribute(session_span, "xpander.session.total_tokens", current_session.get('total_tokens', 0)) + safe_set_attribute(session_span, "xpander.session.tools_used", len(current_session.get('tools_executed', []))) + + start_time = current_session.get('start_time', time.time()) + execution_time = time.time() - start_time + safe_set_attribute(session_span, "xpander.session.execution_time", execution_time) - # Mark spans as successful and close them using existing utilities + # Close all spans - session span should be closed last if workflow_span: _finish_span_success(workflow_span) workflow_span.end() - if agent_span: - _finish_span_success(agent_span) - agent_span.end() + if session_span: + _finish_span_success(session_span) + session_span.end() return result except Exception as e: - # Mark spans as failed 
and close them using existing utilities + # Mark spans as failed and close them in proper order if workflow_span: _finish_span_error(workflow_span, e) workflow_span.end() - if agent_span: - _finish_span_error(agent_span, e) - agent_span.end() - + if session_span: + _finish_span_error(session_span, e) + session_span.end() raise finally: # Clean up session From 0ae2b246922db9a91ee85039621c4745d1ba1d28 Mon Sep 17 00:00:00 2001 From: Sri Laasya Nutheti Date: Mon, 21 Jul 2025 12:47:24 -0700 Subject: [PATCH 4/7] cleanup instrumentation, updated xpandercontext --- .../agentic/xpander/__init__.py | 12 +- .../agentic/xpander/context.py | 112 +++ .../agentic/xpander/instrumentor.py | 813 +++++++----------- .../agentic/xpander/trace_probe.py | 18 +- .../agentic/xpander/version.py | 2 +- 5 files changed, 444 insertions(+), 513 deletions(-) create mode 100644 agentops/instrumentation/agentic/xpander/context.py diff --git a/agentops/instrumentation/agentic/xpander/__init__.py b/agentops/instrumentation/agentic/xpander/__init__.py index 8f1ab8cc0..007656dfa 100644 --- a/agentops/instrumentation/agentic/xpander/__init__.py +++ b/agentops/instrumentation/agentic/xpander/__init__.py @@ -1,15 +1,15 @@ """Xpander SDK instrumentation for AgentOps.""" -from agentops.instrumentation.agentic.xpander.instrumentor import ( - XpanderInstrumentor, +from agentops.instrumentation.agentic.xpander.instrumentor import XpanderInstrumentor +from agentops.instrumentation.agentic.xpander.trace_probe import ( wrap_openai_call_for_xpander, is_xpander_session_active, - get_active_xpander_session + get_active_xpander_session, ) __all__ = [ "XpanderInstrumentor", - "wrap_openai_call_for_xpander", + "wrap_openai_call_for_xpander", "is_xpander_session_active", - "get_active_xpander_session" -] \ No newline at end of file + "get_active_xpander_session", +] diff --git a/agentops/instrumentation/agentic/xpander/context.py b/agentops/instrumentation/agentic/xpander/context.py new file mode 100644 index 000000000..5dc31d53d --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/context.py @@ -0,0 +1,112 @@ +"""Xpander context management for session tracking.""" + +import time +import threading +from typing import Any, Dict, Optional + + +class XpanderContext: + """Context manager for Xpander sessions with nested conversation spans.""" + + def __init__(self): + self._sessions = {} # session_id -> session_data + self._workflow_spans = {} # session_id -> active workflow span + self._agent_spans = {} # session_id -> active agent span + self._conversation_spans = {} # session_id -> active conversation span + self._conversation_counters = {} # session_id -> conversation counter + self._lock = threading.Lock() + + def start_session(self, session_id: str, agent_info: Dict[str, Any], workflow_span=None, agent_span=None) -> None: + """Start a new session with agent info.""" + with self._lock: + self._sessions[session_id] = { + "agent_name": agent_info.get("agent_name", "unknown"), + "agent_id": agent_info.get("agent_id", "unknown"), + "task_input": agent_info.get("task_input"), + "phase": "planning", + "step_count": 0, + "total_tokens": 0, + "tools_executed": [], + "start_time": time.time(), + } + if workflow_span: + self._workflow_spans[session_id] = workflow_span + if agent_span: + self._agent_spans[session_id] = agent_span + + # Initialize conversation counter + self._conversation_counters[session_id] = 0 + + def start_conversation(self, session_id: str, conversation_span) -> None: + """Start a new conversation within the session.""" + 
with self._lock: + self._conversation_spans[session_id] = conversation_span + self._conversation_counters[session_id] = self._conversation_counters.get(session_id, 0) + 1 + + def end_conversation(self, session_id: str) -> None: + """End the current conversation.""" + with self._lock: + if session_id in self._conversation_spans: + del self._conversation_spans[session_id] + + def has_active_conversation(self, session_id: str) -> bool: + """Check if there's an active conversation for this session.""" + with self._lock: + return session_id in self._conversation_spans + + def get_conversation_counter(self, session_id: str) -> int: + """Get the current conversation counter.""" + with self._lock: + return self._conversation_counters.get(session_id, 0) + + def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: + """Get session data.""" + with self._lock: + return self._sessions.get(session_id) + + def update_session(self, session_id: str, updates: Dict[str, Any]) -> None: + """Update session data.""" + with self._lock: + if session_id in self._sessions: + self._sessions[session_id].update(updates) + + def end_session(self, session_id: str) -> None: + """End a session.""" + with self._lock: + if session_id in self._sessions: + del self._sessions[session_id] + if session_id in self._workflow_spans: + del self._workflow_spans[session_id] + if session_id in self._agent_spans: + del self._agent_spans[session_id] + if session_id in self._conversation_spans: + del self._conversation_spans[session_id] + if session_id in self._conversation_counters: + del self._conversation_counters[session_id] + + def get_workflow_phase(self, session_id: str) -> str: + """Detect current workflow phase based on state.""" + with self._lock: + session = self._sessions.get(session_id, {}) + + if session.get('tools_executed', []): + return "executing" + elif session.get('step_count', 0) > 0: + return "executing" + else: + return "planning" + + def get_workflow_span(self, session_id: str): + """Get the active workflow span for a session.""" + with self._lock: + return self._workflow_spans.get(session_id) + + def get_agent_span(self, session_id: str): + """Get the active agent span for a session.""" + with self._lock: + return self._agent_spans.get(session_id) + + def get_conversation_span(self, session_id: str): + """Get the active conversation span for a session.""" + with self._lock: + return self._conversation_spans.get(session_id) \ No newline at end of file diff --git a/agentops/instrumentation/agentic/xpander/instrumentor.py b/agentops/instrumentation/agentic/xpander/instrumentor.py index 48c6299f0..9a25be3a5 100644 --- a/agentops/instrumentation/agentic/xpander/instrumentor.py +++ b/agentops/instrumentation/agentic/xpander/instrumentor.py @@ -22,9 +22,8 @@ import logging import time -import threading import json -from typing import Any, Dict, Optional +from typing import Any, Optional from opentelemetry.metrics import Meter from opentelemetry.trace import SpanKind as OTelSpanKind from opentelemetry import trace @@ -35,22 +34,11 @@ InstrumentorConfig, StandardMetrics, ) -from agentops.instrumentation.common.span_management import ( - SpanAttributeManager, - create_span -) -from agentops.instrumentation.common.wrappers import ( - _finish_span_success, - _finish_span_error, - _update_span -) -from agentops.instrumentation.common.attributes import ( - _extract_attributes_from_mapping, - get_common_attributes -) +from agentops.instrumentation.common.span_management import SpanAttributeManager +from 
agentops.instrumentation.common.wrappers import _finish_span_success, _finish_span_error, _update_span from agentops.helpers.serialization import safe_serialize, model_to_dict from agentops.sdk.core import tracer -from agentops.instrumentation.agentic.xpander.version import __version__ +from agentops.instrumentation.agentic.xpander.context import XpanderContext from agentops.semconv import SpanAttributes, SpanKind, ToolAttributes from agentops.semconv.message import MessageAttributes @@ -63,6 +51,7 @@ _instruments = ("xpander-sdk >= 1.0.0",) + # Use existing AgentOps utility instead of custom implementation def safe_set_attribute(span, key: str, value: Any) -> None: """Set attribute on span using existing AgentOps utility.""" @@ -72,197 +61,13 @@ def safe_set_attribute(span, key: str, value: Any) -> None: logger.warning(f"Failed to set attribute {key}: {e}") -class XpanderContext: - """Context manager for Xpander sessions with nested conversation spans.""" - - def __init__(self): - self._sessions = {} # session_id -> session_data - self._workflow_spans = {} # session_id -> active workflow span - self._agent_spans = {} # session_id -> active agent span - self._conversation_spans = {} # session_id -> active conversation span - self._conversation_counters = {} # session_id -> conversation counter - self._lock = threading.Lock() - - def start_session(self, session_id: str, agent_info: Dict[str, Any], workflow_span=None, agent_span=None) -> None: - """Start a new session with agent info.""" - with self._lock: - self._sessions[session_id] = { - "agent_name": agent_info.get("agent_name", "unknown"), - "agent_id": agent_info.get("agent_id", "unknown"), - "task_input": agent_info.get("task_input"), - "phase": "planning", - "step_count": 0, - "total_tokens": 0, - "tools_executed": [], - "start_time": time.time(), - } - if workflow_span: - self._workflow_spans[session_id] = workflow_span - if agent_span: - self._agent_spans[session_id] = agent_span - - # Initialize conversation counter - self._conversation_counters[session_id] = 0 - - def start_conversation(self, session_id: str, conversation_span) -> None: - """Start a new conversation within the session.""" - with self._lock: - self._conversation_spans[session_id] = conversation_span - self._conversation_counters[session_id] = self._conversation_counters.get(session_id, 0) + 1 - - def end_conversation(self, session_id: str) -> None: - """End the current conversation.""" - with self._lock: - if session_id in self._conversation_spans: - del self._conversation_spans[session_id] - - def has_active_conversation(self, session_id: str) -> bool: - """Check if there's an active conversation for this session.""" - with self._lock: - return session_id in self._conversation_spans - - def get_conversation_counter(self, session_id: str) -> int: - """Get the current conversation counter.""" - with self._lock: - return self._conversation_counters.get(session_id, 0) - - def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: - """Get session data.""" - with self._lock: - return self._sessions.get(session_id) - - def update_session(self, session_id: str, updates: Dict[str, Any]) -> None: - """Update session data.""" - with self._lock: - if session_id in self._sessions: - self._sessions[session_id].update(updates) - - def end_session(self, session_id: str) -> None: - """End a session.""" - with self._lock: - if session_id in self._sessions: - del self._sessions[session_id] - if session_id in self._workflow_spans: - del self._workflow_spans[session_id] - 
if session_id in self._agent_spans: - del self._agent_spans[session_id] - if session_id in self._conversation_spans: - del self._conversation_spans[session_id] - if session_id in self._conversation_counters: - del self._conversation_counters[session_id] - - def get_workflow_phase(self, session_id: str) -> str: - """Detect current workflow phase based on state.""" - with self._lock: - session = self._sessions.get(session_id, {}) - - if session.get('tools_executed', []): - return "executing" - elif session.get('step_count', 0) > 0: - return "executing" - else: - return "planning" - - def get_workflow_span(self, session_id: str): - """Get the active workflow span for a session.""" - with self._lock: - return self._workflow_spans.get(session_id) - - def get_agent_span(self, session_id: str): - """Get the active agent span for a session.""" - with self._lock: - return self._agent_spans.get(session_id) - - def get_conversation_span(self, session_id: str): - """Get the active conversation span for a session.""" - with self._lock: - return self._conversation_spans.get(session_id) - - -def extract_current_message_content(messages) -> str: - """Extract current response content using existing AgentOps serialization utilities.""" - if not messages: - return "" - - try: - # Use existing AgentOps serialization for consistent handling - if hasattr(messages, '__dict__'): - # Convert Pydantic/object to dict using existing utility - messages_dict = model_to_dict(messages) - else: - messages_dict = messages - - # Use safe_serialize for consistent string conversion - content = safe_serialize(messages_dict) - - # Apply consistent truncation following AgentOps patterns - max_length = 1500 - if len(content) > max_length: - content = content[:max_length - 3] + "..." - - return content - - except Exception as e: - logger.warning(f"Failed to extract message content: {e}") - # Fallback to safe string conversion - try: - return safe_serialize(messages)[:1000] - except Exception: - return str(messages)[:1000] if messages else "" - - -def clean_llm_content(content: str) -> str: - """Clean LLM content for display using robust, pattern-agnostic approach.""" - if not isinstance(content, str): - return str(content)[:1000] - - if not content or not content.strip(): - return "" - - # Basic content cleaning without hardcoded patterns - cleaned_content = content.strip() - - # Remove excessive whitespace and normalize line breaks - lines = [] - for line in cleaned_content.split('\n'): - line = line.strip() - if line: # Skip empty lines - lines.append(line) - - # Join with single newlines - cleaned_content = '\n'.join(lines) - - # General heuristic: if content is very long and contains structured data patterns, - # try to extract the main response by taking the first substantial paragraph - if len(cleaned_content) > 3000: - paragraphs = cleaned_content.split('\n\n') - if len(paragraphs) > 1: - # Find the first substantial paragraph (more than 50 chars) - for paragraph in paragraphs: - if len(paragraph.strip()) > 50: - cleaned_content = paragraph.strip() - break - - # Apply consistent truncation with ellipsis (following AgentOps patterns) - max_length = 1500 - if len(cleaned_content) > max_length: - cleaned_content = cleaned_content[:max_length - 3] + "..." 
- - return cleaned_content - - - - class XpanderInstrumentor(CommonInstrumentor): """Instrumentor for Xpander SDK interactions.""" def __init__(self, config: Optional[InstrumentorConfig] = None): if config is None: config = InstrumentorConfig( - library_name="xpander-sdk", - library_version="1.0.0", - dependencies=_instruments, - metrics_enabled=True + library_name="xpander-sdk", library_version="1.0.0", dependencies=_instruments, metrics_enabled=True ) super().__init__(config) self._context = XpanderContext() @@ -273,73 +78,74 @@ def __init__(self, config: Optional[InstrumentorConfig] = None): def _get_session_id_from_agent(self, agent) -> str: """Generate consistent session ID from agent.""" # First try to get memory_thread_id from agent context if available - if hasattr(agent, 'memory_thread_id'): + if hasattr(agent, "memory_thread_id"): return f"session_{agent.memory_thread_id}" - + # Check for execution context - if hasattr(agent, 'execution') and hasattr(agent.execution, 'memory_thread_id'): + if hasattr(agent, "execution") and hasattr(agent.execution, "memory_thread_id"): return f"session_{agent.execution.memory_thread_id}" - + # Fallback to agent-based ID - agent_name = getattr(agent, 'name', 'unknown') - agent_id = getattr(agent, 'id', 'unknown') + agent_name = getattr(agent, "name", "unknown") + agent_id = getattr(agent, "id", "unknown") return f"agent_{agent_name}_{agent_id}" def _extract_session_id(self, execution, agent=None) -> str: """Extract session ID from execution data.""" if isinstance(execution, dict): - if 'memory_thread_id' in execution: + if "memory_thread_id" in execution: return f"session_{execution['memory_thread_id']}" - elif 'thread_id' in execution: + elif "thread_id" in execution: return f"session_{execution['thread_id']}" - elif 'session_id' in execution: + elif "session_id" in execution: return f"session_{execution['session_id']}" - + # Fallback to agent-based ID if available if agent: return self._get_session_id_from_agent(agent) - + # Last resort fallback return f"session_{int(time.time())}" def _extract_tool_name(self, tool_call) -> str: """Extract tool name from tool call.""" # Handle different tool call formats - if hasattr(tool_call, 'function_name'): + if hasattr(tool_call, "function_name"): return tool_call.function_name - elif hasattr(tool_call, 'function') and hasattr(tool_call.function, 'name'): + elif hasattr(tool_call, "function") and hasattr(tool_call.function, "name"): return tool_call.function.name - elif hasattr(tool_call, 'name'): + elif hasattr(tool_call, "name"): return tool_call.name elif isinstance(tool_call, dict): - if 'function' in tool_call: - return tool_call['function'].get('name', 'unknown') - elif 'function_name' in tool_call: - return tool_call['function_name'] - elif 'name' in tool_call: - return tool_call['name'] - + if "function" in tool_call: + return tool_call["function"].get("name", "unknown") + elif "function_name" in tool_call: + return tool_call["function_name"] + elif "name" in tool_call: + return tool_call["name"] + # Try to extract from string representation import re + patterns = [ r'function[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', r'name[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', - r'([a-zA-Z_][a-zA-Z0-9_]*)\.tool', - r'function_name[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]' + r"([a-zA-Z_][a-zA-Z0-9_]*)\.tool", + r'function_name[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', ] - + tool_str = str(tool_call) for pattern in patterns: match = re.search(pattern, tool_str, re.IGNORECASE) if match: return match.group(1) - - return 'unknown' + + return 
"unknown" def _extract_tool_params(self, tool_call) -> dict: """Extract tool parameters from tool call.""" # Handle different parameter formats - if hasattr(tool_call, 'function') and hasattr(tool_call.function, 'arguments'): + if hasattr(tool_call, "function") and hasattr(tool_call.function, "arguments"): try: args = tool_call.function.arguments if isinstance(args, str): @@ -348,7 +154,7 @@ def _extract_tool_params(self, tool_call) -> dict: return args except (json.JSONDecodeError, AttributeError): pass - elif hasattr(tool_call, 'arguments'): + elif hasattr(tool_call, "arguments"): try: args = tool_call.arguments if isinstance(args, str): @@ -358,97 +164,94 @@ def _extract_tool_params(self, tool_call) -> dict: except (json.JSONDecodeError, AttributeError): pass elif isinstance(tool_call, dict): - if 'function' in tool_call: - args = tool_call['function'].get('arguments', '{}') + if "function" in tool_call: + args = tool_call["function"].get("arguments", "{}") try: return json.loads(args) if isinstance(args, str) else args except json.JSONDecodeError: pass - elif 'arguments' in tool_call: - args = tool_call['arguments'] + elif "arguments" in tool_call: + args = tool_call["arguments"] try: return json.loads(args) if isinstance(args, str) else args except json.JSONDecodeError: pass - + return {} def _extract_llm_data_from_messages(self, messages) -> dict: """Extract LLM metadata from messages.""" data = {} - + if isinstance(messages, dict): # Direct model and usage fields - if 'model' in messages: - data['model'] = messages['model'] - if 'usage' in messages: - data['usage'] = messages['usage'] - + if "model" in messages: + data["model"] = messages["model"] + if "usage" in messages: + data["usage"] = messages["usage"] + # Check in choices array (OpenAI format) - if 'choices' in messages and messages['choices']: - choice = messages['choices'][0] - if 'message' in choice: - message = choice['message'] - if 'model' in message: - data['model'] = message['model'] - + if "choices" in messages and messages["choices"]: + choice = messages["choices"][0] + if "message" in choice: + message = choice["message"] + if "model" in message: + data["model"] = message["model"] + elif isinstance(messages, list): # Look for assistant messages with metadata for msg in messages: - if isinstance(msg, dict) and msg.get('role') == 'assistant': - if 'model' in msg: - data['model'] = msg['model'] - if 'usage' in msg: - data['usage'] = msg['usage'] + if isinstance(msg, dict) and msg.get("role") == "assistant": + if "model" in msg: + data["model"] = msg["model"] + if "usage" in msg: + data["usage"] = msg["usage"] break - + # Try to extract from any nested structures - if not data and hasattr(messages, '__dict__'): + if not data and hasattr(messages, "__dict__"): msg_dict = messages.__dict__ - if 'model' in msg_dict: - data['model'] = msg_dict['model'] - if 'usage' in msg_dict: - data['usage'] = msg_dict['usage'] - + if "model" in msg_dict: + data["model"] = msg_dict["model"] + if "usage" in msg_dict: + data["usage"] = msg_dict["usage"] + return data def _extract_and_set_openai_message_attributes(self, span, messages, result, agent=None): """Extract and set OpenAI message attributes from messages and response.""" try: - # Lazy import to avoid circular imports - try: - from agentops.instrumentation.providers.openai.attributes.common import get_response_attributes - except ImportError: - logger.debug("OpenAI attributes module not available, using fallback") - # Continue with manual extraction as fallback + # Manual 
extraction since we don't need the OpenAI module for this # Try to get the agent's current message history for prompts agent_messages = [] - if agent and hasattr(agent, 'messages'): - agent_messages = getattr(agent, 'messages', []) - elif agent and hasattr(agent, 'conversation_history'): - agent_messages = getattr(agent, 'conversation_history', []) - elif agent and hasattr(agent, 'history'): - agent_messages = getattr(agent, 'history', []) - + if agent and hasattr(agent, "messages"): + agent_messages = getattr(agent, "messages", []) + elif agent and hasattr(agent, "conversation_history"): + agent_messages = getattr(agent, "conversation_history", []) + elif agent and hasattr(agent, "history"): + agent_messages = getattr(agent, "history", []) + # Also try to extract messages from the messages parameter itself if isinstance(messages, list): # If messages is a list of messages, use it directly agent_messages.extend(messages) - elif isinstance(messages, dict) and 'messages' in messages: + elif isinstance(messages, dict) and "messages" in messages: # If messages contains a messages key - agent_messages.extend(messages.get('messages', [])) - + agent_messages.extend(messages.get("messages", [])) + # Set prompt messages (input to LLM) prompt_index = 0 for msg in agent_messages[-10:]: # Get last 10 messages to avoid huge context if isinstance(msg, dict): - role = msg.get('role', 'user') - content = msg.get('content', '') - + role = msg.get("role", "user") + content = msg.get("content", "") + # Handle different content formats if content and isinstance(content, str) and content.strip(): safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) - safe_set_attribute(span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), content[:2000]) + safe_set_attribute( + span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), content[:2000] + ) prompt_index += 1 elif content and isinstance(content, list): # Handle multi-modal content @@ -456,111 +259,149 @@ def _extract_and_set_openai_message_attributes(self, span, messages, result, age safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) safe_set_attribute(span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), content_str) prompt_index += 1 - elif hasattr(msg, 'content'): + elif hasattr(msg, "content"): # Handle object with content attribute - content = getattr(msg, 'content', '') - role = getattr(msg, 'role', 'user') + content = getattr(msg, "content", "") + role = getattr(msg, "role", "user") if content and isinstance(content, str) and content.strip(): safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) - safe_set_attribute(span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), str(content)[:2000]) + safe_set_attribute( + span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), str(content)[:2000] + ) prompt_index += 1 - + # Set completion messages (response from LLM) completion_index = 0 response_data = result if result else messages - + # Handle different response formats if isinstance(response_data, dict): - choices = response_data.get('choices', []) + choices = response_data.get("choices", []) for choice in choices: - message = choice.get('message', {}) - role = message.get('role', 'assistant') - content = message.get('content', '') - + message = choice.get("message", {}) + role = message.get("role", "assistant") + content = message.get("content", "") + if content: safe_set_attribute(span, 
MessageAttributes.COMPLETION_ROLE.format(i=completion_index), role) - safe_set_attribute(span, MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), content[:2000]) - + safe_set_attribute( + span, MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), content[:2000] + ) + # Handle tool calls in the response - tool_calls = message.get('tool_calls', []) + tool_calls = message.get("tool_calls", []) for j, tool_call in enumerate(tool_calls): - tool_id = tool_call.get('id', '') - tool_name = tool_call.get('function', {}).get('name', '') - tool_args = tool_call.get('function', {}).get('arguments', '') - + tool_id = tool_call.get("id", "") + tool_name = tool_call.get("function", {}).get("name", "") + tool_args = tool_call.get("function", {}).get("arguments", "") + if tool_id: - safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), tool_id) + safe_set_attribute( + span, MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), tool_id + ) if tool_name: - safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), tool_name) + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), + tool_name, + ) if tool_args: - safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=completion_index, j=j), tool_args[:500]) - safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), "function") - + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=completion_index, j=j), + tool_args[:500], + ) + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), + "function", + ) + completion_index += 1 - elif hasattr(response_data, 'choices'): + elif hasattr(response_data, "choices"): # Handle response object with choices attribute - choices = getattr(response_data, 'choices', []) + choices = getattr(response_data, "choices", []) for choice in choices: - message = getattr(choice, 'message', None) + message = getattr(choice, "message", None) if message: - role = getattr(message, 'role', 'assistant') - content = getattr(message, 'content', '') - + role = getattr(message, "role", "assistant") + content = getattr(message, "content", "") + if content: safe_set_attribute(span, MessageAttributes.COMPLETION_ROLE.format(i=completion_index), role) - safe_set_attribute(span, MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), str(content)[:2000]) - + safe_set_attribute( + span, + MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), + str(content)[:2000], + ) + # Handle tool calls - tool_calls = getattr(message, 'tool_calls', []) + tool_calls = getattr(message, "tool_calls", []) for j, tool_call in enumerate(tool_calls): - tool_id = getattr(tool_call, 'id', '') - function = getattr(tool_call, 'function', None) + tool_id = getattr(tool_call, "id", "") + function = getattr(tool_call, "function", None) if function: - tool_name = getattr(function, 'name', '') - tool_args = getattr(function, 'arguments', '') - + tool_name = getattr(function, "name", "") + tool_args = getattr(function, "arguments", "") + if tool_id: - safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), tool_id) + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), + tool_id, + ) if tool_name: - safe_set_attribute(span, 
MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), tool_name) + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), + tool_name, + ) if tool_args: - safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=completion_index, j=j), str(tool_args)[:500]) - safe_set_attribute(span, MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), "function") - + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format( + i=completion_index, j=j + ), + str(tool_args)[:500], + ) + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), + "function", + ) + completion_index += 1 - - + except Exception as e: logger.error(f"Error extracting OpenAI message attributes: {e}") def _wrap_init_task(self, original_method): """Wrap init_task to create agent span hierarchy.""" instrumentor = self + def wrapper(self, execution): - # Extract session ID and agent info session_id = instrumentor._extract_session_id(execution) - agent_name = getattr(self, 'name', 'unknown') - agent_id = getattr(self, 'id', 'unknown') - + agent_name = getattr(self, "name", "unknown") + agent_id = getattr(self, "id", "unknown") + # Check if session already exists existing_session = instrumentor._context.get_session(session_id) if existing_session: # Session already exists, just continue result = original_method(self, execution) return result - + # Extract task input task_input = None if isinstance(execution, dict): - if 'input' in execution: - input_data = execution['input'] - if isinstance(input_data, dict) and 'text' in input_data: - task_input = input_data['text'] + if "input" in execution: + input_data = execution["input"] + if isinstance(input_data, dict) and "text" in input_data: + task_input = input_data["text"] elif isinstance(input_data, str): task_input = input_data - - + # Create top-level conversation/session span - this is the ROOT span conversation_span_attributes = { SpanAttributes.AGENTOPS_ENTITY_NAME: f"Session - {agent_name}", @@ -573,14 +414,14 @@ def wrapper(self, execution): session_span, session_ctx, session_token = tracer.make_span( operation_name=f"session.{agent_name}", span_kind=SpanKind.AGENT, # Use AGENT kind for the root session span - attributes=conversation_span_attributes + attributes=conversation_span_attributes, ) - + # Set task input on session span if task_input: safe_set_attribute(session_span, SpanAttributes.AGENTOPS_ENTITY_INPUT, task_input[:1000]) safe_set_attribute(session_span, "xpander.session.initial_input", task_input[:500]) - + # Create workflow span as child of session span (this will be the main execution span) trace.set_span_in_context(session_span) workflow_span_attributes = { @@ -595,22 +436,22 @@ def wrapper(self, execution): workflow_span, workflow_ctx, workflow_token = tracer.make_span( operation_name=f"workflow.{agent_name}", span_kind=SpanKind.WORKFLOW, - attributes=workflow_span_attributes + attributes=workflow_span_attributes, ) - + # No separate agent span - workflow span contains all agent info - + # Initialize workflow state with persistent spans agent_info = { "agent_name": agent_name, "agent_id": agent_id, "task_input": task_input, - "thread_id": execution.get('memory_thread_id') if isinstance(execution, dict) else None + "thread_id": execution.get("memory_thread_id") if isinstance(execution, dict) else None, } instrumentor._context.start_session(session_id, agent_info, 
workflow_span, None) # No agent span # Store the session span as well instrumentor._context.start_conversation(session_id, session_span) - + try: # Execute original method - don't end agent span here, it will be ended in retrieve_execution_result result = original_method(self, execution) @@ -618,36 +459,38 @@ def wrapper(self, execution): except Exception as e: # Use existing AgentOps error handling utilities _finish_span_error(workflow_span, e) - _finish_span_error(agent_span, e) raise - + return wrapper def _wrap_run_tools(self, original_method): """Wrap run_tools to create execution phase tool spans.""" instrumentor = self + def wrapper(self, tool_calls, payload_extension=None): - session_id = instrumentor._get_session_id_from_agent(self) current_session = instrumentor._context.get_session(session_id) - + # Update workflow state - step_num = (current_session.get('step_count', 0) + 1) if current_session else 1 - instrumentor._context.update_session(session_id, { - 'step_count': step_num, - 'phase': 'executing', - 'tools_executed': (current_session.get('tools_executed', []) if current_session else []) + - [instrumentor._extract_tool_name(tc) for tc in tool_calls] - }) - + step_num = (current_session.get("step_count", 0) + 1) if current_session else 1 + instrumentor._context.update_session( + session_id, + { + "step_count": step_num, + "phase": "executing", + "tools_executed": (current_session.get("tools_executed", []) if current_session else []) + + [instrumentor._extract_tool_name(tc) for tc in tool_calls], + }, + ) + # Get current span context (should be the LLM span) current_span = trace.get_current_span() - + # Create execution phase span as child of current LLM span execution_span_context = trace.set_span_in_context(current_span) if current_span else None - + with instrumentor._tracer.start_as_current_span( - f"xpander.execution", + "xpander.execution", kind=OTelSpanKind.INTERNAL, context=execution_span_context, attributes={ @@ -657,26 +500,25 @@ def wrapper(self, tool_calls, payload_extension=None): "xpander.step.number": step_num, "xpander.step.tool_count": len(tool_calls), "xpander.session.id": session_id, - } + }, ) as execution_span: - # Execute tools and create individual tool spans results = [] conversation_finished = False - + for i, tool_call in enumerate(tool_calls): tool_name = instrumentor._extract_tool_name(tool_call) tool_params = instrumentor._extract_tool_params(tool_call) - + # Check if this is the conversation finish tool if tool_name == "xpfinish-agent-execution-finished": conversation_finished = True - + start_time = time.time() - + # Create tool span as child of execution span tool_span_context = trace.set_span_in_context(execution_span) - + with instrumentor._tracer.start_as_current_span( f"tool.{tool_name}", kind=OTelSpanKind.CLIENT, @@ -689,43 +531,41 @@ def wrapper(self, tool_calls, payload_extension=None): "xpander.workflow.phase": "executing", "xpander.tool.step": step_num, "xpander.tool.index": i, - } + }, ) as tool_span: - - # Execute single tool single_result = original_method(self, [tool_call], payload_extension) results.extend(single_result) - + # Record tool execution details execution_time = time.time() - start_time safe_set_attribute(tool_span, "xpander.tool.execution_time", execution_time) - + # Add tool result if available if single_result: result_summary = f"Executed successfully with {len(single_result)} results" safe_set_attribute(tool_span, "xpander.tool.result_summary", result_summary) - + # Store actual result data using existing AgentOps 
utilities try: result_content = "" - + for i, result_item in enumerate(single_result): # Handle xpander_sdk.ToolCallResult objects specifically - if hasattr(result_item, '__class__') and 'ToolCallResult' in str(type(result_item)): + if hasattr(result_item, "__class__") and "ToolCallResult" in str(type(result_item)): # Extract the actual result content from ToolCallResult try: - if hasattr(result_item, 'result') and result_item.result is not None: + if hasattr(result_item, "result") and result_item.result is not None: actual_result = result_item.result if isinstance(actual_result, str): result_content += actual_result[:1000] + "\n" else: result_content += safe_serialize(actual_result)[:1000] + "\n" - elif hasattr(result_item, 'data') and result_item.data is not None: + elif hasattr(result_item, "data") and result_item.data is not None: result_content += safe_serialize(result_item.data)[:1000] + "\n" else: # Fallback: try to find any content attribute - for attr_name in ['content', 'output', 'value', 'response']: + for attr_name in ["content", "output", "value", "response"]: if hasattr(result_item, attr_name): attr_value = getattr(result_item, attr_name) if attr_value is not None: @@ -733,65 +573,65 @@ def wrapper(self, tool_calls, payload_extension=None): break else: # If no content attributes found, indicate this - result_content += f"ToolCallResult object (no extractable content)\n" + result_content += "ToolCallResult object (no extractable content)\n" except Exception as attr_e: logger.debug(f"Error extracting from ToolCallResult: {attr_e}") - result_content += f"ToolCallResult object (extraction failed)\n" - + result_content += "ToolCallResult object (extraction failed)\n" + # Handle regular objects and primitives elif isinstance(result_item, (str, int, float, bool)): result_content += str(result_item)[:1000] + "\n" - elif hasattr(result_item, '__dict__'): + elif hasattr(result_item, "__dict__"): # Convert objects to dict using existing utility result_dict = model_to_dict(result_item) result_content += safe_serialize(result_dict)[:1000] + "\n" else: # Use safe_serialize for consistent conversion result_content += safe_serialize(result_item)[:1000] + "\n" - + if result_content.strip(): final_content = result_content.strip()[:2000] safe_set_attribute(tool_span, ToolAttributes.TOOL_RESULT, final_content) else: - safe_set_attribute(tool_span, ToolAttributes.TOOL_RESULT, "No extractable content found") - + safe_set_attribute( + tool_span, ToolAttributes.TOOL_RESULT, "No extractable content found" + ) + except Exception as e: logger.error(f"Error setting tool result: {e}") - safe_set_attribute(tool_span, ToolAttributes.TOOL_RESULT, f"Error capturing result: {e}") + safe_set_attribute( + tool_span, ToolAttributes.TOOL_RESULT, f"Error capturing result: {e}" + ) else: safe_set_attribute(tool_span, "xpander.tool.result_summary", "No results returned") - + # If conversation is finished, mark for session closure if conversation_finished: # Since session span is now the conversation span, we need to close all spans # when the conversation finishes - logger.info(f"Conversation finished for session {session_id} - marking for closure") - + pass # Session closure will be handled in retrieve_execution_result + return results - + return wrapper def _wrap_add_messages(self, original_method): """Wrap add_messages to create LLM spans with proper parent-child relationship.""" instrumentor = self + def wrapper(self, messages): - session_id = instrumentor._get_session_id_from_agent(self) current_session = 
instrumentor._context.get_session(session_id) current_phase = instrumentor._context.get_workflow_phase(session_id) workflow_span = instrumentor._context.get_workflow_span(session_id) - conversation_span = instrumentor._context.get_conversation_span(session_id) - - # Extract and clean message content - message_content = extract_current_message_content(messages) - + # Create LLM span as child of workflow span (not conversation span) # The hierarchy should be: session -> agent/workflow -> LLM -> execution -> tools llm_span_context = trace.set_span_in_context(workflow_span) if workflow_span else None - + # Call original method first to get the actual OpenAI response result = original_method(self, messages) - + # Now create a span that captures the LLM interaction with the actual response data with instrumentor._tracer.start_as_current_span( f"llm.{current_phase}", @@ -804,154 +644,158 @@ def wrapper(self, messages): "xpander.session.id": session_id, }, ) as llm_span: - # Extract and set OpenAI message data from the messages and response instrumentor._extract_and_set_openai_message_attributes(llm_span, messages, result, self) - - # Set cleaned content for legacy compatibility - if message_content: - cleaned_content = clean_llm_content(message_content) - safe_set_attribute(llm_span, "xpander.llm.content", cleaned_content) - + # Extract and set LLM metadata from the result if possible llm_data = instrumentor._extract_llm_data_from_messages(result if result else messages) if llm_data: - if 'model' in llm_data: - safe_set_attribute(llm_span, SpanAttributes.LLM_REQUEST_MODEL, llm_data['model']) - safe_set_attribute(llm_span, SpanAttributes.LLM_RESPONSE_MODEL, llm_data['model']) - - if 'usage' in llm_data: - usage = llm_data['usage'] - if 'prompt_tokens' in usage: - safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage['prompt_tokens']) - if 'completion_tokens' in usage: - safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage['completion_tokens']) - if 'total_tokens' in usage: - safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage['total_tokens']) + if "model" in llm_data: + safe_set_attribute(llm_span, SpanAttributes.LLM_REQUEST_MODEL, llm_data["model"]) + safe_set_attribute(llm_span, SpanAttributes.LLM_RESPONSE_MODEL, llm_data["model"]) + + if "usage" in llm_data: + usage = llm_data["usage"] + if "prompt_tokens" in usage: + safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage["prompt_tokens"]) + if "completion_tokens" in usage: + safe_set_attribute( + llm_span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage["completion_tokens"] + ) + if "total_tokens" in usage: + safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage["total_tokens"]) # Update workflow state - instrumentor._context.update_session(session_id, { - "total_tokens": (current_session.get("total_tokens", 0) if current_session else 0) + usage['total_tokens'] - }) - + instrumentor._context.update_session( + session_id, + { + "total_tokens": (current_session.get("total_tokens", 0) if current_session else 0) + + usage["total_tokens"] + }, + ) + return result - + return wrapper def _wrap_is_finished(self, original_method): """Wrap is_finished to track workflow completion.""" instrumentor = self + def wrapper(self): result = original_method(self) - + if result: session_id = instrumentor._get_session_id_from_agent(self) - + # Update session to finished state - instrumentor._context.update_session(session_id, { - "phase": "finished", - 
"end_time": time.time() - }) - + instrumentor._context.update_session(session_id, {"phase": "finished", "end_time": time.time()}) + return result - + return wrapper def _wrap_extract_tool_calls(self, original_method): """Wrap extract_tool_calls to track tool planning.""" - instrumentor = self + def wrapper(self, messages): result = original_method(self, messages) - - if result: - session_id = instrumentor._get_session_id_from_agent(self) - return result - + return wrapper def _wrap_report_execution_metrics(self, original_method): """Wrap report_execution_metrics to track metrics.""" - instrumentor = self + def wrapper(self, llm_tokens=None, ai_model=None): result = original_method(self, llm_tokens, ai_model) - - session_id = instrumentor._get_session_id_from_agent(self) - return result - + return wrapper def _wrap_retrieve_execution_result(self, original_method): """Wrap retrieve_execution_result to finalize agent and workflow spans.""" instrumentor = self + def wrapper(self): session_id = instrumentor._get_session_id_from_agent(self) current_session = instrumentor._context.get_session(session_id) workflow_span = instrumentor._context.get_workflow_span(session_id) session_span = instrumentor._context.get_conversation_span(session_id) # This is now the root session span - + try: # Execute and capture result result = original_method(self) - + # Add workflow summary to the persistent workflow span if workflow_span and current_session: - safe_set_attribute(workflow_span, "xpander.workflow.total_steps", current_session.get('step_count', 0)) - safe_set_attribute(workflow_span, "xpander.workflow.total_tokens", current_session.get('total_tokens', 0)) - safe_set_attribute(workflow_span, "xpander.workflow.tools_used", len(current_session.get('tools_executed', []))) - + safe_set_attribute( + workflow_span, "xpander.workflow.total_steps", current_session.get("step_count", 0) + ) + safe_set_attribute( + workflow_span, "xpander.workflow.total_tokens", current_session.get("total_tokens", 0) + ) + safe_set_attribute( + workflow_span, "xpander.workflow.tools_used", len(current_session.get("tools_executed", [])) + ) + # Calculate total execution time - start_time = current_session.get('start_time', time.time()) + start_time = current_session.get("start_time", time.time()) execution_time = time.time() - start_time safe_set_attribute(workflow_span, "xpander.workflow.execution_time", execution_time) safe_set_attribute(workflow_span, "xpander.workflow.phase", "completed") - + # Set result details on session and workflow spans if result: result_content = "" - if hasattr(result, 'result'): + if hasattr(result, "result"): result_content = str(result.result)[:1000] - + # Set on session span (root span) if session_span and result_content: safe_set_attribute(session_span, SpanAttributes.AGENTOPS_ENTITY_OUTPUT, result_content) safe_set_attribute(session_span, "xpander.session.final_result", result_content) - if hasattr(result, 'memory_thread_id'): + if hasattr(result, "memory_thread_id"): safe_set_attribute(session_span, "xpander.session.thread_id", result.memory_thread_id) - + if workflow_span: if result_content: safe_set_attribute(workflow_span, "xpander.result.content", result_content) - if hasattr(result, 'memory_thread_id'): + if hasattr(result, "memory_thread_id"): safe_set_attribute(workflow_span, "xpander.result.thread_id", result.memory_thread_id) - + # Add session summary to session span if session_span and current_session: - safe_set_attribute(session_span, "xpander.session.total_steps", 
current_session.get('step_count', 0)) - safe_set_attribute(session_span, "xpander.session.total_tokens", current_session.get('total_tokens', 0)) - safe_set_attribute(session_span, "xpander.session.tools_used", len(current_session.get('tools_executed', []))) - - start_time = current_session.get('start_time', time.time()) + safe_set_attribute( + session_span, "xpander.session.total_steps", current_session.get("step_count", 0) + ) + safe_set_attribute( + session_span, "xpander.session.total_tokens", current_session.get("total_tokens", 0) + ) + safe_set_attribute( + session_span, "xpander.session.tools_used", len(current_session.get("tools_executed", [])) + ) + + start_time = current_session.get("start_time", time.time()) execution_time = time.time() - start_time safe_set_attribute(session_span, "xpander.session.execution_time", execution_time) - + # Close all spans - session span should be closed last if workflow_span: _finish_span_success(workflow_span) workflow_span.end() - + if session_span: _finish_span_success(session_span) session_span.end() - + return result - + except Exception as e: # Mark spans as failed and close them in proper order if workflow_span: _finish_span_error(workflow_span, e) workflow_span.end() - + if session_span: _finish_span_error(session_span, e) session_span.end() @@ -959,7 +803,7 @@ def wrapper(self): finally: # Clean up session instrumentor._context.end_session(session_id) - + return wrapper def _instrument(self, **kwargs): @@ -967,11 +811,11 @@ def _instrument(self, **kwargs): try: # Import xpander modules from xpander_sdk import Agent - + # Set up tracing using existing AgentOps tracer self._tracer = tracer.get_tracer() # Attribute manager already initialized in __init__ - + # Wrap Agent methods Agent.add_task = self._wrap_init_task(Agent.add_task) Agent.init_task = self._wrap_init_task(Agent.init_task) # Also wrap init_task for completeness @@ -981,9 +825,7 @@ def _instrument(self, **kwargs): Agent.extract_tool_calls = self._wrap_extract_tool_calls(Agent.extract_tool_calls) Agent.report_execution_metrics = self._wrap_report_execution_metrics(Agent.report_execution_metrics) Agent.retrieve_execution_result = self._wrap_retrieve_execution_result(Agent.retrieve_execution_result) - - logger.info("Xpander SDK instrumentation activated") - + except ImportError: logger.debug("Xpander SDK not available") except Exception as e: @@ -991,7 +833,7 @@ def _instrument(self, **kwargs): def _uninstrument(self, **kwargs): """Uninstrument the Xpander SDK.""" - logger.info("Xpander SDK instrumentation deactivated") + pass def _create_metrics(self, meter: Meter) -> StandardMetrics: """Create metrics for Xpander instrumentation.""" @@ -1014,36 +856,3 @@ def _create_metrics(self, meter: Meter) -> StandardMetrics: description="Number of Xpander request errors", ), ) - - -def create_xpander_llm_span(tracer, model_name, purpose, session_id, attribute_manager=None): - """Create a standardized LLM span for Xpander operations.""" - span = tracer.start_span( - f"xpander.llm.{purpose}", - kind=OTelSpanKind.CLIENT, - attributes={ - SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.LLM, - SpanAttributes.LLM_REQUEST_MODEL: model_name, - SpanAttributes.LLM_RESPONSE_MODEL: model_name, - "xpander.span.type": "llm", - "xpander.llm.purpose": purpose, - "xpander.session.id": session_id, - } - ) - if attribute_manager: - attribute_manager.set_common_attributes(span) - return span - - -# Backward compatibility functions -def wrap_openai_call_for_xpander(openai_call_func, purpose="general"): - """Backward 
compatibility stub - functionality now handled by auto-instrumentation.""" - return openai_call_func - -def is_xpander_session_active(): - """Check if xpander session is active.""" - return True # Always return True since auto-instrumentation is active - -def get_active_xpander_session(): - """Get active xpander session.""" - return None # Return None since we don't expose internal context diff --git a/agentops/instrumentation/agentic/xpander/trace_probe.py b/agentops/instrumentation/agentic/xpander/trace_probe.py index d2f6b408b..339352fda 100644 --- a/agentops/instrumentation/agentic/xpander/trace_probe.py +++ b/agentops/instrumentation/agentic/xpander/trace_probe.py @@ -13,10 +13,11 @@ # Global instrumentor instance _instrumentor = None + def activate_xpander_instrumentation(): """Activate Xpander instrumentation.""" global _instrumentor - + if _instrumentor is None: try: _instrumentor = XpanderInstrumentor() @@ -25,13 +26,14 @@ def activate_xpander_instrumentation(): except Exception as e: logger.error(f"Failed to activate Xpander instrumentation: {e}") _instrumentor = None - + return _instrumentor + def deactivate_xpander_instrumentation(): """Deactivate Xpander instrumentation.""" global _instrumentor - + if _instrumentor is not None: try: _instrumentor.uninstrument() @@ -41,36 +43,44 @@ def deactivate_xpander_instrumentation(): finally: _instrumentor = None + def get_instrumentor(): """Get the active instrumentor instance.""" return _instrumentor + # Stub functions for backward compatibility def wrap_openai_call_for_xpander(openai_call_func, purpose="general"): """Backward compatibility stub - functionality now handled by auto-instrumentation.""" logger.debug(f"wrap_openai_call_for_xpander called with purpose: {purpose}") return openai_call_func + def is_xpander_session_active(): """Check if xpander session is active.""" return _instrumentor is not None + def get_active_xpander_session(): """Get active xpander session.""" return _instrumentor._context if _instrumentor else None + # Convenience functions for cleaner OpenAI integration def wrap_openai_analysis(openai_call_func): """Wrap OpenAI calls for analysis/reasoning steps.""" return wrap_openai_call_for_xpander(openai_call_func, "analysis") + def wrap_openai_planning(openai_call_func): """Wrap OpenAI calls for planning steps.""" return wrap_openai_call_for_xpander(openai_call_func, "planning") + def wrap_openai_synthesis(openai_call_func): """Wrap OpenAI calls for synthesis/summary steps.""" return wrap_openai_call_for_xpander(openai_call_func, "synthesis") + # Note: Auto-activation is now handled by the main AgentOps instrumentation system -# activate_xpander_instrumentation() \ No newline at end of file +# activate_xpander_instrumentation() diff --git a/agentops/instrumentation/agentic/xpander/version.py b/agentops/instrumentation/agentic/xpander/version.py index acc10fa75..60d516f2f 100644 --- a/agentops/instrumentation/agentic/xpander/version.py +++ b/agentops/instrumentation/agentic/xpander/version.py @@ -1,3 +1,3 @@ """Version information for xpander instrumentation.""" -__version__ = "1.0.0" \ No newline at end of file +__version__ = "1.0.0" From a3851da8ca2d453871c223555c86a1a798285b51 Mon Sep 17 00:00:00 2001 From: Sri Laasya Nutheti Date: Mon, 21 Jul 2025 14:21:38 -0700 Subject: [PATCH 5/7] update instrumentor.py --- .../agentic/xpander/instrumentor.py | 27 ++- examples/xpander/coding_agent.py | 179 ++++++++++++++++++ 2 files changed, 202 insertions(+), 4 deletions(-) create mode 100644 examples/xpander/coding_agent.py 
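Note on the instrumentor change below: _wrap_init_task now covers both SDK entry points instead of assuming an execution dict. A minimal sketch of the two call shapes the wrapper normalizes; the agent object and the literal values here are illustrative, only the parameter names and the resulting dict shape come from this patch:

    # add_task is typically called with a plain string (or dict) input
    agent.add_task(input="Summarize the latest build failures")
    # -> normalized by the wrapper to execution = {"input": {"text": "..."}}

    # init_task receives a full execution payload, e.g. from an event handler
    agent.init_task(execution={"input": {"text": "..."}, "memory_thread_id": "t-123"})
    # -> used as-is; memory_thread_id drives the session id ("session_t-123")

Both paths converge on the same execution dict, so session creation and the span hierarchy are identical regardless of which entry point the caller used.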
diff --git a/agentops/instrumentation/agentic/xpander/instrumentor.py b/agentops/instrumentation/agentic/xpander/instrumentor.py index 9a25be3a5..2680eb304 100644 --- a/agentops/instrumentation/agentic/xpander/instrumentor.py +++ b/agentops/instrumentation/agentic/xpander/instrumentor.py @@ -376,10 +376,21 @@ def _extract_and_set_openai_message_attributes(self, span, messages, result, age logger.error(f"Error extracting OpenAI message attributes: {e}") def _wrap_init_task(self, original_method): - """Wrap init_task to create agent span hierarchy.""" + """Wrap init_task and add_task to create agent span hierarchy.""" instrumentor = self - def wrapper(self, execution): + def wrapper(self, execution=None, input=None, **kwargs): + # Normalize parameters - handle both add_task(input=...) and init_task(execution=...) + if execution is None and input is not None: + # add_task call with input parameter - normalize to execution format + if isinstance(input, str): + execution = {"input": {"text": input}} + else: + execution = {"input": input} + elif execution is None: + # Neither execution nor input provided - create empty execution + execution = {} + # Extract session ID and agent info session_id = instrumentor._extract_session_id(execution) agent_name = getattr(self, "name", "unknown") @@ -389,7 +400,11 @@ def wrapper(self, execution): existing_session = instrumentor._context.get_session(session_id) if existing_session: # Session already exists, just continue - result = original_method(self, execution) + # Call with original parameters + if input is not None: + result = original_method(self, input=input, **kwargs) + else: + result = original_method(self, execution) return result # Extract task input @@ -454,7 +469,11 @@ def wrapper(self, execution): try: # Execute original method - don't end agent span here, it will be ended in retrieve_execution_result - result = original_method(self, execution) + # Call with original parameters + if input is not None: + result = original_method(self, input=input, **kwargs) + else: + result = original_method(self, execution) return result except Exception as e: # Use existing AgentOps error handling utilities diff --git a/examples/xpander/coding_agent.py b/examples/xpander/coding_agent.py new file mode 100644 index 000000000..aa93a7185 --- /dev/null +++ b/examples/xpander/coding_agent.py @@ -0,0 +1,179 @@ +""" +Copyright (c) 2025 Xpander, Inc. All rights reserved. +Modified to use AgentOps callback handlers for tool instrumentation. +Single-file implementation combining MyAgent and XpanderEventListener. 
+""" + +import asyncio +import json +import os +import sys +import time +from pathlib import Path +from dotenv import load_dotenv +from loguru import logger + +load_dotenv() + +import agentops + +print("🔧 Initializing AgentOps...") +agentops.init( + api_key=os.getenv("AGENTOPS_API_KEY"), + trace_name="my-xpander-coding-agent-callbacks", + default_tags=["xpander", "coding-agent", "callbacks"] +) +print("✅ AgentOps initialized") + +print("📦 Importing xpander_sdk...") +from xpander_sdk import XpanderClient, LLMProvider, LLMTokens, Tokens, Agent +from xpander_utils.events import XpanderEventListener, AgentExecutionResult, AgentExecution, ExecutionStatus +from openai import AsyncOpenAI + +# Simple logger setup +logger.remove() +logger.add(sys.stderr, format="{time:HH:mm:ss} | {message}", level="INFO") + + +class MyAgent: + def __init__(self): + logger.info("🚀 Initializing MyAgent...") + + # Load config + config_path = Path(__file__).parent / "xpander_config.json" + config = json.loads(config_path.read_text()) + + # Get API keys + xpander_key = config.get("api_key") or os.getenv("XPANDER_API_KEY") + agent_id = config.get("agent_id") or os.getenv("XPANDER_AGENT_ID") + openai_key = os.getenv("OPENAI_API_KEY") + + if not all([xpander_key, agent_id, openai_key]): + raise ValueError("Missing required API keys") + + # Initialize + self.openai = AsyncOpenAI(api_key=openai_key) + xpander_client = XpanderClient(api_key=xpander_key) + self.agent_backend: Agent = xpander_client.agents.get(agent_id=agent_id) + self.agent_backend.select_llm_provider(LLMProvider.OPEN_AI) + + logger.info(f"Agent: {self.agent_backend.name}") + logger.info(f"Tools: {len(self.agent_backend.tools)} available") + logger.info("✅ Ready!") + + async def run(self, user_txt_input: str) -> dict: + step = 0 + start_time = time.perf_counter() + tokens = Tokens(worker=LLMTokens(0, 0, 0)) + try: + while not self.agent_backend.is_finished(): + step += 1 + logger.info(f"Step {step} - Calling LLM...") + response = await self.openai.chat.completions.create( + model="gpt-4.1", + messages=self.agent_backend.messages, + tools=self.agent_backend.get_tools(), + tool_choice=self.agent_backend.tool_choice, + temperature=0 + ) + if hasattr(response, 'usage'): + tokens.worker.prompt_tokens += response.usage.prompt_tokens + tokens.worker.completion_tokens += response.usage.completion_tokens + tokens.worker.total_tokens += response.usage.total_tokens + + self.agent_backend.add_messages(response.model_dump()) + self.agent_backend.report_execution_metrics(llm_tokens=tokens, ai_model="gpt-4.1") + tool_calls = self.agent_backend.extract_tool_calls(response.model_dump()) + + if tool_calls: + logger.info(f"Executing {len(tool_calls)} tools...") + tool_results = await asyncio.to_thread(self.agent_backend.run_tools, tool_calls) + for res in tool_results: + emoji = "✅" if res.is_success else "❌" + logger.info(f"Tool result: {emoji} {res.function_name}") + + duration = time.perf_counter() - start_time + logger.info(f"Done! 
Duration: {duration:.1f}s | Total tokens: {tokens.worker.total_tokens}") + result = self.agent_backend.retrieve_execution_result() + return {"result": result.result, "thread_id": result.memory_thread_id} + except Exception as e: + logger.error(f"Exception: {e}") + raise + + +# === Load Configuration === +logger.info("[xpander_handler] Loading xpander_config.json") +config_path = Path(__file__).parent / "xpander_config.json" +with open(config_path, 'r') as config_file: + xpander_config: dict = json.load(config_file) +logger.info(f"[xpander_handler] Loaded config: {xpander_config}") + +# === Initialize Event Listener === +logger.info(f"[xpander_handler] Initializing XpanderEventListener with config: {xpander_config}") +listener = XpanderEventListener(**xpander_config) +logger.info(f"[xpander_handler] Listener initialized: {listener}") + + +# === Define Execution Handler === +async def on_execution_request(execution_task: AgentExecution) -> AgentExecutionResult: + logger.info(f"[on_execution_request] Called with execution_task: {execution_task}") + my_agent = MyAgent() + logger.info(f"[on_execution_request] Instantiated MyAgent: {my_agent}") + + user_info = "" + user = getattr(execution_task.input, "user", None) + + if user: + name = f"{user.first_name} {user.last_name}".strip() + email = getattr(user, "email", "") + user_info = f"👤 From user: {name}\n📧 Email: {email}" + + IncomingEvent = ( + f"\n📨 Incoming message: {execution_task.input.text}\n" + f"{user_info}" + ) + + logger.info(f"[on_execution_request] IncomingEvent: {IncomingEvent}") + logger.info(f"[on_execution_request] Calling agent_backend.init_task with execution={execution_task.model_dump()}") + my_agent.agent_backend.init_task(execution=execution_task.model_dump()) + + # extract just the text input for quick start purpose. 
for more robust use the object + user_txt_input = execution_task.input.text + logger.info(f"[on_execution_request] Running agent with user_txt_input: {user_txt_input}") + try: + await my_agent.run(user_txt_input) + logger.info(f"[on_execution_request] Agent run completed") + execution_result = my_agent.agent_backend.retrieve_execution_result() + logger.info(f"[on_execution_request] Execution result: {execution_result}") + result_obj = AgentExecutionResult( + result=execution_result.result, + is_success=execution_result.status == ExecutionStatus.COMPLETED, + ) + logger.info(f"[on_execution_request] Returning AgentExecutionResult: {result_obj}") + return result_obj + except Exception as e: + logger.error(f"[on_execution_request] Exception: {e}") + raise + finally: + logger.info("[on_execution_request] Exiting handler") + + +# === Register Callback === +logger.info("[xpander_handler] Registering on_execution_request callback") +listener.register(on_execution_request=on_execution_request) +logger.info("[xpander_handler] Callback registered") + + +# Example usage for direct interaction +if __name__ == "__main__": + async def main(): + agent = MyAgent() + while True: + task = input("\nAsk Anything (Type exit to end) \nInput: ") + if task.lower() == "exit": + break + agent.agent_backend.add_task(input=task) + result = await agent.run(task) + print(f"\nResult: {result['result']}") + + asyncio.run(main()) \ No newline at end of file From 0ef6957b8b339ace56421ada1f9ced563c81ac46 Mon Sep 17 00:00:00 2001 From: Sri Laasya Nutheti Date: Mon, 21 Jul 2025 14:25:34 -0700 Subject: [PATCH 6/7] fix ruff import err --- .../instrumentation/agentic/xpander/context.py | 18 +++++++++--------- .../agentic/xpander/instrumentor.py | 2 +- examples/xpander/coding_agent.py | 1 + 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/agentops/instrumentation/agentic/xpander/context.py b/agentops/instrumentation/agentic/xpander/context.py index 5dc31d53d..abc9583c6 100644 --- a/agentops/instrumentation/agentic/xpander/context.py +++ b/agentops/instrumentation/agentic/xpander/context.py @@ -33,7 +33,7 @@ def start_session(self, session_id: str, agent_info: Dict[str, Any], workflow_sp self._workflow_spans[session_id] = workflow_span if agent_span: self._agent_spans[session_id] = agent_span - + # Initialize conversation counter self._conversation_counters[session_id] = 0 @@ -48,7 +48,7 @@ def end_conversation(self, session_id: str) -> None: with self._lock: if session_id in self._conversation_spans: del self._conversation_spans[session_id] - + def has_active_conversation(self, session_id: str) -> bool: """Check if there's an active conversation for this session.""" with self._lock: @@ -88,25 +88,25 @@ def get_workflow_phase(self, session_id: str) -> str: """Detect current workflow phase based on state.""" with self._lock: session = self._sessions.get(session_id, {}) - - if session.get('tools_executed', []): + + if session.get("tools_executed", []): return "executing" - elif session.get('step_count', 0) > 0: + elif session.get("step_count", 0) > 0: return "executing" else: return "planning" - + def get_workflow_span(self, session_id: str): """Get the active workflow span for a session.""" with self._lock: return self._workflow_spans.get(session_id) - + def get_agent_span(self, session_id: str): """Get the active agent span for a session.""" with self._lock: return self._agent_spans.get(session_id) - + def get_conversation_span(self, session_id: str): """Get the active conversation span for a session.""" with 
self._lock: - return self._conversation_spans.get(session_id) \ No newline at end of file + return self._conversation_spans.get(session_id) diff --git a/agentops/instrumentation/agentic/xpander/instrumentor.py b/agentops/instrumentation/agentic/xpander/instrumentor.py index 2680eb304..05d66c6a8 100644 --- a/agentops/instrumentation/agentic/xpander/instrumentor.py +++ b/agentops/instrumentation/agentic/xpander/instrumentor.py @@ -390,7 +390,7 @@ def wrapper(self, execution=None, input=None, **kwargs): elif execution is None: # Neither execution nor input provided - create empty execution execution = {} - + # Extract session ID and agent info session_id = instrumentor._extract_session_id(execution) agent_name = getattr(self, "name", "unknown") diff --git a/examples/xpander/coding_agent.py b/examples/xpander/coding_agent.py index aa93a7185..635b36a39 100644 --- a/examples/xpander/coding_agent.py +++ b/examples/xpander/coding_agent.py @@ -3,6 +3,7 @@ Modified to use AgentOps callback handlers for tool instrumentation. Single-file implementation combining MyAgent and XpanderEventListener. """ +# ruff: noqa: E402 import asyncio import json From bca74519ecd69e537cb652e8fc1d003c3cfc8930 Mon Sep 17 00:00:00 2001 From: Sri Laasya Nutheti Date: Mon, 21 Jul 2025 14:34:17 -0700 Subject: [PATCH 7/7] Update Xpander documentation and example --- docs/v2/examples/xpander.mdx | 30 +++++- docs/v2/integrations/xpander.mdx | 157 ++++++++++++++++++++-------- examples/xpander/coding_agent.ipynb | 1 - examples/xpander/coding_agent.py | 26 +++-- 4 files changed, 155 insertions(+), 59 deletions(-) delete mode 100644 examples/xpander/coding_agent.ipynb diff --git a/docs/v2/examples/xpander.mdx b/docs/v2/examples/xpander.mdx index e1034688b..62d76219a 100644 --- a/docs/v2/examples/xpander.mdx +++ b/docs/v2/examples/xpander.mdx @@ -1,12 +1,36 @@ --- title: 'Xpander' -description: 'Xpander example using AgentOps' +description: 'Xpander coding agent example with AgentOps' --- -{/* SOURCE_FILE: examples/xpander/coding_agent.ipynb */} +{/* SOURCE_FILE: examples/xpander/coding_agent.py */} -_View Notebook on Github_ +_View Python Example on Github_ +This example demonstrates a complete Xpander coding agent implementation with AgentOps instrumentation using callback handlers. The example shows how to build a single-file agent that combines the MyAgent class and XpanderEventListener for comprehensive monitoring. +## Key Features + +- **Single-file implementation** combining agent logic and event handling +- **Callback-based instrumentation** using XpanderEventListener +- **Async tool execution** with proper error handling and logging +- **AgentOps integration** with custom trace names and tags + +## Running the Example + +1. Set up your environment variables in a `.env` file or export them: + ```bash + AGENTOPS_API_KEY=your_agentops_api_key + XPANDER_API_KEY=your_xpander_api_key + XPANDER_AGENT_ID=your_agent_id + OPENAI_API_KEY=your_openai_api_key + ``` + +2. Run the agent: + ```bash + python examples/xpander/coding_agent.py + ``` + +The agent will start an interactive session where you can ask coding questions and see the results tracked in your AgentOps dashboard. 
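Before launching, it can help to confirm the same keys that `coding_agent.py` resolves at startup. The following preflight sketch is illustrative only — the script path and the check itself are assumptions, not part of the example — but it mirrors the example's key-resolution order: `xpander_config.json` first, then environment variables.

```python
"""Preflight check for the Xpander coding agent example (illustrative sketch)."""
import json
import os
from pathlib import Path

from dotenv import load_dotenv

load_dotenv()

# coding_agent.py reads xpander_config.json from its own directory and falls
# back to environment variables; this assumes you run the check from the repo root.
config_path = Path("examples/xpander/xpander_config.json")
config = json.loads(config_path.read_text()) if config_path.exists() else {}

required = {
    "XPANDER_API_KEY": config.get("api_key") or os.getenv("XPANDER_API_KEY"),
    "XPANDER_AGENT_ID": config.get("agent_id") or os.getenv("XPANDER_AGENT_ID"),
    "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
    "AGENTOPS_API_KEY": os.getenv("AGENTOPS_API_KEY"),
}

missing = [name for name, value in required.items() if not value]
if missing:
    raise SystemExit(f"Missing required configuration: {', '.join(missing)}")
print("✅ All required keys found - ready to run coding_agent.py")
```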
diff --git a/docs/v2/integrations/xpander.mdx b/docs/v2/integrations/xpander.mdx index 9c478b85a..5407e5b5f 100644 --- a/docs/v2/integrations/xpander.mdx +++ b/docs/v2/integrations/xpander.mdx @@ -11,13 +11,13 @@ Install AgentOps and the Xpander SDK, along with the required dependencies: ```bash pip - pip install agentops xpander-sdk openai python-dotenv + pip install agentops xpander-sdk xpander-utils openai python-dotenv loguru ``` ```bash poetry - poetry add agentops xpander-sdk openai python-dotenv + poetry add agentops xpander-sdk xpander-utils openai python-dotenv loguru ``` ```bash uv - uv add agentops xpander-sdk openai python-dotenv + uv add agentops xpander-sdk xpander-utils openai python-dotenv loguru ``` @@ -46,58 +46,118 @@ Set these as environment variables or in a `.env` file: ``` +You can also store your configuration in a `xpander_config.json` file: + +```json +{ + "api_key": "your_xpander_api_key_here", + "agent_id": "your_xpander_agent_id_here" +} +``` + ## Quick Start The key to AgentOps + Xpander integration is **initialization order**: Initialize AgentOps **before** importing the Xpander SDK to enable automatic instrumentation. + +The following example shows the callback-based integration pattern. For a complete working example, see our [Xpander example](/v2/examples/xpander). + + ```python +# ruff: noqa: E402 import os +import json +import asyncio +from pathlib import Path from dotenv import load_dotenv -# Load environment variables +# Load environment variables first load_dotenv() # 1. Initialize AgentOps FIRST (this enables auto-instrumentation) import agentops agentops.init( api_key=os.getenv("AGENTOPS_API_KEY"), - default_tags=["xpander", "production"] + trace_name="my-xpander-coding-agent-callbacks", + default_tags=["xpander", "coding-agent", "callbacks"], ) # 2. Now import Xpander SDK (instrumentation will automatically activate) -from xpander_sdk import XpanderClient, LLMProvider -from openai import OpenAI - -# 3. Set up your agent -xpander_client = XpanderClient(api_key=os.getenv("XPANDER_API_KEY")) -agent = xpander_client.agents.get(agent_id=os.getenv("XPANDER_AGENT_ID")) -agent.select_llm_provider(LLMProvider.OPEN_AI) - -openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - -# 4. Execute a task (automatically instrumented!) 
-execution = agent.add_task(input="What is the weather like today?") - -while not agent.is_finished(): - # Call LLM - response = openai_client.chat.completions.create( - model="gpt-4o-mini", - messages=agent.messages, - tools=agent.get_tools(), - tool_choice=agent.tool_choice - ) - - # Add response to agent context - agent.add_messages(response.model_dump()) +from xpander_sdk import XpanderClient, LLMProvider, LLMTokens, Tokens, Agent, ExecutionStatus +from xpander_utils.events import XpanderEventListener, AgentExecutionResult, AgentExecution +from openai import AsyncOpenAI + +class MyAgent: + def __init__(self): + # Load config + config_path = Path(__file__).parent / "xpander_config.json" + config = json.loads(config_path.read_text()) + + # Get API keys + xpander_key = config.get("api_key") or os.getenv("XPANDER_API_KEY") + agent_id = config.get("agent_id") or os.getenv("XPANDER_AGENT_ID") + openai_key = os.getenv("OPENAI_API_KEY") + + # Initialize clients + self.openai = AsyncOpenAI(api_key=openai_key) + xpander_client = XpanderClient(api_key=xpander_key) + self.agent_backend: Agent = xpander_client.agents.get(agent_id=agent_id) + self.agent_backend.select_llm_provider(LLMProvider.OPEN_AI) + + async def run(self, user_input: str) -> dict: + tokens = Tokens(worker=LLMTokens(0, 0, 0)) + + while not self.agent_backend.is_finished(): + # Call LLM + response = await self.openai.chat.completions.create( + model="gpt-4", + messages=self.agent_backend.messages, + tools=self.agent_backend.get_tools(), + tool_choice=self.agent_backend.tool_choice, + temperature=0, + ) + + # Track tokens + if hasattr(response, "usage"): + tokens.worker.prompt_tokens += response.usage.prompt_tokens + tokens.worker.completion_tokens += response.usage.completion_tokens + tokens.worker.total_tokens += response.usage.total_tokens + + # Add response to agent context + self.agent_backend.add_messages(response.model_dump()) + self.agent_backend.report_execution_metrics(llm_tokens=tokens, ai_model="gpt-4") + + # Execute any tool calls + tool_calls = self.agent_backend.extract_tool_calls(response.model_dump()) + if tool_calls: + tool_results = await asyncio.to_thread(self.agent_backend.run_tools, tool_calls) + + result = self.agent_backend.retrieve_execution_result() + return {"result": result.result, "thread_id": result.memory_thread_id} + +# Set up event listener with callback handlers +listener = XpanderEventListener( + api_key=os.getenv("XPANDER_API_KEY"), + agent_id=os.getenv("XPANDER_AGENT_ID") +) + +async def on_execution_request(execution_task: AgentExecution) -> AgentExecutionResult: + agent = MyAgent() + agent.agent_backend.init_task(execution=execution_task.model_dump()) - # Execute any tool calls - tool_calls = agent.extract_tool_calls(response.model_dump()) - if tool_calls: - agent.run_tools(tool_calls) - -# Get final result -result = agent.retrieve_execution_result() -print(f"Task completed: {result.result}") + try: + await agent.run(execution_task.input.text) + execution_result = agent.agent_backend.retrieve_execution_result() + return AgentExecutionResult( + result=execution_result.result, + is_success=execution_result.status == ExecutionStatus.COMPLETED, + ) + except Exception as e: + print(f"Error: {e}") + raise + +# Register the callback +listener.register(on_execution_request=on_execution_request) ``` ## What's Automatically Tracked @@ -145,6 +205,18 @@ Monitor which tools are being called, their parameters, execution time, and resu ### ✅ Cost Tracking Automatic token usage tracking for all LLM 
interactions with cost analysis. +## Callback Handler Pattern + +The Xpander integration supports two main patterns: + +1. **Direct Integration**: Directly instrument your agent code (shown above) +2. **Callback Handler**: Use XpanderEventListener for webhook-style integration + +The callback handler pattern is particularly useful for: +- Production deployments with centralized monitoring +- Multi-agent orchestration systems +- Event-driven architectures + ## Runtime-Specific Instrumentation Xpander SDK uses JSII to create methods at runtime, which requires specialized instrumentation. AgentOps handles this automatically by: @@ -174,14 +246,17 @@ agentops.init() # Too late - instrumentation won't activate ### Missing Tool Results If tool results show `{"__jsii_ref__": "..."}` instead of actual content, ensure you're using the latest version of AgentOps, which includes improved JSII object handling. +### Import Errors (E402) +If you see linting errors about imports not being at the top of the file, this is expected for Xpander integration. Add `# ruff: noqa: E402` at the top of your file to suppress these warnings, as the import order is required for proper instrumentation. + ## Examples - - Complete example of a Xpander agent with automatic AgentOps instrumentation + + Complete single-file implementation with callback handlers - - Demonstrate complex workflows with multiple tool executions + + View the complete source code and configuration files diff --git a/examples/xpander/coding_agent.ipynb b/examples/xpander/coding_agent.ipynb deleted file mode 100644 index 3b5981a28..000000000 --- a/examples/xpander/coding_agent.ipynb +++ /dev/null @@ -1 +0,0 @@ -{"cells":[],"metadata":{"kernelspec":{"display_name":"Python 3","language":"python","name":"python3"},"language_info":{"codemirror_mode":{"name":"ipython","version":3},"file_extension":".py","mimetype":"text/x-python","name":"python","nbconvert_exporter":"python","pygments_lexer":"ipython3","version":"3.9.0"}},"nbformat":4,"nbformat_minor":4} diff --git a/examples/xpander/coding_agent.py b/examples/xpander/coding_agent.py index 635b36a39..77bf72d03 100644 --- a/examples/xpander/coding_agent.py +++ b/examples/xpander/coding_agent.py @@ -22,7 +22,7 @@ agentops.init( api_key=os.getenv("AGENTOPS_API_KEY"), trace_name="my-xpander-coding-agent-callbacks", - default_tags=["xpander", "coding-agent", "callbacks"] + default_tags=["xpander", "coding-agent", "callbacks"], ) print("✅ AgentOps initialized") @@ -75,24 +75,24 @@ async def run(self, user_txt_input: str) -> dict: messages=self.agent_backend.messages, tools=self.agent_backend.get_tools(), tool_choice=self.agent_backend.tool_choice, - temperature=0 + temperature=0, ) - if hasattr(response, 'usage'): + if hasattr(response, "usage"): tokens.worker.prompt_tokens += response.usage.prompt_tokens tokens.worker.completion_tokens += response.usage.completion_tokens tokens.worker.total_tokens += response.usage.total_tokens - + self.agent_backend.add_messages(response.model_dump()) self.agent_backend.report_execution_metrics(llm_tokens=tokens, ai_model="gpt-4.1") tool_calls = self.agent_backend.extract_tool_calls(response.model_dump()) - + if tool_calls: logger.info(f"Executing {len(tool_calls)} tools...") tool_results = await asyncio.to_thread(self.agent_backend.run_tools, tool_calls) for res in tool_results: emoji = "✅" if res.is_success else "❌" logger.info(f"Tool result: {emoji} {res.function_name}") - + duration = time.perf_counter() - start_time logger.info(f"Done! 
Duration: {duration:.1f}s | Total tokens: {tokens.worker.total_tokens}") result = self.agent_backend.retrieve_execution_result() @@ -105,7 +105,7 @@ async def run(self, user_txt_input: str) -> dict: # === Load Configuration === logger.info("[xpander_handler] Loading xpander_config.json") config_path = Path(__file__).parent / "xpander_config.json" -with open(config_path, 'r') as config_file: +with open(config_path, "r") as config_file: xpander_config: dict = json.load(config_file) logger.info(f"[xpander_handler] Loaded config: {xpander_config}") @@ -129,10 +129,7 @@ async def on_execution_request(execution_task: AgentExecution) -> AgentExecution email = getattr(user, "email", "") user_info = f"👤 From user: {name}\n📧 Email: {email}" - IncomingEvent = ( - f"\n📨 Incoming message: {execution_task.input.text}\n" - f"{user_info}" - ) + IncomingEvent = f"\n📨 Incoming message: {execution_task.input.text}\n" f"{user_info}" logger.info(f"[on_execution_request] IncomingEvent: {IncomingEvent}") logger.info(f"[on_execution_request] Calling agent_backend.init_task with execution={execution_task.model_dump()}") @@ -143,7 +140,7 @@ async def on_execution_request(execution_task: AgentExecution) -> AgentExecution logger.info(f"[on_execution_request] Running agent with user_txt_input: {user_txt_input}") try: await my_agent.run(user_txt_input) - logger.info(f"[on_execution_request] Agent run completed") + logger.info("[on_execution_request] Agent run completed") execution_result = my_agent.agent_backend.retrieve_execution_result() logger.info(f"[on_execution_request] Execution result: {execution_result}") result_obj = AgentExecutionResult( @@ -167,6 +164,7 @@ async def on_execution_request(execution_task: AgentExecution) -> AgentExecution # Example usage for direct interaction if __name__ == "__main__": + async def main(): agent = MyAgent() while True: @@ -176,5 +174,5 @@ async def main(): agent.agent_backend.add_task(input=task) result = await agent.run(task) print(f"\nResult: {result['result']}") - - asyncio.run(main()) \ No newline at end of file + + asyncio.run(main())
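The `init_task` wrapper added in PATCH 5 accepts both calling conventions used above: `agent.add_task(input=...)` from the interactive loop and `agent.init_task(execution=...)` from the event handler. A minimal standalone sketch of that normalization logic — illustrative only; `normalize_execution` is a hypothetical helper, not part of the instrumentor — behaves like this:

```python
"""Sketch of the parameter normalization performed by the init_task/add_task wrapper."""
from typing import Any, Dict, Optional


def normalize_execution(execution: Optional[Dict[str, Any]] = None, input: Any = None) -> Dict[str, Any]:
    """Normalize add_task(input=...) and init_task(execution=...) into one execution dict.

    The parameter name `input` mirrors the SDK's add_task keyword.
    """
    if execution is None and input is not None:
        # add_task call: wrap plain strings the same way the wrapper does
        return {"input": {"text": input}} if isinstance(input, str) else {"input": input}
    # Neither provided: fall back to an empty execution dict
    return execution or {}


# Both call styles yield a dict the instrumentor can use for session-ID extraction
assert normalize_execution(input="Write a fizzbuzz script") == {"input": {"text": "Write a fizzbuzz script"}}
assert normalize_execution(execution={"id": "exec-123", "input": {"text": "hi"}})["id"] == "exec-123"
assert normalize_execution() == {}
```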