diff --git a/examples/weave_observability_demo.py b/examples/weave_observability_demo.py new file mode 100644 index 0000000000..53e3f41fe7 --- /dev/null +++ b/examples/weave_observability_demo.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +"""Demo script showing Weave observability integration with OpenHands SDK. + +This script demonstrates how Weave provides **automatic LLM tracing** for the +OpenHands SDK. The key insight is that Weave automatically patches LiteLLM +when initialized, so all LLM calls are traced without any manual decoration! + +## Key Features Demonstrated + +1. **Automatic LLM Tracing**: Just set environment variables and all LiteLLM calls + are automatically traced - no `@weave.op` decorators needed for LLM calls! + +2. **Custom Function Tracing**: Use `@weave_op` for custom agent logic you + want to trace (tool execution, agent steps, etc.) + +3. **Conversation Threading**: The SDK automatically wraps conversation runs + in `weave.thread()` to group all operations under the conversation ID. + This enables conversation-level tracing in the Weave UI! + +## How It Works + +The SDK uses LiteLLM for all LLM calls. When Weave is initialized: +1. Weave's autopatching automatically patches LiteLLM +2. All `litellm.completion()` and `litellm.acompletion()` calls are traced +3. LocalConversation.run() wraps the event loop in `weave.thread(conversation_id)` +4. You see full conversation traces in the Weave UI without any code changes! + +## Prerequisites + +- Install with Weave support: `pip install openhands-sdk[weave]` +- Set WANDB_API_KEY environment variable +- Set WEAVE_PROJECT environment variable (e.g., "your-team/openhands-demo") + +## Usage + + export WANDB_API_KEY="your-api-key" + export WEAVE_PROJECT="your-team/openhands-demo" + python examples/weave_observability_demo.py + +Note: + If WANDB_API_KEY is not set or the weave package is not installed, + the demo will still run but without Weave tracing. 
+""" + +import os +import sys + +# Add the SDK to the path for development +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "openhands-sdk")) + +from openhands.sdk.observability import ( + init_weave, + is_weave_initialized, + maybe_init_weave, + weave_op, + get_weave_op, +) + + +# Example 1: Using the @weave_op decorator for custom function tracing +@weave_op(name="process_message") +def process_message(message: str) -> dict: + """Process a user message and return a response. + + When Weave is initialized, this function will appear in traces + with the name "process_message". + """ + word_count = len(message.split()) + return { + "original": message, + "word_count": word_count, + "processed": True, + } + + +# Example 2: Another traced function +@weave_op(name="analyze_sentiment") +def analyze_sentiment(text: str) -> str: + """Analyze the sentiment of text. + + This demonstrates how @weave_op works as a no-op when Weave + is not initialized - your code runs normally either way. + """ + positive_words = {"good", "great", "excellent", "happy", "love"} + negative_words = {"bad", "terrible", "sad", "hate", "awful"} + + words = set(text.lower().split()) + pos_count = len(words & positive_words) + neg_count = len(words & negative_words) + + if pos_count > neg_count: + return "positive" + elif neg_count > pos_count: + return "negative" + return "neutral" + + +# Example 3: Nested traced functions +@weave_op(name="agent_step") +def agent_step(step_num: int, user_input: str) -> dict: + """Simulate an agent step with nested traced operations. + + When this function calls process_message and analyze_sentiment, + they appear as child spans in the Weave trace. 
+ """ + processed = process_message(user_input) + sentiment = analyze_sentiment(user_input) + + return { + "step": step_num, + "processed": processed, + "sentiment": sentiment, + } + + +def run_demo(): + """Run the Weave observability demo.""" + print("=" * 60) + print("Weave Observability Demo for OpenHands SDK") + print("=" * 60) + + # Check environment + api_key = os.environ.get("WANDB_API_KEY") + project = os.environ.get("WEAVE_PROJECT") + + if not api_key: + print("\n⚠️ WANDB_API_KEY not set. Weave tracing will be disabled.") + print(" Set it with: export WANDB_API_KEY='your-api-key'") + + if not project: + print("\n⚠️ WEAVE_PROJECT not set. Using default project name.") + project = "openhands-sdk-demo" + os.environ["WEAVE_PROJECT"] = project + + # Initialize Weave (or use maybe_init_weave() for conditional init) + print(f"\n📊 Initializing Weave for project: {project}") + success = maybe_init_weave() + + if success: + print("✅ Weave initialized successfully!") + print(f" View traces at: https://wandb.ai/{project}/weave") + print("\n 🎉 KEY FEATURES:") + print(" • All LiteLLM calls are AUTOMATICALLY traced (no decoration needed)") + print(" • Conversation.run() automatically groups operations by conversation ID") + print(" • Use @weave_op for custom functions you want to trace") + else: + print("⚠️ Weave not initialized (missing credentials or package)") + print(" Running demo without tracing...") + print(" Install with: pip install openhands-sdk[weave]") + + print("\n" + "-" * 60) + print("Running demo operations...") + print("-" * 60) + + # Demo 1: Simple decorated function + print("\n1️⃣ Custom function tracing with @weave_op decorator:") + print(" (Use this for custom agent logic you want to trace)") + result = process_message("Hello, this is a test message for the agent!") + print(f" Result: {result}") + + # Demo 2: Nested function calls + print("\n2️⃣ Nested traced function calls:") + print(" (Child functions appear as child spans in the trace)") + result 
= agent_step(1, "This is a great example of tracing!") + print(f" Result: {result}") + + # Demo 3: Multiple steps to show trace structure + print("\n3️⃣ Multiple agent steps:") + for i, msg in enumerate([ + "Hello, I need help with my code", + "The function is not working correctly", + "Great, that fixed it! Thank you!", + ], 1): + result = agent_step(i, msg) + print(f" Step {i}: sentiment={result['sentiment']}") + + # Demo 4: Dynamic decoration with get_weave_op() + print("\n4️⃣ Dynamic decoration with get_weave_op():") + print(" (Useful for conditionally applying tracing)") + op = get_weave_op() + + @op + def dynamically_traced_function(x: int) -> int: + return x * 2 + + result = dynamically_traced_function(21) + print(f" Result: {result}") + + print("\n" + "=" * 60) + print("Demo completed!") + + if is_weave_initialized(): + print(f"\n🔗 View your traces at: https://wandb.ai/{project}/weave") + print("\n💡 Key Integration Points:") + print(" • LLM calls via LiteLLM are traced AUTOMATICALLY") + print(" • Conversation.run() groups all operations by conversation ID") + print(" • Use @weave_op for custom agent logic you want to trace") + print("\n📝 Minimal setup (zero code changes):") + print(" 1. pip install openhands-sdk[weave]") + print(" 2. export WANDB_API_KEY='your-key'") + print(" 3. export WEAVE_PROJECT='team/project'") + print(" That's it! All LLM calls are now traced.") + else: + print("\n📝 To enable tracing:") + print(" 1. pip install openhands-sdk[weave]") + print(" 2. export WANDB_API_KEY='your-api-key'") + print(" 3. export WEAVE_PROJECT='your-team/your-project'") + print(" 4. 
Run this demo again") + + print("=" * 60) + + +if __name__ == "__main__": + run_demo() diff --git a/openhands-sdk/openhands/sdk/agent/agent.py b/openhands-sdk/openhands/sdk/agent/agent.py index d88d2656d4..f3741e6cf8 100644 --- a/openhands-sdk/openhands/sdk/agent/agent.py +++ b/openhands-sdk/openhands/sdk/agent/agent.py @@ -47,6 +47,7 @@ should_enable_observability, ) from openhands.sdk.observability.utils import extract_action_name +from openhands.sdk.observability.weave import maybe_init_weave from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer from openhands.sdk.tool import ( Action, @@ -61,6 +62,7 @@ logger = get_logger(__name__) maybe_init_laminar() +maybe_init_weave() class Agent(AgentBase): @@ -109,17 +111,10 @@ def init_state( event = SystemPromptEvent( source="agent", system_prompt=TextContent(text=self.system_message), - # Always expose a 'security_risk' parameter in tool schemas. - # This ensures the schema remains consistent, even if the - # security analyzer is disabled. Validation of this field - # happens dynamically at runtime depending on the analyzer - # configured. This allows weaker models to omit risk field - # and bypass validation requirements when analyzer is disabled. - # For detailed logic, see `_extract_security_risk` method. - tools=[ - t.to_openai_tool(add_security_risk_prediction=True) - for t in self.tools_map.values() - ], + # Tools are stored as ToolDefinition objects and converted to + # OpenAI format with security_risk parameter during LLM completion. + # See make_llm_completion() in agent/utils.py for details. 
+ tools=list(self.tools_map.values()), ) on_event(event) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index a05aa7b1b8..2194c8a12e 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -34,6 +34,7 @@ from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.llm.llm_registry import LLMRegistry from openhands.sdk.logger import get_logger +from openhands.sdk.observability.context import get_conversation_context from openhands.sdk.observability.laminar import observe from openhands.sdk.security.analyzer import SecurityAnalyzerBase from openhands.sdk.security.confirmation_policy import ( @@ -295,6 +296,13 @@ def run(self) -> None: - Creates and executes actions immediately Can be paused between steps + + Note: + All operations within this run are automatically wrapped in + observability context managers for all enabled tools (Weave, Laminar, + etc.). This groups LLM calls and traced operations under the + conversation ID, enabling conversation-level tracing in observability + UIs. 
""" with self._state: @@ -306,75 +314,78 @@ def run(self) -> None: self._state.execution_status = ConversationExecutionStatus.RUNNING iteration = 0 - try: - while True: - logger.debug(f"Conversation run iteration {iteration}") - with self._state: - # Pause attempts to acquire the state lock - # Before value can be modified step can be taken - # Ensure step conditions are checked when lock is already acquired - if self._state.execution_status in [ - ConversationExecutionStatus.FINISHED, - ConversationExecutionStatus.PAUSED, - ConversationExecutionStatus.STUCK, - ]: - break - - # Check for stuck patterns if enabled - if self._stuck_detector: - is_stuck = self._stuck_detector.is_stuck() - - if is_stuck: - logger.warning("Stuck pattern detected.") + # Wrap the run loop in observability context managers for all enabled tools. + # This groups all LLM calls and traced operations under the conversation ID. + with get_conversation_context(str(self.id)): + try: + while True: + logger.debug(f"Conversation run iteration {iteration}") + with self._state: + # Pause attempts to acquire the state lock + # Before value can be modified step can be taken + # Ensure step conditions are checked when lock is already acquired + if self._state.execution_status in [ + ConversationExecutionStatus.FINISHED, + ConversationExecutionStatus.PAUSED, + ConversationExecutionStatus.STUCK, + ]: + break + + # Check for stuck patterns if enabled + if self._stuck_detector: + is_stuck = self._stuck_detector.is_stuck() + + if is_stuck: + logger.warning("Stuck pattern detected.") + self._state.execution_status = ( + ConversationExecutionStatus.STUCK + ) + continue + + # clear the flag before calling agent.step() (user approved) + if ( + self._state.execution_status + == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION + ): self._state.execution_status = ( - ConversationExecutionStatus.STUCK + ConversationExecutionStatus.RUNNING ) - continue - - # clear the flag before calling agent.step() (user 
approved) - if ( - self._state.execution_status - == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION - ): - self._state.execution_status = ( - ConversationExecutionStatus.RUNNING - ) - self.agent.step( - self, on_event=self._on_event, on_token=self._on_token + self.agent.step( + self, on_event=self._on_event, on_token=self._on_token + ) + iteration += 1 + + # Check for non-finished terminal conditions + # Note: We intentionally do NOT check for FINISHED status here. + # This allows concurrent user messages to be processed: + # 1. Agent finishes and sets status to FINISHED + # 2. User sends message concurrently via send_message() + # 3. send_message() waits for FIFO lock, then sets status to IDLE + # 4. Run loop continues to next iteration and processes the message + # 5. Without this design, concurrent messages would be lost + if ( + self.state.execution_status + == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION + or iteration >= self.max_iteration_per_run + ): + break + except Exception as e: + self._state.execution_status = ConversationExecutionStatus.ERROR + + # Add an error event + self._on_event( + ConversationErrorEvent( + source="environment", + code=e.__class__.__name__, + detail=str(e), ) - iteration += 1 - - # Check for non-finished terminal conditions - # Note: We intentionally do NOT check for FINISHED status here. - # This allows concurrent user messages to be processed: - # 1. Agent finishes and sets status to FINISHED - # 2. User sends message concurrently via send_message() - # 3. send_message() waits for FIFO lock, then sets status to IDLE - # 4. Run loop continues to next iteration and processes the message - # 5. 
Without this design, concurrent messages would be lost - if ( - self.state.execution_status - == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION - or iteration >= self.max_iteration_per_run - ): - break - except Exception as e: - self._state.execution_status = ConversationExecutionStatus.ERROR - - # Add an error event - self._on_event( - ConversationErrorEvent( - source="environment", - code=e.__class__.__name__, - detail=str(e), ) - ) - # Re-raise with conversation id and persistence dir for better UX - raise ConversationRunError( - self._state.id, e, persistence_dir=self._state.persistence_dir - ) from e + # Re-raise with conversation id and persistence dir for better UX + raise ConversationRunError( + self._state.id, e, persistence_dir=self._state.persistence_dir + ) from e def set_confirmation_policy(self, policy: ConfirmationPolicyBase) -> None: """Set the confirmation policy and store it in conversation state.""" diff --git a/openhands-sdk/openhands/sdk/observability/__init__.py b/openhands-sdk/openhands/sdk/observability/__init__.py index 4f4ea48583..01b6f95560 100644 --- a/openhands-sdk/openhands/sdk/observability/__init__.py +++ b/openhands-sdk/openhands/sdk/observability/__init__.py @@ -1,4 +1,36 @@ +from openhands.sdk.observability.context import ( + clear_conversation_context_providers, + get_conversation_context, + register_conversation_context_provider, + unregister_conversation_context_provider, +) from openhands.sdk.observability.laminar import maybe_init_laminar, observe +from openhands.sdk.observability.weave import ( + get_weave_client, + get_weave_op, + init_weave, + is_weave_initialized, + maybe_init_weave, + should_enable_weave, + weave_op, +) -__all__ = ["maybe_init_laminar", "observe"] +__all__ = [ + # Generic observability context (unified interface) + "get_conversation_context", + "register_conversation_context_provider", + "unregister_conversation_context_provider", + "clear_conversation_context_providers", + # Laminar exports + 
"maybe_init_laminar", + "observe", + # Weave exports + "get_weave_client", + "get_weave_op", + "init_weave", + "is_weave_initialized", + "maybe_init_weave", + "should_enable_weave", + "weave_op", +] diff --git a/openhands-sdk/openhands/sdk/observability/context.py b/openhands-sdk/openhands/sdk/observability/context.py new file mode 100644 index 0000000000..c605e0119c --- /dev/null +++ b/openhands-sdk/openhands/sdk/observability/context.py @@ -0,0 +1,225 @@ +"""Generic observability context management for the OpenHands SDK. + +This module provides a unified interface for managing observability contexts +across multiple observability tools (Weave, Laminar, etc.). It allows the SDK +to use a single API that automatically composes context managers from all +enabled observability providers. + +## Design Philosophy + +The SDK should be agnostic to which observability tools are enabled. This module +provides: + +1. **Unified Context Managers**: A single `get_conversation_context()` function + that returns a composed context manager for all enabled tools. + +2. **Provider Registry**: Observability tools register their context providers, + allowing easy extension for new tools. + +3. **Graceful Degradation**: If no observability tools are enabled, the context + managers are no-ops (nullcontext). + +## Usage + +In LocalConversation.run(): +```python +from openhands.sdk.observability.context import get_conversation_context + +def run(self): + with get_conversation_context(str(self.id)): + # All operations here are traced by all enabled observability tools + ... +``` + +## Adding New Observability Providers + +To add a new observability tool: + +1. Create a function that returns a context manager for conversation threading +2. 
Register it with `register_conversation_context_provider()` + +```python +from openhands.sdk.observability.context import register_conversation_context_provider + +def get_my_tool_context(conversation_id: str): + if not is_my_tool_initialized(): + return nullcontext() + return my_tool.thread(conversation_id) + +register_conversation_context_provider(get_my_tool_context) +``` +""" + +from collections.abc import Callable +from contextlib import ExitStack, contextmanager, nullcontext +from typing import Any, ContextManager, Iterator + +from openhands.sdk.logger import get_logger + + +logger = get_logger(__name__) + + +# Type alias for context provider functions +ConversationContextProvider = Callable[[str], ContextManager[Any]] + +# Registry of conversation context providers +_conversation_context_providers: list[ConversationContextProvider] = [] + + +def register_conversation_context_provider( + provider: ConversationContextProvider, +) -> None: + """Register a conversation context provider. + + Context providers are functions that take a conversation_id and return + a context manager. They are called in order of registration. + + Args: + provider: A function that takes a conversation_id string and returns + a context manager. Should return nullcontext() if the + observability tool is not initialized. + + Example: + ```python + def get_my_tool_context(conversation_id: str): + if not is_my_tool_initialized(): + return nullcontext() + return my_tool.thread(conversation_id) + + register_conversation_context_provider(get_my_tool_context) + ``` + """ + if provider not in _conversation_context_providers: + _conversation_context_providers.append(provider) + logger.debug(f"Registered conversation context provider: {provider.__name__}") + + +def unregister_conversation_context_provider( + provider: ConversationContextProvider, +) -> None: + """Unregister a conversation context provider. + + Args: + provider: The provider function to unregister. 
+ """ + if provider in _conversation_context_providers: + _conversation_context_providers.remove(provider) + logger.debug(f"Unregistered conversation context provider: {provider.__name__}") + + +def clear_conversation_context_providers() -> None: + """Clear all registered conversation context providers. + + Useful for testing or resetting the observability state. + """ + _conversation_context_providers.clear() + logger.debug("Cleared all conversation context providers") + + +@contextmanager +def get_conversation_context(conversation_id: str) -> Iterator[None]: + """Get a composed context manager for all enabled observability tools. + + This function returns a context manager that wraps all registered + observability context providers. When entered, it enters all provider + contexts in order. When exited, it exits them in reverse order. + + If no providers are registered or all providers return nullcontext, + this is effectively a no-op. + + Args: + conversation_id: The conversation ID to use for threading/grouping. + + Yields: + None + + Example: + ```python + with get_conversation_context("conv-123"): + # All operations here are traced by all enabled observability tools + agent.step(...) + ``` + """ + if not _conversation_context_providers: + yield + return + + # Use ExitStack to compose multiple context managers + with ExitStack() as stack: + for provider in _conversation_context_providers: + try: + ctx = provider(conversation_id) + stack.enter_context(ctx) + except Exception as e: + # Log but don't fail - observability should not break the agent + logger.debug( + f"Error entering context from provider {provider.__name__}: {e}" + ) + yield + + +# ============================================================================= +# Built-in Provider Registrations +# ============================================================================= +# These are registered when the module is imported. 
Each provider checks if +# its tool is initialized before returning a real context manager. + + +def _get_weave_conversation_context(conversation_id: str) -> ContextManager[Any]: + """Weave conversation context provider. + + Returns a weave.thread() context manager if Weave is initialized, + otherwise returns nullcontext(). + """ + try: + from openhands.sdk.observability.weave import is_weave_initialized + + if not is_weave_initialized(): + return nullcontext() + + import weave + return weave.thread(conversation_id) + except ImportError: + return nullcontext() + except Exception: + return nullcontext() + + +def _get_laminar_conversation_context(conversation_id: str) -> ContextManager[Any]: + """Laminar conversation context provider. + + Returns a Laminar span context if Laminar is initialized, + otherwise returns nullcontext(). + + Note: Laminar uses OpenTelemetry spans rather than threads, so we create + a span with the conversation_id as the session_id. + """ + try: + from openhands.sdk.observability.laminar import should_enable_observability + + if not should_enable_observability(): + return nullcontext() + + from lmnr import Laminar + + @contextmanager + def laminar_conversation_context(): + span = Laminar.start_active_span(f"conversation:{conversation_id}") + Laminar.set_trace_session_id(conversation_id) + try: + yield + finally: + if span and span.is_recording(): + span.end() + + return laminar_conversation_context() + except ImportError: + return nullcontext() + except Exception: + return nullcontext() + + +# Register built-in providers +register_conversation_context_provider(_get_weave_conversation_context) +register_conversation_context_provider(_get_laminar_conversation_context) diff --git a/openhands-sdk/openhands/sdk/observability/weave.py b/openhands-sdk/openhands/sdk/observability/weave.py new file mode 100644 index 0000000000..740c42e4da --- /dev/null +++ b/openhands-sdk/openhands/sdk/observability/weave.py @@ -0,0 +1,353 @@ +"""Weave observability 
integration for OpenHands SDK. + +This module provides integration with Weights & Biases Weave for automatic +tracing and observability of agent operations. It leverages Weave's built-in +autopatching to automatically trace all LLM calls made through LiteLLM. + +## Key Features + +1. **Zero-config LLM tracing**: Just call `init_weave()` and all LiteLLM calls + are automatically traced - no manual decoration needed! + +2. **Automatic integration patching**: Weave automatically patches LiteLLM, + OpenAI, Anthropic, and 30+ other providers when initialized. + +3. **Optional manual tracing**: Use `@weave.op` for custom agent logic that + you want to trace (tool execution, agent steps, etc.) + +4. **Thread grouping**: Use `weave.thread()` to group operations by conversation. + +## How It Works + +The SDK uses LiteLLM for all LLM calls. When you call `init_weave()`: +1. Weave's autopatching automatically patches LiteLLM +2. All `litellm.completion()` and `litellm.acompletion()` calls are traced +3. You see full traces in the Weave UI without any code changes! + +## Environment Variables + +- `WANDB_API_KEY`: Your Weights & Biases API key +- `WEAVE_PROJECT`: The Weave project name (e.g., "my-team/my-project") + +## Usage Examples + +### Basic Usage (Automatic LLM Tracing) + +```python +from openhands.sdk.observability import init_weave +from openhands.sdk import LLM + +# Initialize Weave - this automatically traces all LLM calls! 
+init_weave("my-team/my-project") + +# All LLM calls are now automatically traced +llm = LLM(model="gpt-4") +response = llm.completion(messages=[{"role": "user", "content": "Hello!"}]) +# ^ This call appears in Weave UI automatically +``` + +### Custom Function Tracing + +```python +import weave +from openhands.sdk.observability import init_weave + +init_weave("my-team/my-project") + +# Use @weave.op for custom logic you want to trace +@weave.op +def process_agent_step(step: dict) -> dict: + # Your custom logic here + return {"processed": True} +``` + +### Conversation Thread Grouping + +```python +import weave +from openhands.sdk.observability import init_weave + +init_weave("my-team/my-project") + +# Group all operations under a conversation thread +with weave.thread("conversation-123"): + # All LLM calls and traced functions within this block + # will be grouped under the same thread + response = llm.completion(...) +``` + +See Also: + - Weave documentation: https://docs.wandb.ai/weave + - Laminar integration: openhands.sdk.observability.laminar +""" + +from __future__ import annotations + +import logging +import os +from collections.abc import Callable +from typing import Any, ParamSpec, TypeVar + +from openhands.sdk.observability.utils import get_env + + +logger = logging.getLogger(__name__) + +P = ParamSpec("P") +R = TypeVar("R") + +# Global state +_weave_initialized: bool = False +_weave_client: Any = None + + +def get_weave_client() -> Any: + """Get the current Weave client instance. + + Returns: + The Weave client if initialized, None otherwise. + """ + return _weave_client + + +def is_weave_initialized() -> bool: + """Check if Weave has been initialized. + + Returns: + True if Weave is initialized and ready for tracing. + """ + return _weave_initialized + + +def init_weave( + project: str | None = None, + api_key: str | None = None, + *, + settings: dict[str, Any] | None = None, +) -> bool: + """Initialize Weave for automatic tracing. 
+ + This is the main entry point for enabling Weave observability. When called, + Weave automatically patches LiteLLM and other supported libraries, so all + LLM calls are traced without any manual decoration. + + Args: + project: The Weave project name (e.g., "my-team/my-project"). + If not provided, uses WEAVE_PROJECT environment variable. + api_key: The Weights & Biases API key. If not provided, uses + WANDB_API_KEY environment variable. + settings: Optional dict of Weave settings to configure behavior. + See Weave documentation for available settings. + + Returns: + True if initialization was successful, False otherwise. + + Raises: + ValueError: If no project is specified and WEAVE_PROJECT is not set. + + Example: + >>> from openhands.sdk.observability import init_weave + >>> init_weave("my-team/openhands-agent") + True + >>> # Now all LiteLLM calls are automatically traced! + """ + global _weave_initialized, _weave_client + + if _weave_initialized: + logger.debug("Weave already initialized, skipping") + return True + + try: + import weave + except ImportError: + logger.warning( + "Weave package not installed. Install with: pip install weave" + ) + return False + + # Determine project name + project_name = project or get_env("WEAVE_PROJECT") + if not project_name: + raise ValueError( + "Weave project must be specified via argument or WEAVE_PROJECT env var" + ) + + # Set API key in environment if provided (Weave reads from env) + wandb_api_key = api_key or get_env("WANDB_API_KEY") + if wandb_api_key: + os.environ["WANDB_API_KEY"] = wandb_api_key + + # Ensure wandb is logged in (required by weave.init) + try: + import wandb + wandb.login(key=wandb_api_key, relogin=False) + except Exception as e: + logger.warning(f"wandb login failed: {e}") + else: + logger.warning( + "WANDB_API_KEY not set. Weave tracing may not work correctly." + ) + + try: + # Initialize Weave - this automatically: + # 1. Patches all already-imported integrations (LiteLLM, OpenAI, etc.) + # 2. 
Registers import hooks for future imports + init_kwargs: dict[str, Any] = {} + if settings: + init_kwargs["settings"] = settings + + _weave_client = weave.init(project_name, **init_kwargs) + _weave_initialized = True + + logger.info( + f"Weave initialized for project: {project_name}. " + "All LiteLLM calls will be automatically traced." + ) + return True + except Exception as e: + logger.error(f"Failed to initialize Weave: {e}") + return False + + +def maybe_init_weave() -> bool: + """Initialize Weave if environment variables are configured. + + This is a convenience function that initializes Weave only if both + WANDB_API_KEY and WEAVE_PROJECT environment variables are set. + Useful for conditional initialization based on environment. + + Returns: + True if Weave was initialized (or already was), False otherwise. + + Example: + >>> import os + >>> os.environ["WANDB_API_KEY"] = "your-key" + >>> os.environ["WEAVE_PROJECT"] = "my-team/my-project" + >>> from openhands.sdk.observability import maybe_init_weave + >>> maybe_init_weave() # Initializes automatically + True + """ + if _weave_initialized: + return True + + if not should_enable_weave(): + logger.debug( + "Weave environment variables not set (WANDB_API_KEY, WEAVE_PROJECT). " + "Skipping Weave initialization." + ) + return False + + try: + return init_weave() + except ValueError: + return False + + +def should_enable_weave() -> bool: + """Check if Weave should be enabled based on environment variables. + + Returns: + True if both WANDB_API_KEY and WEAVE_PROJECT are set. + """ + return bool(get_env("WANDB_API_KEY") and get_env("WEAVE_PROJECT")) + + +def get_weave_op() -> Callable: + """Get the weave.op decorator for manual function tracing. + + Returns the actual weave.op decorator if Weave is initialized, + otherwise returns a no-op decorator that just returns the function. + + This is useful when you want to trace custom agent logic beyond + the automatic LLM call tracing. 
+ + Returns: + The weave.op decorator or a no-op decorator. + + Example: + >>> from openhands.sdk.observability import init_weave, get_weave_op + >>> init_weave("my-project") + >>> weave_op = get_weave_op() + >>> + >>> @weave_op + ... def my_custom_function(x: int) -> int: + ... return x * 2 + """ + if not _weave_initialized: + def noop_decorator(func): + return func + return noop_decorator + + try: + import weave + return weave.op + except ImportError: + def noop_decorator(func): + return func + return noop_decorator + + +def weave_op( + func: Callable[P, R] | None = None, + *, + name: str | None = None, + call_display_name: str | Callable[..., str] | None = None, + postprocess_inputs: Callable[..., dict[str, Any]] | None = None, + postprocess_output: Callable[..., Any] | None = None, +) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]: + """Decorator to trace a function with Weave. + + This is a convenience wrapper around weave.op that handles the case + when Weave is not initialized (returns the function unchanged). + + Can be used with or without parentheses: + @weave_op + def my_func(): ... + + @weave_op(name="custom_name") + def my_func(): ... + + Args: + func: The function to decorate (when used without parentheses). + name: Optional name for the operation. Defaults to function name. + call_display_name: Display name for the call in the Weave UI. + Can be a string or a callable that takes the Call object. + postprocess_inputs: Function to transform inputs before logging. + postprocess_output: Function to transform output before logging. + + Returns: + The decorated function or a decorator. + + Example: + >>> @weave_op(name="agent_step") + ... def step(action: dict) -> dict: + ... 
            return execute(action)
    """
    # (docstring example above continues from the previous chunk line)

    def decorator(fn: Callable[P, R]) -> Callable[P, R]:
        # No-op path: leave the function untouched when Weave is off.
        if not _weave_initialized:
            return fn

        try:
            import weave

            # Forward only the options the caller actually provided so
            # weave.op keeps its defaults for the rest.
            op_kwargs: dict[str, Any] = {}
            if name:
                op_kwargs["name"] = name
            if call_display_name:
                op_kwargs["call_display_name"] = call_display_name
            if postprocess_inputs:
                op_kwargs["postprocess_inputs"] = postprocess_inputs
            if postprocess_output:
                op_kwargs["postprocess_output"] = postprocess_output

            if op_kwargs:
                return weave.op(**op_kwargs)(fn)
            return weave.op(fn)
        except Exception as e:
            # Tracing must never break user code: log and return the plain fn.
            logger.warning(f"Failed to apply weave.op decorator: {e}")
            return fn

    # Handle both @weave_op and @weave_op(...) syntax
    if func is not None:
        return decorator(func)
    return decorator
diff --git a/openhands-sdk/pyproject.toml b/openhands-sdk/pyproject.toml
index 276295e37c..cd3fd1d573 100644
--- a/openhands-sdk/pyproject.toml
+++ b/openhands-sdk/pyproject.toml
@@ -19,6 +19,7 @@ dependencies = [
 [project.optional-dependencies]
 boto3 = ["boto3>=1.35.0"]
+weave = ["weave>=0.52.22", "wandb"]
 [build-system]
 requires = ["setuptools>=61.0", "wheel"]
diff --git a/tests/sdk/observability/__init__.py b/tests/sdk/observability/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/sdk/observability/test_context.py b/tests/sdk/observability/test_context.py
new file mode 100644
index 0000000000..5f369ed515
--- /dev/null
+++ b/tests/sdk/observability/test_context.py
@@ -0,0 +1,238 @@
"""Tests for the generic observability context module."""

import pytest
from contextlib import nullcontext
from unittest.mock import MagicMock, patch

# NOTE(review): importing the private `_conversation_context_providers` list
# couples the tests to module internals — acceptable for white-box tests, but
# worth confirming it is intentional.
from openhands.sdk.observability.context import (
    clear_conversation_context_providers,
    get_conversation_context,
    register_conversation_context_provider,
    unregister_conversation_context_provider,
    _conversation_context_providers,
)


class TestConversationContextProviderRegistry:
    """Tests for the provider registry functions."""

    def
    # (continues the `def` keyword from the previous chunk line)
    setup_method(self):
        """Clear providers before each test."""
        # Store original providers so teardown can restore global state —
        # the registry is module-global and shared across tests.
        self._original_providers = _conversation_context_providers.copy()
        clear_conversation_context_providers()

    def teardown_method(self):
        """Restore original providers after each test."""
        clear_conversation_context_providers()
        for provider in self._original_providers:
            register_conversation_context_provider(provider)

    def test_register_provider(self):
        """Test registering a new provider."""
        def my_provider(conversation_id: str):
            return nullcontext()

        register_conversation_context_provider(my_provider)
        assert my_provider in _conversation_context_providers

    def test_register_provider_no_duplicates(self):
        """Test that registering the same provider twice doesn't create duplicates."""
        def my_provider(conversation_id: str):
            return nullcontext()

        register_conversation_context_provider(my_provider)
        register_conversation_context_provider(my_provider)
        assert _conversation_context_providers.count(my_provider) == 1

    def test_unregister_provider(self):
        """Test unregistering a provider."""
        def my_provider(conversation_id: str):
            return nullcontext()

        register_conversation_context_provider(my_provider)
        assert my_provider in _conversation_context_providers

        unregister_conversation_context_provider(my_provider)
        assert my_provider not in _conversation_context_providers

    def test_unregister_nonexistent_provider(self):
        """Test unregistering a provider that was never registered."""
        def my_provider(conversation_id: str):
            return nullcontext()

        # Should not raise
        unregister_conversation_context_provider(my_provider)

    def test_clear_providers(self):
        """Test clearing all providers."""
        def provider1(conversation_id: str):
            return nullcontext()

        def provider2(conversation_id: str):
            return nullcontext()

        register_conversation_context_provider(provider1)
        register_conversation_context_provider(provider2)
        assert len(_conversation_context_providers) == 2

        clear_conversation_context_providers()
        assert len(_conversation_context_providers) == 0


class TestGetConversationContext:
    """Tests for the get_conversation_context function."""

    # NOTE(review): setup/teardown duplicate the registry test class above;
    # a shared pytest fixture would remove the duplication.
    def setup_method(self):
        """Clear providers before each test."""
        self._original_providers = _conversation_context_providers.copy()
        clear_conversation_context_providers()

    def teardown_method(self):
        """Restore original providers after each test."""
        clear_conversation_context_providers()
        for provider in self._original_providers:
            register_conversation_context_provider(provider)

    def test_no_providers_is_noop(self):
        """Test that with no providers, the context is a no-op."""
        executed = False

        with get_conversation_context("test-conv"):
            executed = True

        assert executed

    def test_single_provider_called(self):
        """Test that a single provider is called with the conversation ID."""
        called_with = []

        def my_provider(conversation_id: str):
            called_with.append(conversation_id)
            return nullcontext()

        register_conversation_context_provider(my_provider)

        with get_conversation_context("test-conv-123"):
            pass

        assert called_with == ["test-conv-123"]

    def test_multiple_providers_called_in_order(self):
        """Test that multiple providers are called in registration order."""
        call_order = []

        def provider1(conversation_id: str):
            call_order.append("provider1")
            return nullcontext()

        def provider2(conversation_id: str):
            call_order.append("provider2")
            return nullcontext()

        register_conversation_context_provider(provider1)
        register_conversation_context_provider(provider2)

        with get_conversation_context("test-conv"):
            pass

        assert call_order == ["provider1", "provider2"]

    def test_provider_exception_does_not_break_others(self):
        """Test that an exception in one provider doesn't prevent others."""
        call_order = []

        def failing_provider(conversation_id: str):
            raise RuntimeError("Provider failed")

        def working_provider(conversation_id: str):
            call_order.append("working")
            return nullcontext()

        register_conversation_context_provider(failing_provider)
        register_conversation_context_provider(working_provider)

        # Should not raise
        with get_conversation_context("test-conv"):
            pass

        assert call_order == ["working"]

    def test_context_manager_enter_exit_called(self):
        """Test that context manager __enter__ and __exit__ are called."""
        mock_cm = MagicMock()
        mock_cm.__enter__ = MagicMock(return_value=None)
        mock_cm.__exit__ = MagicMock(return_value=None)

        def my_provider(conversation_id: str):
            return mock_cm

        register_conversation_context_provider(my_provider)

        with get_conversation_context("test-conv"):
            # __enter__ must have fired before the body runs.
            mock_cm.__enter__.assert_called_once()

        mock_cm.__exit__.assert_called_once()


class TestBuiltInProviders:
    """Tests for the built-in Weave and Laminar providers."""

    def test_weave_provider_returns_nullcontext_when_not_initialized(self):
        """Test that Weave provider returns nullcontext when Weave is not initialized."""
        from openhands.sdk.observability.context import _get_weave_conversation_context

        with patch(
            "openhands.sdk.observability.weave.is_weave_initialized",
            return_value=False,
        ):
            ctx = _get_weave_conversation_context("test-conv")
            # nullcontext() returns a different instance each time, so check type name
            # NOTE(review): `isinstance(ctx, nullcontext)` would be sturdier than
            # a type-name comparison — confirm why the name check was chosen.
            assert type(ctx).__name__ == "nullcontext"

    def test_laminar_provider_returns_nullcontext_when_not_initialized(self):
        """Test that Laminar provider returns nullcontext when Laminar is not initialized."""
        from openhands.sdk.observability.context import (
            _get_laminar_conversation_context,
        )

        with patch(
            "openhands.sdk.observability.laminar.should_enable_observability",
            return_value=False,
        ):
            ctx = _get_laminar_conversation_context("test-conv")
            # nullcontext() returns a different instance each time, so check type name
            assert type(ctx).__name__ == "nullcontext"


class TestIntegration:
    """Integration tests for the observability context system."""

    def test_providers_auto_registered_on_import(self):
        """Test that built-in providers are registered when module is imported."""
        # Re-import to trigger registration
        # NOTE(review): if the module was already imported, this is a no-op
        # rather than a re-registration — the assertion still holds either way.
        from openhands.sdk.observability import context

        # The module should have registered the built-in providers
        # We check by looking for the provider functions
        provider_names = [p.__name__ for p in context._conversation_context_providers]
        assert "_get_weave_conversation_context" in provider_names
        assert "_get_laminar_conversation_context" in provider_names

    def test_custom_provider_works_with_builtins(self):
        """Test that custom providers work alongside built-in ones."""
        custom_called = []

        def custom_provider(conversation_id: str):
            custom_called.append(conversation_id)
            return nullcontext()

        register_conversation_context_provider(custom_provider)

        try:
            with get_conversation_context("test-conv"):
                pass

            assert "test-conv" in custom_called
        finally:
            # Always clean up so the global registry is unchanged for later tests.
            unregister_conversation_context_provider(custom_provider)
diff --git a/tests/sdk/observability/test_weave.py b/tests/sdk/observability/test_weave.py
new file mode 100644
index 0000000000..cad57dc767
--- /dev/null
+++ b/tests/sdk/observability/test_weave.py
@@ -0,0 +1,268 @@
"""Tests for Weave observability integration.
These tests verify the Weave integration works correctly, including:
- Automatic LLM tracing via Weave's autopatching
- Decorator functionality (with and without Weave initialized)
- Environment variable configuration
- Graceful fallback when Weave is not available
"""

import os
from unittest.mock import MagicMock, patch

import pytest

# Check if weave is installed for tests that require it
try:
    import weave
    WEAVE_INSTALLED = True
except ImportError:
    WEAVE_INSTALLED = False

# Marker for tests that need the optional `weave` dependency installed.
requires_weave = pytest.mark.skipif(
    not WEAVE_INSTALLED,
    reason="Weave package not installed"
)


class TestWeaveConfiguration:
    """Tests for Weave configuration and initialization."""

    def test_should_enable_weave_with_both_vars(self):
        """should_enable_weave returns True when both env vars are set."""
        from openhands.sdk.observability.weave import should_enable_weave

        with patch.dict(os.environ, {
            "WANDB_API_KEY": "test-key",
            "WEAVE_PROJECT": "test-project",
        }):
            assert should_enable_weave() is True

    def test_should_enable_weave_missing_api_key(self):
        """should_enable_weave returns False when API key is missing."""
        from openhands.sdk.observability.weave import should_enable_weave

        with patch.dict(os.environ, {
            "WEAVE_PROJECT": "test-project",
        }, clear=True):
            # Clear WANDB_API_KEY if it exists
            # NOTE(review): redundant — `clear=True` already emptied os.environ
            # inside the patch; harmless, kept for symmetry with the test below.
            os.environ.pop("WANDB_API_KEY", None)
            assert should_enable_weave() is False

    def test_should_enable_weave_missing_project(self):
        """should_enable_weave returns False when project is missing."""
        from openhands.sdk.observability.weave import should_enable_weave

        with patch.dict(os.environ, {
            "WANDB_API_KEY": "test-key",
        }, clear=True):
            os.environ.pop("WEAVE_PROJECT", None)
            assert should_enable_weave() is False

    def test_is_weave_initialized_default(self):
        """is_weave_initialized returns False by default."""
        # Reset global state
        # NOTE(review): tests mutate the module-global _weave_initialized
        # directly; a fixture that saves/restores it would prevent leakage
        # between tests run in other orders.
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import is_weave_initialized
        assert is_weave_initialized() is False


class TestWeaveOpDecorator:
    """Tests for the @weave_op decorator."""

    def test_weave_op_without_initialization(self):
        """@weave_op runs function normally when Weave is not initialized."""
        # Reset global state
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import weave_op

        @weave_op(name="test_function")
        def test_function(x: int) -> int:
            return x + 1

        result = test_function(5)
        assert result == 6

    def test_weave_op_without_parentheses(self):
        """@weave_op can be used without parentheses."""
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import weave_op

        @weave_op
        def test_function(x: int) -> int:
            return x + 1

        result = test_function(5)
        assert result == 6

    def test_weave_op_handles_exceptions(self):
        """@weave_op propagates exceptions correctly."""
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import weave_op

        @weave_op(name="failing_function")
        def failing_function():
            raise ValueError("Test error")

        with pytest.raises(ValueError, match="Test error"):
            failing_function()


class TestGetWeaveOp:
    """Tests for the get_weave_op function."""

    def test_get_weave_op_returns_noop_when_not_initialized(self):
        """get_weave_op returns a no-op decorator when Weave is not initialized."""
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import get_weave_op

        op = get_weave_op()

        @op
        def test_function(x: int) -> int:
            return x * 2

        # Function should work normally
        assert test_function(5) == 10
        # Function should be unchanged
        assert test_function.__name__ == "test_function"


class TestWeaveExports:
    """Tests for module exports."""

    def test_all_exports_available(self):
        """All expected functions are exported from the module."""
        from openhands.sdk.observability import (
            get_weave_client,
            get_weave_op,
            init_weave,
            is_weave_initialized,
            maybe_init_weave,
            should_enable_weave,
            weave_op,
        )

        # Just verify they're callable
        assert callable(get_weave_client)
        assert callable(get_weave_op)
        assert callable(init_weave)
        assert callable(is_weave_initialized)
        assert callable(maybe_init_weave)
        assert callable(should_enable_weave)
        assert callable(weave_op)


class TestInitWeave:
    """Tests for init_weave function."""

    @requires_weave
    def test_init_weave_requires_project(self):
        """init_weave raises ValueError when no project is specified."""
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import init_weave

        with patch.dict(os.environ, {}, clear=True):
            os.environ.pop("WEAVE_PROJECT", None)
            with pytest.raises(ValueError, match="Weave project must be specified"):
                init_weave()

    def test_init_weave_returns_false_when_weave_not_installed(self):
        """init_weave returns False when weave package is not installed."""
        # This test verifies the expected behavior.
        # When weave is not installed, init_weave should return False.
        # Since weave is an optional dependency, we can test the actual
        # behavior directly if weave isn't installed.
        if WEAVE_INSTALLED:
            pytest.skip("Weave is installed, cannot test missing module behavior")

        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import init_weave

        result = init_weave(project="test-project")
        # When weave is not installed, init_weave should return False
        assert result is False

    @requires_weave
    def test_init_weave_uses_env_project(self):
        """init_weave uses WEAVE_PROJECT from environment."""
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import init_weave

        # Mock weave.init to avoid actual initialization
        with patch("openhands.sdk.observability.weave.get_env") as mock_get_env:
            mock_get_env.side_effect = lambda k: {
                "WEAVE_PROJECT": "test-project",
                "WANDB_API_KEY": None,
            }.get(k)

            with patch("weave.init") as mock_weave_init:
                mock_weave_init.return_value = MagicMock()
                result = init_weave()

            # Should have called weave.init with the project
            # NOTE(review): only call count is asserted here; asserting the
            # project argument too would pin the behavior more tightly.
            mock_weave_init.assert_called_once()

    def test_init_weave_already_initialized(self):
        """init_weave returns True immediately if already initialized."""
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = True

        from openhands.sdk.observability.weave import init_weave

        result = init_weave(project="test")
        assert result is True

        # Reset for other tests
        weave_module._weave_initialized = False


class TestAutopatching:
    """Tests for Weave's autopatching behavior.

    These tests verify that the integration is designed to leverage
    Weave's automatic LiteLLM patching.
    """

    @requires_weave
    def test_init_weave_calls_weave_init(self):
        """init_weave calls weave.init which triggers autopatching."""
        import openhands.sdk.observability.weave as weave_module
        weave_module._weave_initialized = False

        from openhands.sdk.observability.weave import init_weave

        with patch("openhands.sdk.observability.weave.get_env") as mock_get_env:
            mock_get_env.side_effect = lambda k: {
                "WEAVE_PROJECT": "test-project",
                "WANDB_API_KEY": "test-key",
            }.get(k)

            with patch("weave.init") as mock_weave_init:
                with patch("wandb.login"):
                    mock_weave_init.return_value = MagicMock()
                    result = init_weave()

            # weave.init should be called, which triggers implicit_patch()
            # and register_import_hook() internally
            mock_weave_init.assert_called_once()
            assert result is True

        # Reset for other tests
        weave_module._weave_initialized = False