diff --git a/README.md b/README.md
index 8bd1f79..fec602c 100644
--- a/README.md
+++ b/README.md
@@ -137,6 +137,40 @@ async for event in agent.query_stream("do something"):
     print(f"Done: {text}")
 ```
 
+### Observability with Langfuse
+
+Optional tracing and observability with [Langfuse](https://langfuse.com):
+
+```bash
+# Install with Langfuse support
+uv add "bu-agent-sdk[langfuse]"
+
+# Set environment variables
+export LANGFUSE_SECRET_KEY=sk-...
+export LANGFUSE_PUBLIC_KEY=pk-...
+```
+
+```python
+from bu_agent_sdk import observe_langfuse, is_langfuse_available, flush_langfuse
+
+@observe_langfuse(name="my_tool", as_type="span")
+async def my_tool(query: str) -> str:
+    return await process(query)
+
+@observe_langfuse(name="llm_call", as_type="generation")
+async def call_llm(prompt: str) -> str:
+    return await llm.generate(prompt)
+
+# Check if Langfuse is available
+if is_langfuse_available():
+    print("Langfuse tracing enabled")
+
+# Flush traces before exit
+flush_langfuse()
+```
+
+Alternative: Use [Laminar](https://www.lmnr.ai/) with `uv add "bu-agent-sdk[observability]"` and the `@observe` decorator.
+ ## Claude Code in 100 Lines A sandboxed coding assistant with dependency injection: diff --git a/bu_agent_sdk/__init__.py b/bu_agent_sdk/__init__.py index c3be02d..c93a4b5 100644 --- a/bu_agent_sdk/__init__.py +++ b/bu_agent_sdk/__init__.py @@ -20,10 +20,24 @@ async def add(a: int, b: int) -> int: from bu_agent_sdk.agent import Agent from bu_agent_sdk.observability import Laminar, observe, observe_debug +from bu_agent_sdk.langfuse_observability import ( + LangfuseClient, + observe_langfuse, + observe_langfuse_debug, + is_langfuse_available, + flush_langfuse, +) __all__ = [ "Agent", + # Laminar observability "Laminar", "observe", "observe_debug", + # Langfuse observability + "LangfuseClient", + "observe_langfuse", + "observe_langfuse_debug", + "is_langfuse_available", + "flush_langfuse", ] diff --git a/bu_agent_sdk/langfuse_observability.py b/bu_agent_sdk/langfuse_observability.py new file mode 100644 index 0000000..ce8db3b --- /dev/null +++ b/bu_agent_sdk/langfuse_observability.py @@ -0,0 +1,322 @@ +""" +Observability module for bu_agent_sdk with optional Langfuse integration. + +This module provides: +- `observe_langfuse` decorator for tracing functions +- `observe_langfuse_debug` decorator that only traces in debug mode +- `LangfuseClient` wrapper for Langfuse initialization + +If langfuse is not installed, all decorators become no-ops. + +Usage: + from bu_agent_sdk.langfuse_observability import observe_langfuse, LangfuseClient + + @observe_langfuse(name="my_function") + async def my_function(): + ... 
+ + # With async generators + @observe_langfuse(name="stream_response") + async def stream_response(): + for chunk in get_chunks(): + yield chunk + +Environment Variables: + LANGFUSE_SECRET_KEY: Your Langfuse secret key + LANGFUSE_PUBLIC_KEY: Your Langfuse public key + LANGFUSE_HOST: Optional Langfuse host URL (default: https://cloud.langfuse.com) +""" + +import logging +import os +from collections.abc import Callable +from functools import wraps +from typing import Any, Literal, TypeVar, cast + +logger = logging.getLogger(__name__) + +# Type definitions +F = TypeVar("F", bound=Callable[..., Any]) + +_LANGFUSE_AVAILABLE = False +_langfuse_observe = None +LangfuseClient = None + +# Try to import langfuse +try: + from langfuse import Langfuse as LangfuseClient # type: ignore + from langfuse import observe as _langfuse_observe # type: ignore + + _LANGFUSE_AVAILABLE = True + logger.debug("Langfuse is available for observability") +except ImportError: + logger.debug( + "Langfuse not installed - observability decorators will be no-ops" + ) + _LANGFUSE_AVAILABLE = False + + +def _is_debug_mode() -> bool: + """Check if we're in debug mode based on environment variables.""" + langfuse_debug_mode = os.getenv("LANGFUSE_DEBUG", "").lower() + if langfuse_debug_mode in ("1", "true", "yes", "debug"): + return True + # Also check BU_DEBUG for convenience + if os.getenv("BU_DEBUG", "").lower() in ("1", "true", "yes"): + return True + return False + + +def _create_no_op_decorator( + name: str | None = None, + capture_input: bool = True, + capture_output: bool = True, + as_type: Literal["span", "generation"] | None = None, + **kwargs: Any, +) -> Callable[[F], F]: + """Create a no-op decorator that accepts langfuse observe parameters but does nothing.""" + import asyncio + import inspect + + def decorator(func: F) -> F: + # Check for async generators first (async def with yield) + if inspect.isasyncgenfunction(func): + + @wraps(func) + async def async_gen_wrapper(*args, **kwargs): + 
async for item in func(*args, **kwargs): + yield item + + return cast(F, async_gen_wrapper) + elif asyncio.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + return await func(*args, **kwargs) + + return cast(F, async_wrapper) + else: + + @wraps(func) + def sync_wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return cast(F, sync_wrapper) + + return decorator + + +def _create_async_gen_observe_decorator( + name: str | None = None, + capture_input: bool = True, + capture_output: bool = True, + as_type: Literal["span", "generation"] | None = None, + **kwargs: Any, +) -> Callable[[F], F]: + """Create a decorator for async generators that wraps them in a Langfuse span.""" + + def decorator(func: F) -> F: + @wraps(func) + async def async_gen_wrapper(*args, **inner_kwargs): + span_name = name or func.__name__ + + # For async generators, we use context manager approach + if LangfuseClient is not None: + try: + from langfuse import get_client + langfuse = get_client() + + with langfuse.start_as_current_observation( + name=span_name, + as_type=as_type or "span", + input={"args": str(args)[:500], "kwargs": str(inner_kwargs)[:500]} if capture_input else None, + ) as span: + try: + async for item in func(*args, **inner_kwargs): + yield item + except Exception as e: + if capture_output: + span.update(output={"error": str(e)}) + raise + except Exception: + # If Langfuse context fails, just run the function + async for item in func(*args, **inner_kwargs): + yield item + else: + async for item in func(*args, **inner_kwargs): + yield item + + return cast(F, async_gen_wrapper) + + return decorator + + +def observe_langfuse( + name: str | None = None, + capture_input: bool = True, + capture_output: bool = True, + as_type: Literal["span", "generation"] | None = None, + **kwargs: Any, +) -> Callable[[F], F]: + """ + Observability decorator that traces function execution when langfuse is available. 
+ + This decorator will use langfuse's observe decorator if langfuse is installed, + otherwise it will be a no-op. + + Args: + name: Name of the span/trace + capture_input: Whether to capture function input parameters in tracing + capture_output: Whether to capture function output in tracing + as_type: Type of observation ('span' or 'generation') + **kwargs: Additional parameters passed to langfuse observe + + Example: + @observe_langfuse(name="my_function") + async def my_function(param1, param2): + return param1 + param2 + + @observe_langfuse(name="llm_call", as_type="generation") + async def call_llm(prompt: str): + return await llm.generate(prompt) + """ + import inspect + + decorator_kwargs = { + "name": name, + "capture_input": capture_input, + "capture_output": capture_output, + "as_type": as_type, + **kwargs, + } + + def decorator(func: F) -> F: + # Async generators need special handling - use manual span wrapper + if inspect.isasyncgenfunction(func): + if _LANGFUSE_AVAILABLE and LangfuseClient is not None: + return _create_async_gen_observe_decorator(**decorator_kwargs)(func) + else: + return _create_no_op_decorator(**decorator_kwargs)(func) + + if _LANGFUSE_AVAILABLE and _langfuse_observe: + # Map our parameters to langfuse's observe decorator parameters + langfuse_kwargs = {} + if name: + langfuse_kwargs["name"] = name + if not capture_input: + langfuse_kwargs["capture_input"] = False + if not capture_output: + langfuse_kwargs["capture_output"] = False + if as_type: + langfuse_kwargs["as_type"] = as_type + # Pass any additional kwargs + langfuse_kwargs.update(kwargs) + + return cast(F, _langfuse_observe(**langfuse_kwargs)(func)) + else: + return _create_no_op_decorator(**decorator_kwargs)(func) + + return decorator + + +def observe_langfuse_debug( + name: str | None = None, + capture_input: bool = True, + capture_output: bool = True, + as_type: Literal["span", "generation"] | None = None, + **kwargs: Any, +) -> Callable[[F], F]: + """ + Debug-only 
observability decorator that only traces when in debug mode. + + This decorator will use langfuse's observe decorator if both langfuse is installed + AND we're in debug mode, otherwise it will be a no-op. + + Debug mode is enabled by: + - LANGFUSE_DEBUG=1/true/yes/debug + - BU_DEBUG=1/true/yes + + Args: + name: Name of the span/trace + capture_input: Whether to capture function input parameters in tracing + capture_output: Whether to capture function output in tracing + as_type: Type of observation ('span' or 'generation') + **kwargs: Additional parameters passed to langfuse observe + + Example: + @observe_langfuse_debug(name="debug_function") + async def debug_function(): + ... + """ + import inspect + + decorator_kwargs = { + "name": name, + "capture_input": capture_input, + "capture_output": capture_output, + "as_type": as_type, + **kwargs, + } + + def decorator(func: F) -> F: + # Async generators need special handling - use manual span wrapper + if inspect.isasyncgenfunction(func): + if _LANGFUSE_AVAILABLE and LangfuseClient is not None and _is_debug_mode(): + return _create_async_gen_observe_decorator(**decorator_kwargs)(func) + else: + return _create_no_op_decorator(**decorator_kwargs)(func) + + if _LANGFUSE_AVAILABLE and _langfuse_observe and _is_debug_mode(): + # Map our parameters to langfuse's observe decorator parameters + langfuse_kwargs = {} + if name: + langfuse_kwargs["name"] = name + if not capture_input: + langfuse_kwargs["capture_input"] = False + if not capture_output: + langfuse_kwargs["capture_output"] = False + if as_type: + langfuse_kwargs["as_type"] = as_type + langfuse_kwargs.update(kwargs) + + return cast(F, _langfuse_observe(**langfuse_kwargs)(func)) + else: + return _create_no_op_decorator(**decorator_kwargs)(func) + + return decorator + + +# Convenience functions +def is_langfuse_available() -> bool: + """Check if langfuse is available for tracing.""" + return _LANGFUSE_AVAILABLE + + +def is_debug_mode() -> bool: + """Check if we're 
currently in debug mode."""
+    return _is_debug_mode()
+
+
+def get_langfuse_status() -> dict[str, bool]:
+    """Get the current status of Langfuse observability features."""
+    return {
+        "langfuse_available": _LANGFUSE_AVAILABLE,
+        "debug_mode": _is_debug_mode(),
+        "observe_active": _LANGFUSE_AVAILABLE,
+        "observe_debug_active": _LANGFUSE_AVAILABLE and _is_debug_mode(),
+    }
+
+
+def flush_langfuse() -> None:
+    """Flush any pending Langfuse traces.
+
+    Call this before your application exits to ensure all traces are sent.
+    """
+    if _LANGFUSE_AVAILABLE and LangfuseClient is not None:
+        try:
+            from langfuse import get_client
+            client = get_client()
+            client.flush()
+            logger.debug("Langfuse traces flushed successfully")
+        except Exception as e:
+            logger.warning(f"Failed to flush Langfuse traces: {e}")
diff --git a/pyproject.toml b/pyproject.toml
index fb93e69..9588826 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,6 +40,7 @@ anthropic = ["anthropic>=0.40.0"]
 openai = ["openai>=1.50.0"]
 google = ["google-genai>=1.0.0"]
 observability = ["lmnr>=0.4.0"]
+langfuse = ["langfuse>=3.0.0"]
 
 [project.urls]
 Homepage = "https://github.com/browser-use/bu-agent-sdk"