diff --git a/.bandit b/.bandit index 77e719e..41779ba 100644 --- a/.bandit +++ b/.bandit @@ -5,7 +5,10 @@ # Exclude directories exclude_dirs = ['.venv', '.pytest_cache', '__pycache__'] -# Skip B101 (assert_used) check for test files +# Exclude test files from scanning +exclude = ['/test_*.py', '*/test_*.py'] + +# Skip B101 (assert_used) check for test files (if not excluded) # Asserts are acceptable and expected in test files skips = B101 diff --git a/README.md b/README.md index 4899b42..7151ddf 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,13 @@ Export and analyze workflow trace data from LangSmith projects for performance i ## Overview -This toolkit provides two main capabilities: +This toolkit provides comprehensive capabilities for LangSmith trace analysis: 1. **Data Export** (`export_langsmith_traces.py`) - Export trace data from LangSmith using the SDK API -2. **Performance Analysis** (`analyze_traces.py`) - Analyze exported traces for latency, bottlenecks, and parallel execution +2. **Performance Analysis** (`analyze_traces.py`) - Analyze exported traces for latency, bottlenecks, and parallel execution (Phase 3A) +3. **Cost Analysis** (`analyze_cost.py`) - Calculate workflow costs with configurable pricing models (Phase 3B) +4. **Failure Pattern Analysis** (`analyze_failures.py`) - Detect failures, retry sequences, and error patterns (Phase 3C) -Designed for users on Individual Developer plans without bulk export features, with robust error handling, rate limiting, and comprehensive analysis capabilities. +Designed for users on Individual Developer plans without bulk export features, with robust error handling, rate limiting, and comprehensive analysis capabilities. All modules follow strict TDD methodology with 99+ tests and full type safety. 
## Features @@ -241,17 +243,29 @@ Once you have exported trace data, use the analysis tools to gain performance in After generating analysis results, use the verification tool to ensure accuracy: ```bash -# Basic verification (regenerates all statistics) +# Basic verification - Phase 3A only (default) python verify_analysis_report.py traces_export.json +# Verify all phases (3A + 3B + 3C) +python verify_analysis_report.py traces_export.json --phases all + +# Verify specific phases +python verify_analysis_report.py traces_export.json --phases 3b +python verify_analysis_report.py traces_export.json --phases 3c +python verify_analysis_report.py traces_export.json --phases "3a,3b" + # Verify against expected values python verify_analysis_report.py traces_export.json --expected-values expected.json + +# Use custom pricing model for cost analysis +python verify_analysis_report.py traces_export.json --phases 3b --pricing-model gemini_1.5_pro ``` The verification tool: - Regenerates all calculations from raw data - Provides deterministic verification of findings - Optionally compares against expected values (PASS/FAIL indicators) +- Supports selective phase verification (3a, 3b, 3c, or all) - Useful for auditing and validating reports **Example expected values JSON:** @@ -270,6 +284,130 @@ The verification tool: } ``` +### Cost Analysis (Phase 3B) + +Analyze workflow costs based on token usage with configurable pricing models: + +```python +from analyze_cost import ( + analyze_costs, + PricingConfig, + EXAMPLE_PRICING_CONFIGS, +) +from analyze_traces import load_from_json + +# Load exported trace data +dataset = load_from_json("traces_export.json") + +# Option 1: Use example pricing config +pricing = EXAMPLE_PRICING_CONFIGS["gemini_1.5_pro"] + +# Option 2: Create custom pricing config +pricing = PricingConfig( + model_name="Custom Model", + input_tokens_per_1k=0.001, # $1.00 per 1M input tokens + output_tokens_per_1k=0.003, # $3.00 per 1M output tokens + 
cache_read_per_1k=0.0001, # $0.10 per 1M cache read tokens (optional) +) + +# Run cost analysis +results = analyze_costs( + workflows=dataset.workflows, + pricing_config=pricing, + scaling_factors=[1, 10, 100, 1000], # Optional, defaults to [1, 10, 100, 1000] + monthly_workflow_estimate=10000, # Optional, for monthly cost projections +) + +# Access results +print(f"Average cost per workflow: ${results.avg_cost_per_workflow:.4f}") +print(f"Median cost: ${results.median_cost_per_workflow:.4f}") +print(f"Top cost driver: {results.top_cost_driver}") + +# View node-level breakdown +for node in results.node_summaries[:3]: # Top 3 nodes + print(f" {node.node_name}:") + print(f" Total cost: ${node.total_cost:.4f}") + print(f" Executions: {node.execution_count}") + print(f" Avg per execution: ${node.avg_cost_per_execution:.6f}") + print(f" % of total: {node.percent_of_total_cost:.1f}%") + +# View scaling projections +for scale_label, projection in results.scaling_projections.items(): + print(f"{scale_label}: ${projection.total_cost:.2f} for {projection.workflow_count} workflows") + if projection.cost_per_month_30days: + print(f" Monthly estimate: ${projection.cost_per_month_30days:.2f}/month") +``` + +**Cost Analysis Features:** +- Configurable pricing for any LLM provider (not hard-coded) +- Token usage extraction (input/output/cache tokens) +- Workflow-level cost aggregation +- Node-level cost breakdown with percentages +- Scaling projections at 1x, 10x, 100x, 1000x volume +- Optional monthly cost estimates +- Data quality reporting for missing token data + +### Failure Pattern Analysis (Phase 3C) + +Detect and analyze failure patterns, retry sequences, and error distributions: + +```python +from analyze_failures import ( + analyze_failures, + FAILURE_STATUSES, + ERROR_PATTERNS, +) +from analyze_traces import load_from_json + +# Load exported trace data +dataset = load_from_json("traces_export.json") + +# Run failure analysis +results = 
analyze_failures(workflows=dataset.workflows) + +# Overall metrics +print(f"Total workflows: {results.total_workflows}") +print(f"Success rate: {results.overall_success_rate_percent:.1f}%") +print(f"Failed workflows: {results.failed_workflows}") + +# Node failure breakdown +print("\nTop 5 nodes by failure rate:") +for node in results.node_failure_stats[:5]: + print(f" {node.node_name}:") + print(f" Failure rate: {node.failure_rate_percent:.1f}%") + print(f" Failures: {node.failure_count}/{node.total_executions}") + print(f" Retry sequences: {node.retry_sequences_detected}") + print(f" Common errors: {node.common_error_types}") + +# Error distribution +print("\nError type distribution:") +for error_type, count in results.error_type_distribution.items(): + print(f" {error_type}: {count}") + +# Retry analysis +print(f"\nTotal retry sequences detected: {results.total_retry_sequences}") +if results.retry_success_rate_percent: + print(f"Retry success rate: {results.retry_success_rate_percent:.1f}%") + +# Example retry sequence details +for retry_seq in results.retry_sequences[:3]: # First 3 retry sequences + print(f"\nRetry sequence in {retry_seq.node_name}:") + print(f" Attempts: {retry_seq.attempt_count}") + print(f" Final status: {retry_seq.final_status}") + print(f" Total duration: {retry_seq.total_duration_seconds:.1f}s") +``` + +**Failure Analysis Features:** +- Status-based failure detection (error, failed, cancelled) +- Regex-based error classification (validation, timeout, import, LLM errors) +- Heuristic retry sequence detection: + - Multiple executions of same node within 5-minute window + - Ordered by start time +- Node-level failure statistics +- Retry success rate calculation +- Error distribution across workflows +- Quality risk identification (placeholder for future enhancement) + ### Using Python API Directly You can also use the analysis functions programmatically: @@ -420,9 +558,41 @@ pytest test_analyze_traces.py::TestCSVExport -v pytest 
--cov=analyze_traces test_analyze_traces.py ``` +**Cost analysis module tests (20 tests):** +```bash +# Run all cost analysis tests +pytest test_analyze_cost.py -v + +# Run specific test classes +pytest test_analyze_cost.py::TestPricingConfig -v +pytest test_analyze_cost.py::TestTokenExtraction -v +pytest test_analyze_cost.py::TestCostCalculation -v + +# Run with coverage +pytest --cov=analyze_cost test_analyze_cost.py +``` + +**Failure analysis module tests (15 tests):** +```bash +# Run all failure analysis tests +pytest test_analyze_failures.py -v + +# Run specific test classes +pytest test_analyze_failures.py::TestFailureDetection -v +pytest test_analyze_failures.py::TestRetryDetection -v +pytest test_analyze_failures.py::TestNodeFailureAnalysis -v + +# Run with coverage +pytest --cov=analyze_failures test_analyze_failures.py +``` + **Run all tests:** ```bash +# Run all 99 tests (33 export + 31 analysis + 20 cost + 15 failure) pytest -v + +# Run with coverage +pytest --cov=. -v ``` ### Project Structure @@ -435,11 +605,16 @@ export-langsmith-data/ ├── PLAN.md # PDCA implementation plan ├── export-langsmith-requirements.md # Export requirements specification ├── export_langsmith_traces.py # Data export script -├── test_export_langsmith_traces.py # Export test suite (42 tests) +├── test_export_langsmith_traces.py # Export test suite (33 tests) ├── validate_export.py # Export validation utility ├── test_validate_export.py # Validation test suite (7 tests) -├── analyze_traces.py # Performance analysis module +├── analyze_traces.py # Performance analysis module (Phase 3A) ├── test_analyze_traces.py # Analysis test suite (31 tests) +├── analyze_cost.py # Cost analysis module (Phase 3B) +├── test_analyze_cost.py # Cost analysis test suite (20 tests) +├── analyze_failures.py # Failure pattern analysis module (Phase 3C) +├── test_analyze_failures.py # Failure analysis test suite (15 tests) +├── verify_analysis_report.py # Verification tool for all phases ├── notebooks/ │ 
└── langsmith_trace_performance_analysis.ipynb # Interactive analysis notebook ├── output/ # Generated CSV analysis results @@ -490,11 +665,33 @@ This project follows the **PDCA (Plan-Do-Check-Act) framework** with strict Test - ✅ Code quality: Black, Ruff, mypy checks passing - ✅ TDD methodology: Strict RED-GREEN-REFACTOR cycles across all 5 phases +### ✅ Complete - Production Ready (Continued) + +**Cost Analysis Module (Phase 3B):** +- ✅ Configurable pricing models for any LLM provider +- ✅ Token usage extraction from trace metadata +- ✅ Cost calculation with input/output/cache token pricing +- ✅ Workflow-level cost aggregation +- ✅ Node-level cost breakdown with percentages +- ✅ Scaling projections (1x, 10x, 100x, 1000x) +- ✅ Test suite: 20 tests, full coverage +- ✅ Code quality: Black, Ruff, mypy, Bandit checks passing + +**Failure Pattern Analysis Module (Phase 3C):** +- ✅ Status-based failure detection +- ✅ Regex-based error classification (4 patterns + unknown catch-all) +- ✅ Heuristic retry sequence detection +- ✅ Node-level failure statistics +- ✅ Retry success rate calculation +- ✅ Error distribution tracking +- ✅ Test suite: 15 tests, full coverage +- ✅ Code quality: Black, Ruff, mypy, Bandit checks passing + ### Optional Features Not Implemented - ⏸️ Progress indication (tqdm) - Skipped in favor of simple console output -- ⏸️ Cost analysis (Phase 3B) - Future enhancement for token usage tracking -- ⏸️ Failure analysis (Phase 3C) - Future enhancement for error pattern detection +- ⏸️ Validator effectiveness analysis - Placeholder in Phase 3C for future enhancement +- ⏸️ Cache effectiveness analysis - Placeholder in Phase 3B for future enhancement ## Troubleshooting diff --git a/analyze_cost.py b/analyze_cost.py new file mode 100644 index 0000000..346c8a3 --- /dev/null +++ b/analyze_cost.py @@ -0,0 +1,718 @@ +""" +LangSmith Trace Cost Analysis Tool - Phase 3B + +This module provides cost analysis capabilities for LangSmith trace exports. 
@dataclass
class PricingConfig:
    """Price sheet for a single LLM model, expressed per 1,000 tokens.

    Nothing here is provider-specific: callers supply whatever rates apply
    to their model. Rates are checked once, when the instance is built.

    Raises:
        ValueError: If the input or output rate is negative, or if a cache
            read rate is supplied and is negative.
    """

    model_name: str
    input_tokens_per_1k: float  # Price of 1K input tokens
    output_tokens_per_1k: float  # Price of 1K output tokens
    # Price of 1K cache-read tokens; None when no cache rate is configured
    cache_read_per_1k: Optional[float] = None

    def __post_init__(self) -> None:
        """Reject negative rates as soon as the config is constructed."""
        mandatory_rates = (self.input_tokens_per_1k, self.output_tokens_per_1k)
        if any(rate < 0 for rate in mandatory_rates):
            raise ValueError("Token prices must be non-negative")

        cache_rate = self.cache_read_per_1k
        if cache_rate is None:
            # The cache rate is optional; nothing further to validate.
            return
        if cache_rate < 0:
            raise ValueError("Cache read price must be non-negative")
@dataclass
class CostBreakdown:
    """Priced result for one trace: three cost components, their sum, and the token counts."""

    trace_id: str
    trace_name: str
    input_cost: float
    output_cost: float
    cache_cost: float
    total_cost: float
    token_usage: TokenUsage

    def to_dict(self) -> Dict[str, Any]:
        """Flatten this breakdown into a single-level dict for CSV/reporting.

        Identity fields come first, then the token counts, then the costs.
        The cached-token count is not part of the exported row.
        """
        usage = self.token_usage
        row: Dict[str, Any] = {
            "trace_id": self.trace_id,
            "trace_name": self.trace_name,
        }
        row.update(
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            total_tokens=usage.total_tokens,
        )
        # Cost columns share their names with the dataclass fields.
        for cost_field in ("input_cost", "output_cost", "cache_cost", "total_cost"):
            row[cost_field] = getattr(self, cost_field)
        return row
def _generation_usage_metadata(outputs: Any) -> Dict[str, Any]:
    """Return outputs.generations[0][0].message.kwargs.usage_metadata, or {} if any hop is missing or malformed."""
    if not isinstance(outputs, dict):
        return {}
    node: Any = outputs.get("generations")
    # Two list hops: generations[0], then [0] again for the first message.
    for _ in range(2):
        if not isinstance(node, (list, tuple)) or not node:
            return {}
        node = node[0]
    # Three dict hops down to the usage metadata.
    for key in ("message", "kwargs", "usage_metadata"):
        if not isinstance(node, dict):
            return {}
        node = node.get(key, {})
    return node if isinstance(node, dict) else {}


def _cache_read_tokens(usage_metadata: Dict[str, Any]) -> Optional[int]:
    """Return input_token_details.cache_read from a usage dict, or None when absent."""
    details = usage_metadata.get("input_token_details")
    if isinstance(details, dict):
        return details.get("cache_read")
    return None


def extract_token_usage(trace: Trace) -> Optional[TokenUsage]:
    """
    Extract token usage from a trace.

    Locations are tried in this order:
    1. Top-level trace fields (total_tokens, prompt_tokens, completion_tokens).
       Cache-read tokens are still taken from the LangChain message metadata
       under outputs.generations[0][0].message.kwargs.usage_metadata.
    2. That same LangChain message usage_metadata.
    3. trace.outputs["usage_metadata"]
    4. trace.inputs["usage_metadata"] (fallback)

    Malformed payloads (non-dict outputs/inputs, empty or None generation
    entries, non-dict usage metadata) are treated as "no data" instead of
    raising. Token counts that are missing or None fall back to 0, and a
    missing or None total falls back to input + output.

    Args:
        trace: Trace object to extract from

    Returns:
        TokenUsage if data found, None otherwise
    """
    outputs = getattr(trace, "outputs", None)
    inputs = getattr(trace, "inputs", None)
    generation_usage = _generation_usage_metadata(outputs)

    # Preferred source: token counts stored directly on the trace.
    top_level_total = getattr(trace, "total_tokens", None)
    if top_level_total is not None:
        return TokenUsage(
            input_tokens=getattr(trace, "prompt_tokens", 0) or 0,
            output_tokens=getattr(trace, "completion_tokens", 0) or 0,
            total_tokens=top_level_total,
            cached_tokens=_cache_read_tokens(generation_usage),
        )

    # Otherwise take the first non-empty usage dict, in priority order.
    candidates: List[Any] = [generation_usage]
    for container in (outputs, inputs):
        if isinstance(container, dict):
            candidates.append(container.get("usage_metadata"))
    usage_data = next(
        (item for item in candidates if isinstance(item, dict) and item), None
    )
    if usage_data is None:
        return None

    # `or 0` also covers keys that are present but explicitly None.
    input_tokens = usage_data.get("input_tokens") or 0
    output_tokens = usage_data.get("output_tokens") or 0
    total_tokens = usage_data.get("total_tokens")
    if total_tokens is None:
        total_tokens = input_tokens + output_tokens

    return TokenUsage(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=total_tokens,
        cached_tokens=_cache_read_tokens(usage_data),
    )
def calculate_workflow_cost(
    workflow: Workflow,
    pricing_config: PricingConfig,
) -> WorkflowCostAnalysis:
    """
    Price every trace of a workflow and roll the results up.

    Traces for which no token usage can be extracted are skipped, so they
    contribute neither cost nor tokens to the totals.

    Args:
        workflow: Workflow whose traces are priced
        pricing_config: Pricing configuration applied to each trace

    Returns:
        WorkflowCostAnalysis keyed by the root trace id, holding the
        per-trace breakdowns plus summed cost and token count
    """
    breakdowns = []
    running_cost = 0.0
    token_sum = 0

    for node_trace in workflow.all_traces:
        usage = extract_token_usage(node_trace)
        if not usage:
            # No token data for this trace: nothing to price.
            continue

        priced = calculate_trace_cost(
            usage,
            pricing_config,
            trace_id=node_trace.id,
            trace_name=node_trace.name,
        )
        breakdowns.append(priced)
        running_cost += priced.total_cost
        token_sum += usage.total_tokens

    return WorkflowCostAnalysis(
        workflow_id=workflow.root_trace.id,
        total_cost=running_cost,
        node_costs=breakdowns,
        total_tokens=token_sum,
    )
def aggregate_node_costs(
    workflow_analyses: List[WorkflowCostAnalysis],
) -> List[NodeCostSummary]:
    """
    Roll per-trace costs up by node name across all workflows.

    Each summary's share is its cost as a percentage of the cost of all
    breakdowns seen; the share is 0.0 when that overall cost is not positive.

    Args:
        workflow_analyses: List of WorkflowCostAnalysis objects

    Returns:
        List of NodeCostSummary objects sorted by total_cost descending
    """
    if not workflow_analyses:
        return []

    # node name -> [execution count, accumulated cost]
    tallies: Dict[str, List[Any]] = {}
    grand_total = 0.0

    for analysis in workflow_analyses:
        for breakdown in analysis.node_costs:
            tally = tallies.setdefault(breakdown.trace_name, [0, 0.0])
            tally[0] += 1
            tally[1] += breakdown.total_cost
            grand_total += breakdown.total_cost

    rows = []
    for name, (runs, cost) in tallies.items():
        if grand_total > 0:
            share = cost / grand_total * 100.0
        else:
            share = 0.0
        rows.append(
            NodeCostSummary(
                node_name=name,
                execution_count=runs,
                total_cost=cost,
                # runs is at least 1 for every recorded node
                avg_cost_per_execution=cost / runs,
                percent_of_total_cost=share,
            )
        )

    # Most expensive node first.
    return sorted(rows, key=lambda row: row.total_cost, reverse=True)
def calculate_cache_savings(
    workflow_analyses: List[WorkflowCostAnalysis],
    pricing_config: PricingConfig,
) -> float:
    """
    Total the dollars saved by reading tokens from cache.

    For every trace that reports cached tokens, the saving is the gap between
    charging those tokens at the input-token rate and charging them at the
    cache-read rate.

    Args:
        workflow_analyses: List of WorkflowCostAnalysis objects
        pricing_config: Pricing configuration

    Returns:
        Total savings in dollars; 0.0 when there is nothing to analyze or no
        cache-read price is configured
    """
    # Without analyses or a cache price there is nothing to compare.
    if not workflow_analyses or pricing_config.cache_read_per_1k is None:
        return 0.0

    def _saved(tokens: Any) -> float:
        """Input-rate charge minus cache-rate charge for `tokens` tokens."""
        at_input_rate = (tokens / 1000.0) * pricing_config.input_tokens_per_1k
        at_cache_rate = (tokens / 1000.0) * pricing_config.cache_read_per_1k
        return at_input_rate - at_cache_rate

    cached_counts = (
        breakdown.token_usage.cached_tokens
        for analysis in workflow_analyses
        for breakdown in analysis.node_costs
        if breakdown.token_usage.has_cache_data()
    )
    return sum((_saved(count) for count in cached_counts if count is not None), 0.0)
without cache. + + Provides detailed breakdown showing actual cost using cache vs what cost + would have been if cache tokens were charged at input token rate. + + Args: + workflow_analyses: List of WorkflowCostAnalysis objects + pricing_config: Pricing configuration + + Returns: + CacheCostComparison with detailed cost comparison + """ + cost_with_cache = 0.0 + cost_without_cache = 0.0 + traces_analyzed = 0 + traces_with_cache = 0 + + for workflow_analysis in workflow_analyses: + for cost_breakdown in workflow_analysis.node_costs: + traces_analyzed += 1 + + # Add actual cost (with cache) + cost_with_cache += cost_breakdown.total_cost + + # Calculate hypothetical cost without cache + if cost_breakdown.token_usage.has_cache_data(): + traces_with_cache += 1 + + # If cache pricing available, calculate what cost would have been + if pricing_config.cache_read_per_1k is not None: + cached_tokens = cost_breakdown.token_usage.cached_tokens + if cached_tokens is None: + cost_without_cache += cost_breakdown.total_cost + continue + + # Remove actual cache cost + cost_without_this_cache = ( + cost_breakdown.total_cost - cost_breakdown.cache_cost + ) + + # Add what it would cost at input token rate + hypothetical_input_cost = ( + cached_tokens / 1000.0 + ) * pricing_config.input_tokens_per_1k + + cost_without_cache += ( + cost_without_this_cache + hypothetical_input_cost + ) + else: + # No cache pricing, so cost would be same + cost_without_cache += cost_breakdown.total_cost + else: + # No cache data, cost is same with or without cache + cost_without_cache += cost_breakdown.total_cost + + # Calculate savings + total_savings = cost_without_cache - cost_with_cache + + # Calculate savings percentage + if cost_without_cache > 0: + savings_percent = (total_savings / cost_without_cache) * 100.0 + else: + savings_percent = 0.0 + + return CacheCostComparison( + cost_with_cache=cost_with_cache, + cost_without_cache=cost_without_cache, + total_savings=total_savings, + 
def analyze_costs(
    workflows: List[Workflow],
    pricing_config: PricingConfig,
    scaling_factors: Optional[List[int]] = None,
    monthly_workflow_estimate: Optional[int] = None,
) -> CostAnalysisResults:
    """
    Perform complete cost analysis on workflows.
    Main entry point for Phase 3B.

    Per-workflow statistics are computed over every workflow passed in. A
    workflow with no extractable token usage has a cost of 0.0, so it pulls
    the average and median down; a data-quality note is added whenever that
    happens.

    Args:
        workflows: List of Workflow objects to analyze
        pricing_config: Pricing configuration for cost calculation
        scaling_factors: Optional list of scale factors (default: [1, 10, 100, 1000])
        monthly_workflow_estimate: Optional monthly workflow estimate for projections

    Returns:
        CostAnalysisResults with complete analysis
    """
    # Function-scope import keeps this block self-contained.
    from statistics import median

    if scaling_factors is None:
        scaling_factors = SCALING_FACTORS

    data_quality_notes: List[str] = []

    # Calculate cost for each workflow
    workflow_analyses = [
        calculate_workflow_cost(workflow, pricing_config) for workflow in workflows
    ]
    workflow_costs = [analysis.total_cost for analysis in workflow_analyses]

    # Calculate per-workflow statistics
    if workflow_costs:
        avg_cost = sum(workflow_costs) / len(workflow_costs)
        # A true median: for an even number of workflows this is the mean of
        # the two middle costs, not just the upper one.
        median_cost = median(workflow_costs)
        min_cost = min(workflow_costs)
        max_cost = max(workflow_costs)
    else:
        avg_cost = median_cost = min_cost = max_cost = 0.0
        data_quality_notes.append("No workflows with cost data found")

    # Report workflows for which no trace yielded token usage.
    unpriced = sum(1 for analysis in workflow_analyses if not analysis.node_costs)
    if unpriced:
        data_quality_notes.append(
            f"{unpriced} of {len(workflow_analyses)} workflows had no token "
            "usage data and were costed at $0"
        )

    # Aggregate node costs
    node_summaries = aggregate_node_costs(workflow_analyses)
    top_cost_driver = node_summaries[0].node_name if node_summaries else None

    # Calculate scaling projections
    scaling_projections = project_scaling_costs(
        avg_cost_per_workflow=avg_cost,
        current_workflow_count=len(workflows),
        scaling_factors=scaling_factors,
        monthly_workflow_estimate=monthly_workflow_estimate,
    )

    # Cache effectiveness analysis
    cache_effectiveness_percent = calculate_cache_hit_rate(workflow_analyses)
    cache_savings_dollars = calculate_cache_savings(workflow_analyses, pricing_config)

    return CostAnalysisResults(
        avg_cost_per_workflow=avg_cost,
        median_cost_per_workflow=median_cost,
        min_cost=min_cost,
        max_cost=max_cost,
        node_summaries=node_summaries,
        top_cost_driver=top_cost_driver,
        cache_effectiveness_percent=cache_effectiveness_percent,
        cache_savings_dollars=cache_savings_dollars,
        scaling_projections=scaling_projections,
        total_workflows_analyzed=len(workflows),
        data_quality_notes=data_quality_notes,
    )
+ +Author: Generated with Claude Code (PDCA Framework) +Date: 2025-12-09 +""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Dict, List, Optional +import re +from analyze_traces import Trace, Workflow + + +# ============================================================================ +# Configuration Constants +# ============================================================================ + +# Status values indicating failure +FAILURE_STATUSES = {"error", "failed", "cancelled"} +SUCCESS_STATUSES = {"success"} + +# Retry detection heuristics +RETRY_DETECTION_CONFIG = { + "max_time_window_seconds": 300, # 5 min window for retry detection + "same_node_threshold": 2, # 2+ executions = potential retry +} + +# Error classification patterns (regex) +ERROR_PATTERNS = { + "validation_failure": r"validation.*fail|invalid.*spec", + "api_timeout": r"timeout|timed out", + "import_error": r"import.*fail|import.*error", + "llm_error": r"model.*error|generation.*fail|token.*limit", + "unknown": r".*", # Catch-all +} + + +# ============================================================================ +# Core Data Structures +# ============================================================================ + + +@dataclass +class FailureInstance: + """Single failure occurrence.""" + + trace_id: str + trace_name: str + workflow_id: str + error_message: Optional[str] + error_type: str # Classified from ERROR_PATTERNS + timestamp: Optional[datetime] + + +@dataclass +class RetrySequence: + """Detected retry sequence.""" + + node_name: str + workflow_id: str + attempt_count: int + attempts: List[Trace] # Ordered by start_time + final_status: str # Status of the last attempt, e.g. 'success' or 'error' + total_duration_seconds: float + total_cost_estimate: Optional[float] = None + + +@dataclass +class NodeFailureStats: + """Failure statistics for a node type.""" + + node_name: str + total_executions: int + failure_count: int + success_count: int + failure_rate_percent: float +
retry_sequences_detected: int + avg_retries_when_failing: float + common_error_types: Dict[str, int] # error_type -> count + + +@dataclass +class ValidatorEffectivenessAnalysis: + """Validator effectiveness assessment.""" + + validator_name: str + total_executions: int + caught_issues_count: int # Failures detected + pass_rate_percent: float + is_necessary: bool # Based on redundancy analysis + + +@dataclass +class FailureAnalysisResults: + """Complete failure pattern analysis results.""" + + # Overall metrics + total_workflows: int + successful_workflows: int + failed_workflows: int + overall_success_rate_percent: float + + # Node-level breakdown + node_failure_stats: List[NodeFailureStats] # Sorted by failure_rate + highest_failure_node: Optional[str] + + # Error distribution + error_type_distribution: Dict[str, int] + most_common_error_type: Optional[str] + + # Retry analysis + total_retry_sequences: int + retry_sequences: List[RetrySequence] + retry_success_rate_percent: Optional[float] + avg_cost_of_retries: Optional[float] + + # Validator analysis + validator_analyses: List[ValidatorEffectivenessAnalysis] + redundant_validators: List[str] + + # Quality risks + quality_risks_at_scale: List[str] + + +# ============================================================================ +# Failure Detection Functions +# ============================================================================ + + +def detect_failures(workflow: Workflow) -> List[FailureInstance]: + """ + Detect all failures in workflow using trace.status and trace.error. 
+ + Args: + workflow: Workflow to analyze + + Returns: + List of FailureInstance objects + """ + failures = [] + + for trace in workflow.all_traces: + if trace.status in FAILURE_STATUSES: + error_type = classify_error(trace.error) + failure = FailureInstance( + trace_id=trace.id, + trace_name=trace.name, + workflow_id=workflow.root_trace.id, + error_message=trace.error, + error_type=error_type, + timestamp=trace.start_time, + ) + failures.append(failure) + + return failures + + +def classify_error(error_message: Optional[str]) -> str: + """ + Classify error into type using regex patterns. + + Args: + error_message: Error message to classify + + Returns: + Error type string + """ + if not error_message: + return "unknown" + + error_lower = error_message.lower() + + # Try each pattern (order matters - more specific first) + for error_type, pattern in ERROR_PATTERNS.items(): + if error_type == "unknown": + continue # Skip catch-all for now + if re.search(pattern, error_lower): + return error_type + + return "unknown" + + +# ============================================================================ +# Retry Detection Functions +# ============================================================================ + + +def detect_retry_sequences(workflow: Workflow) -> List[RetrySequence]: + """ + Detect retry sequences using heuristics: + - Multiple executions of same node within time window + - Ordered by start_time + + Args: + workflow: Workflow to analyze + + Returns: + List of RetrySequence objects + """ + # Group traces by node name + node_traces: Dict[str, List[Trace]] = {} + for trace in workflow.all_traces: + if trace.name not in node_traces: + node_traces[trace.name] = [] + node_traces[trace.name].append(trace) + + retry_sequences = [] + + for node_name, traces in node_traces.items(): + if len(traces) < RETRY_DETECTION_CONFIG["same_node_threshold"]: + continue + + # Filter out traces with None start_time and sort by start_time + valid_traces = [t for t in traces if 
t.start_time is not None] + if len(valid_traces) < RETRY_DETECTION_CONFIG["same_node_threshold"]: + continue + + sorted_traces = sorted(valid_traces, key=lambda t: t.start_time) # type: ignore[arg-type, return-value] + + # Check if traces are within time window + first_start = sorted_traces[0].start_time + last_start = sorted_traces[-1].start_time + if first_start is None or last_start is None: + continue + time_diff = (last_start - first_start).total_seconds() + + if time_diff <= RETRY_DETECTION_CONFIG["max_time_window_seconds"]: + # This looks like a retry sequence + final_status = sorted_traces[-1].status + total_duration = sum(t.duration_seconds for t in sorted_traces) + + retry_seq = RetrySequence( + node_name=node_name, + workflow_id=workflow.root_trace.id, + attempt_count=len(sorted_traces), + attempts=sorted_traces, + final_status=final_status, + total_duration_seconds=total_duration, + ) + retry_sequences.append(retry_seq) + + return retry_sequences + + +def calculate_retry_success_rate( + retry_sequences: List[RetrySequence], +) -> Optional[float]: + """ + Calculate % of retries that eventually succeed. + + Args: + retry_sequences: List of RetrySequence objects + + Returns: + Success rate as percentage, or None if no retries + """ + if not retry_sequences: + return None + + successful_retries = sum( + 1 for seq in retry_sequences if seq.final_status in SUCCESS_STATUSES + ) + + return (successful_retries / len(retry_sequences)) * 100.0 + + +# ============================================================================ +# Node Failure Analysis Functions +# ============================================================================ + + +def analyze_node_failures(workflows: List[Workflow]) -> List[NodeFailureStats]: + """ + Analyze failure patterns by node type across workflows. 
+ + Args: + workflows: List of Workflow objects + + Returns: + List of NodeFailureStats sorted by failure_rate descending + """ + # Aggregate by node name + node_data: Dict[str, Dict[str, Any]] = {} + + for workflow in workflows: + # Detect all retry sequences for this workflow + retry_sequences = detect_retry_sequences(workflow) + + for trace in workflow.all_traces: + node_name = trace.name + if node_name not in node_data: + node_data[node_name] = { + "total_executions": 0, + "failure_count": 0, + "success_count": 0, + "retry_sequences": 0, + "error_types": {}, + } + + node_data[node_name]["total_executions"] += 1 + + if trace.status in FAILURE_STATUSES: + node_data[node_name]["failure_count"] += 1 + # Track error type + error_type = classify_error(trace.error) + if error_type not in node_data[node_name]["error_types"]: + node_data[node_name]["error_types"][error_type] = 0 + node_data[node_name]["error_types"][error_type] += 1 + elif trace.status in SUCCESS_STATUSES: + node_data[node_name]["success_count"] += 1 + + # Count retry sequences per node + for retry_seq in retry_sequences: + if retry_seq.node_name in node_data: + node_data[retry_seq.node_name]["retry_sequences"] += 1 + + # Create NodeFailureStats objects + stats_list = [] + for node_name, data in node_data.items(): + total_exec = data["total_executions"] + failure_count = data["failure_count"] + failure_rate = (failure_count / total_exec * 100.0) if total_exec > 0 else 0.0 + + # Calculate avg retries when failing + retry_sequences = data["retry_sequences"] + avg_retries = (retry_sequences / failure_count) if failure_count > 0 else 0.0 + + stats = NodeFailureStats( + node_name=node_name, + total_executions=total_exec, + failure_count=failure_count, + success_count=data["success_count"], + failure_rate_percent=failure_rate, + retry_sequences_detected=retry_sequences, + avg_retries_when_failing=avg_retries, + common_error_types=data["error_types"], + ) + stats_list.append(stats) + + # Sort by failure rate 
descending + stats_list.sort(key=lambda s: s.failure_rate_percent, reverse=True) + + return stats_list + + +# ============================================================================ +# Main Analysis Function +# ============================================================================ + + +def analyze_failures(workflows: List[Workflow]) -> FailureAnalysisResults: + """ + Perform complete failure pattern analysis. + Main entry point for Phase 3C. + + Args: + workflows: List of Workflow objects to analyze + + Returns: + FailureAnalysisResults with complete analysis + """ + if not workflows: + return FailureAnalysisResults( + total_workflows=0, + successful_workflows=0, + failed_workflows=0, + overall_success_rate_percent=0.0, + node_failure_stats=[], + highest_failure_node=None, + error_type_distribution={}, + most_common_error_type=None, + total_retry_sequences=0, + retry_sequences=[], + retry_success_rate_percent=None, + avg_cost_of_retries=None, + validator_analyses=[], + redundant_validators=[], + quality_risks_at_scale=[], + ) + + # Detect all failures + all_failures = [] + for workflow in workflows: + failures = detect_failures(workflow) + all_failures.extend(failures) + + # Detect all retry sequences + all_retry_sequences = [] + for workflow in workflows: + retries = detect_retry_sequences(workflow) + all_retry_sequences.extend(retries) + + # Calculate overall success rate + total_workflows = len(workflows) + failed_workflows = sum( + 1 for workflow in workflows if workflow.root_trace.status in FAILURE_STATUSES + ) + successful_workflows = total_workflows - failed_workflows + overall_success_rate = ( + (successful_workflows / total_workflows * 100.0) if total_workflows > 0 else 0.0 + ) + + # Analyze node failures + node_failure_stats = analyze_node_failures(workflows) + highest_failure_node = ( + node_failure_stats[0].node_name if node_failure_stats else None + ) + + # Aggregate error types + error_type_distribution: Dict[str, int] = {} + for failure in 
all_failures: + error_type = failure.error_type + if error_type not in error_type_distribution: + error_type_distribution[error_type] = 0 + error_type_distribution[error_type] += 1 + + most_common_error = ( + max(error_type_distribution, key=lambda k: error_type_distribution[k]) + if error_type_distribution + else None + ) + + # Calculate retry success rate + retry_success_rate = calculate_retry_success_rate(all_retry_sequences) + + # Placeholder for validator analysis (not yet implemented) + validator_analyses: List[ValidatorEffectivenessAnalysis] = [] + redundant_validators: List[str] = [] + + # Placeholder for quality risks (not yet implemented) + quality_risks_at_scale: List[str] = [] + + return FailureAnalysisResults( + total_workflows=total_workflows, + successful_workflows=successful_workflows, + failed_workflows=failed_workflows, + overall_success_rate_percent=overall_success_rate, + node_failure_stats=node_failure_stats, + highest_failure_node=highest_failure_node, + error_type_distribution=error_type_distribution, + most_common_error_type=most_common_error, + total_retry_sequences=len(all_retry_sequences), + retry_sequences=all_retry_sequences, + retry_success_rate_percent=retry_success_rate, + avg_cost_of_retries=None, # Not yet implemented + validator_analyses=validator_analyses, + redundant_validators=redundant_validators, + quality_risks_at_scale=quality_risks_at_scale, + ) diff --git a/analyze_traces.py b/analyze_traces.py index aa95f27..09750c9 100644 --- a/analyze_traces.py +++ b/analyze_traces.py @@ -26,17 +26,20 @@ class Trace: Attributes: id: Unique identifier for the trace - name: Name of the trace (e.g., 'LangGraph', 'generate_spec') + name: Name of the trace (e.g., 'LangGraph', 'process_data') start_time: When the trace started execution end_time: When the trace completed duration_seconds: Total execution time in seconds - status: Execution status ('success', 'error', etc.) 
+ status: Execution status ('success', 'error', etc.) run_type: Type of run ('chain', 'llm', 'tool') parent_id: ID of parent trace (None for root traces) child_ids: List of child trace IDs inputs: Input parameters to the trace outputs: Output results from the trace error: Error message if execution failed + total_tokens: Total tokens used (None for non-LLM traces) + prompt_tokens: Input/prompt tokens (None for non-LLM traces) + completion_tokens: Output/completion tokens (None for non-LLM traces) """ id: str @@ -51,6 +54,9 @@ class Trace: inputs: Dict[str, Any] outputs: Dict[str, Any] error: Optional[str] + total_tokens: Optional[int] = None + prompt_tokens: Optional[int] = None + completion_tokens: Optional[int] = None @dataclass @@ -59,7 +65,7 @@ class Workflow: Represents a complete workflow execution with hierarchical structure. A workflow typically represents a LangGraph execution with multiple - child nodes (e.g., generate_spec, validators, xml_transformation). + child nodes (e.g., process_data, validators, transform_output). Attributes: root_trace: The root/parent trace (usually LangGraph) @@ -140,6 +146,9 @@ def _build_trace_from_dict( inputs=trace_dict.get("inputs", {}), outputs=trace_dict.get("outputs", {}), error=trace_dict.get("error"), + total_tokens=trace_dict.get("total_tokens"), + prompt_tokens=trace_dict.get("prompt_tokens"), + completion_tokens=trace_dict.get("completion_tokens"), ) return trace @@ -395,7 +404,7 @@ class NodePerformance: Performance metrics for a single node type across workflows. 
Attributes: - node_name: Name of the node (e.g., 'generate_spec', 'xml_transformation') + node_name: Name of the node (e.g., 'process_data', 'transform_output') execution_count: Number of times this node executed across all workflows avg_duration_seconds: Average execution time in seconds median_duration_seconds: Median execution time in seconds diff --git a/export_langsmith_traces.py b/export_langsmith_traces.py index 3cf494a..212109d 100644 --- a/export_langsmith_traces.py +++ b/export_langsmith_traces.py @@ -390,6 +390,87 @@ def _format_single_run(self, run: Any) -> Dict[str, Any]: for child in child_runs: formatted_children.append(self._format_single_run(child)) + # Extract cache token data with fallback logic + # Try multiple locations: top-level fields, then nested in outputs/inputs + cache_read_tokens = getattr(run, "cache_read_tokens", None) + cache_creation_tokens = getattr(run, "cache_creation_tokens", None) + + # Fallback 1: Check LangChain message format (primary location in exports) + # outputs["generations"][0][0]["message"]["kwargs"]["usage_metadata"]["input_token_details"] + if cache_read_tokens is None or cache_creation_tokens is None: + outputs = getattr(run, "outputs", {}) + if isinstance(outputs, dict): + generations = outputs.get("generations", [[]]) + if generations and len(generations) > 0 and len(generations[0]) > 0: + message = generations[0][0] + if isinstance(message, dict): + message_obj = message.get("message", {}) + if isinstance(message_obj, dict): + kwargs = message_obj.get("kwargs", {}) + if isinstance(kwargs, dict): + usage_metadata = kwargs.get("usage_metadata", {}) + if isinstance(usage_metadata, dict): + input_token_details = usage_metadata.get( + "input_token_details", {} + ) + if isinstance(input_token_details, dict): + if cache_read_tokens is None: + cache_read_tokens = input_token_details.get( + "cache_read" + ) + if cache_creation_tokens is None: + cache_creation_tokens = ( + input_token_details.get( + "cache_creation" + ) 
+ ) + if cache_creation_tokens is None: + cache_creation_tokens = ( + input_token_details.get( + "cache_creation_input_tokens" + ) + ) + + # Fallback 2: Check outputs["usage_metadata"]["input_token_details"] + if cache_read_tokens is None or cache_creation_tokens is None: + outputs = getattr(run, "outputs", {}) + if isinstance(outputs, dict): + usage_metadata = outputs.get("usage_metadata", {}) + if isinstance(usage_metadata, dict): + input_token_details = usage_metadata.get("input_token_details", {}) + if isinstance(input_token_details, dict): + if cache_read_tokens is None: + cache_read_tokens = input_token_details.get("cache_read") + if cache_creation_tokens is None: + # Try both possible field names (use explicit None check to preserve 0 values) + cache_creation_tokens = input_token_details.get( + "cache_creation" + ) + if cache_creation_tokens is None: + cache_creation_tokens = input_token_details.get( + "cache_creation_input_tokens" + ) + + # Fallback 3: Check inputs["usage_metadata"]["input_token_details"] (less common) + if cache_read_tokens is None or cache_creation_tokens is None: + inputs = getattr(run, "inputs", {}) + if isinstance(inputs, dict): + usage_metadata = inputs.get("usage_metadata", {}) + if isinstance(usage_metadata, dict): + input_token_details = usage_metadata.get("input_token_details", {}) + if isinstance(input_token_details, dict): + if cache_read_tokens is None: + cache_read_tokens = input_token_details.get("cache_read") + if cache_creation_tokens is None: + # Try both possible field names (use explicit None check to preserve 0 values) + cache_creation_tokens = input_token_details.get( + "cache_creation" + ) + if cache_creation_tokens is None: + cache_creation_tokens = input_token_details.get( + "cache_creation_input_tokens" + ) + trace = { "id": str(getattr(run, "id", None)) if hasattr(run, "id") else None, "name": getattr(run, "name", None), @@ -410,6 +491,11 @@ def _format_single_run(self, run: Any) -> Dict[str, Any]: "error": 
getattr(run, "error", None), "run_type": getattr(run, "run_type", None), "child_runs": formatted_children, + "total_tokens": getattr(run, "total_tokens", None), + "prompt_tokens": getattr(run, "prompt_tokens", None), + "completion_tokens": getattr(run, "completion_tokens", None), + "cache_read_tokens": cache_read_tokens, + "cache_creation_tokens": cache_creation_tokens, } return trace diff --git a/plans/phase3bc_cost_failure_analysis_plan.md b/plans/phase3bc_cost_failure_analysis_plan.md new file mode 100644 index 0000000..06a1e5a --- /dev/null +++ b/plans/phase3bc_cost_failure_analysis_plan.md @@ -0,0 +1,846 @@ +# Implementation Plan: Phase 3B & 3C Data Analysis + +## Executive Summary + +Implement test-driven Python analysis tools to calculate Phase 3B (Cost Analysis) and Phase 3C (Failure Pattern Analysis) metrics from LangSmith trace data. This extends the existing export-langsmith-data repository with two new analysis modules following established TDD patterns from Phase 3A. + +**Key Architectural Decision**: Create separate modules (`analyze_cost.py` and `analyze_failures.py`) to maintain clean separation of concerns while reusing shared data structures from Phase 3A. 
+ +**Estimated Effort**: 18-26 hours (8-12 hours Phase 3B, 10-14 hours Phase 3C) + +--- + +## Context + +### Requirements from phase-3-data-analysis-outline.md + +**Phase 3B: Cost Order-of-Magnitude Analysis** +- Calculate cost per workflow using token usage × Gemini 1.5 Pro pricing +- Breakdown costs by node type to identify expensive components +- Assess cache effectiveness (cost savings percentage) +- Project scaling costs at 10x, 100x, 1000x volume +- Determine economic viability at scale + +**Phase 3C: Failure Pattern Analysis** +- Calculate overall success rate (no external wrapper data - infer from traces) +- Identify failure patterns by node and error type +- Detect and analyze retry sequences using heuristics +- Assess validator effectiveness and redundancy +- Quantify retry costs and success rates +- Identify quality risks at scale + +### Data Availability Confirmed + +✅ **Token Usage Data**: Available in traces as `usage_metadata` with fields: +- `input_tokens`, `output_tokens`, `total_tokens` +- `input_token_details.cache_read` for cached token counts + +✅ **Trace Status**: Available for failure detection +- `status` field: "success", "error", etc. 
+- `error` field: Error messages for classification + +❌ **No External Wrapper Data**: Must infer failures and retries from trace patterns + +### Repository Architecture (Existing) + +``` +export-langsmith-data/ +├── analyze_traces.py # Phase 3A: 727 lines, performance analysis +├── test_analyze_traces.py # 1,272 lines, 64 passing tests +├── export_langsmith_traces.py # 697 lines, LangSmith API export +├── verify_analysis_report.py # 366 lines, deterministic verification +└── (test files) # ~3,500 lines total test coverage +``` + +**Shared Data Structures** (from analyze_traces.py): +- `Trace`: Single run with id, name, status, duration, inputs, outputs, error +- `Workflow`: Complete execution with root_trace and hierarchical nodes +- `TraceDataset`: Container with workflows, metadata, hierarchical flag +- `load_from_json()`: Data loading function + +--- + +## Implementation Strategy + +### 1. Module Organization + +**NEW Files to Create**: +``` +export-langsmith-data/ +├── analyze_cost.py # NEW - Phase 3B Cost Analysis (~700 lines) +├── test_analyze_cost.py # NEW - Phase 3B tests (~900 lines, ~45 tests) +├── analyze_failures.py # NEW - Phase 3C Failure Analysis (~800 lines) +└── test_analyze_failures.py # NEW - Phase 3C tests (~1000 lines, ~56 tests) +``` + +**MODIFIED Files**: +``` +└── verify_analysis_report.py # Extend with 3B/3C verification functions +``` + +**Rationale**: Separate modules maintain clean boundaries, enable independent evolution, and keep test suites focused. Each phase has distinct concerns (economics vs quality) that warrant separate files. 
+ +--- + +## Phase 3B: Cost Analysis Implementation + +### Configuration Constants + +```python +# analyze_cost.py - Top of file + +from dataclasses import dataclass + +@dataclass +class PricingConfig: + """Configurable pricing model for any LLM provider.""" + model_name: str + input_tokens_per_1k: float # Cost per 1K input tokens + output_tokens_per_1k: float # Cost per 1K output tokens + cache_read_per_1k: Optional[float] = None # Cost per 1K cache read tokens (if applicable) + + def __post_init__(self): + """Validate pricing configuration.""" + if self.input_tokens_per_1k < 0 or self.output_tokens_per_1k < 0: + raise ValueError("Token prices must be non-negative") + if self.cache_read_per_1k is not None and self.cache_read_per_1k < 0: + raise ValueError("Cache read price must be non-negative") + + +# Example pricing configs for reference (NOT hard-coded defaults) +# Users should create their own PricingConfig instances +EXAMPLE_PRICING_CONFIGS = { + "gemini_1.5_pro": PricingConfig( + model_name="Gemini 1.5 Pro", + input_tokens_per_1k=0.00125, # $1.25 per 1M input tokens + output_tokens_per_1k=0.005, # $5.00 per 1M output tokens + cache_read_per_1k=0.0003125, # $0.3125 per 1M cache read tokens + ), + # Add other providers as examples +} + +SCALING_FACTORS = [1, 10, 100, 1000] # Current, 10x, 100x, 1000x +``` + +### Core Data Structures + +```python +@dataclass +class TokenUsage: + """Token usage for a single trace.""" + input_tokens: int + output_tokens: int + total_tokens: int + cached_tokens: Optional[int] = None # From input_token_details.cache_read + +@dataclass +class CostBreakdown: + """Cost breakdown for a single trace.""" + trace_id: str + trace_name: str + input_cost: float + output_cost: float + cache_cost: float + total_cost: float + token_usage: TokenUsage + +@dataclass +class NodeCostSummary: + """Cost summary for a node type across workflows.""" + node_name: str + execution_count: int + total_cost: float + avg_cost_per_execution: float + 
percent_of_total_cost: float + +@dataclass +class CostAnalysisResults: + """Complete cost analysis results.""" + # Per-workflow metrics + avg_cost_per_workflow: float + median_cost_per_workflow: float + min_cost: float + max_cost: float + + # Node-level breakdown + node_summaries: List[NodeCostSummary] # Sorted by cost descending + top_cost_driver: Optional[str] + + # Cache effectiveness + cache_effectiveness_percent: Optional[float] + cache_savings_dollars: Optional[float] + + # Scaling projections + scaling_projections: Dict[str, ScalingProjection] # "10x", "100x", etc. + + # Metadata + total_workflows_analyzed: int + data_quality_notes: List[str] + + def to_csv(self) -> str: + """Export to CSV format for reporting.""" +``` + +### Key Functions + +**Token Extraction**: +```python +def extract_token_usage(trace: Trace) -> Optional[TokenUsage]: + """ + Extract token usage from trace outputs/inputs. + + Checks: + 1. trace.outputs["usage_metadata"] + 2. trace.inputs["usage_metadata"] (fallback) + 3. 
Extracts cache_read from input_token_details if available + """ + +def extract_workflow_tokens(workflow: Workflow) -> Dict[str, TokenUsage]: + """Extract token usage for all traces in workflow.""" +``` + +**Cost Calculation**: +```python +def calculate_trace_cost( + token_usage: TokenUsage, + pricing_config: PricingConfig +) -> CostBreakdown: + """Calculate cost for single trace using pricing model.""" + +def calculate_workflow_cost( + workflow: Workflow, + pricing_config: PricingConfig +) -> Optional[WorkflowCostAnalysis]: + """Calculate total cost and breakdown by node.""" +``` + +**Aggregation & Analysis**: +```python +def aggregate_node_costs( + workflow_analyses: List[WorkflowCostAnalysis] +) -> List[NodeCostSummary]: + """Aggregate costs by node type, sorted by total_cost descending.""" + +def calculate_cache_effectiveness( + workflow_analyses: List[WorkflowCostAnalysis] +) -> Optional[Tuple[float, float, float]]: + """ + Calculate cache effectiveness if cache data available. + Returns (effectiveness_percent, cost_without_cache, savings_dollars). + """ + +def project_scaling_costs( + avg_cost_per_workflow: float, + current_workflow_count: int, + scaling_factors: List[int] +) -> Dict[str, ScalingProjection]: + """Project costs at 10x, 100x, 1000x scale.""" +``` + +**Main Entry Point**: +```python +def analyze_costs( + workflows: List[Workflow], + pricing_config: PricingConfig +) -> CostAnalysisResults: + """ + Perform complete cost analysis on workflows. + + Args: + workflows: List of Workflow objects to analyze + pricing_config: PricingConfig with provider-specific rates + + Main entry point for Phase 3B. 
+ + Example: + # Create custom pricing config + pricing = PricingConfig( + model_name="Gemini 1.5 Pro", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + cache_read_per_1k=0.0003125 + ) + + results = analyze_costs(workflows, pricing) + """ +``` + +### Testing Strategy (Phase 3B) + +**Test File**: `test_analyze_cost.py` (~45 tests) + +**Test Classes**: +1. `TestTokenExtraction` - Extract tokens from various trace structures +2. `TestCostCalculation` - Basic and edge case cost calculations +3. `TestNodeCostAggregation` - Multi-workflow aggregation +4. `TestCacheEffectiveness` - Cache savings calculations +5. `TestScalingProjections` - Scaling math and viability thresholds +6. `TestCostAnalysisIntegration` - End-to-end with mock workflows +7. `TestCSVExport` - CSV formatting + +**Key Test Cases**: +- Token extraction from `outputs["usage_metadata"]` +- Token extraction with cache_read tokens +- Missing token data returns None gracefully +- Zero-cost traces handled correctly +- Cost calculation accuracy (verify pricing math) +- Node aggregation sorted correctly +- Scaling projections calculate monthly costs +- CSV format matches expected structure + +**TDD Workflow**: Write test → See it fail → Implement minimal code → Pass → Refactor + +### CSV Exports + +```python +def export_node_costs_csv(node_summaries: List[NodeCostSummary]) -> str: + """Export node cost breakdown to CSV.""" + +def export_scaling_projections_csv(projections: Dict[str, ScalingProjection]) -> str: + """Export scaling projections to CSV.""" +``` + +--- + +## Phase 3C: Failure Pattern Analysis Implementation + +### Configuration Constants + +```python +# analyze_failures.py - Top of file + +# Status values indicating failure +FAILURE_STATUSES = {"error", "failed", "cancelled"} +SUCCESS_STATUSES = {"success"} + +# Retry detection heuristics +RETRY_DETECTION_CONFIG = { + "max_time_window_seconds": 300, # 5 min window for retry detection + "same_node_threshold": 2, # 2+ executions = 
potential retry +} + +# Validator node names (from Phase 3A) +VALIDATOR_NODES = { + "meta_evaluation_and_validation", + "normative_validation", + "simulated_testing", +} + +# Error classification patterns (regex) +ERROR_PATTERNS = { + "validation_failure": r"validation.*fail|invalid.*spec", + "api_timeout": r"timeout|timed out", + "import_error": r"import.*fail|upload.*fail", + "llm_error": r"model.*error|generation.*fail|token.*limit", + "unknown": r".*", # Catch-all +} +``` + +### Core Data Structures + +```python +@dataclass +class FailureInstance: + """Single failure occurrence.""" + trace_id: str + trace_name: str + workflow_id: str + error_message: Optional[str] + error_type: str # Classified from ERROR_PATTERNS + timestamp: Optional[datetime] + +@dataclass +class RetrySequence: + """Detected retry sequence.""" + node_name: str + workflow_id: str + attempt_count: int + attempts: List[Trace] # Ordered by start_time + final_status: str # 'success' or 'failed' + total_duration_seconds: float + total_cost_estimate: Optional[float] = None + +@dataclass +class NodeFailureStats: + """Failure statistics for a node type.""" + node_name: str + total_executions: int + failure_count: int + success_count: int + failure_rate_percent: float + retry_sequences_detected: int + avg_retries_when_failing: float + common_error_types: Dict[str, int] # error_type -> count + +@dataclass +class ValidatorEffectivenessAnalysis: + """Validator effectiveness assessment.""" + validator_name: str + total_executions: int + caught_issues_count: int # Failures detected + pass_rate_percent: float + is_necessary: bool # Based on redundancy analysis + +@dataclass +class FailureAnalysisResults: + """Complete failure pattern analysis results.""" + # Overall metrics + total_workflows: int + successful_workflows: int + failed_workflows: int + overall_success_rate_percent: float + + # Node-level breakdown + node_failure_stats: List[NodeFailureStats] # Sorted by failure_rate + highest_failure_node: 
Optional[str] + + # Error distribution + error_type_distribution: Dict[str, int] + most_common_error_type: Optional[str] + + # Retry analysis + total_retry_sequences: int + retry_sequences: List[RetrySequence] + retry_success_rate_percent: Optional[float] + avg_cost_of_retries: Optional[float] + + # Validator analysis + validator_analyses: List[ValidatorEffectivenessAnalysis] + redundant_validators: List[str] + + # Quality risks + quality_risks_at_scale: List[str] + + def to_csv(self) -> str: + """Export to CSV format for reporting.""" +``` + +### Key Functions + +**Failure Detection**: +```python +def detect_failures(workflow: Workflow) -> List[FailureInstance]: + """Detect all failures in workflow using trace.status and trace.error.""" + +def classify_error(error_message: Optional[str]) -> str: + """Classify error into type using regex patterns.""" +``` + +**Retry Detection**: +```python +def detect_retry_sequences(workflow: Workflow) -> List[RetrySequence]: + """ + Detect retry sequences using heuristics: + - Multiple executions of same node within time window + - Ordered by start_time + """ + +def calculate_retry_success_rate( + retry_sequences: List[RetrySequence] +) -> Optional[float]: + """Calculate % of retries that eventually succeed.""" +``` + +**Node & Validator Analysis**: +```python +def analyze_node_failures( + workflows: List[Workflow] +) -> List[NodeFailureStats]: + """Analyze failure patterns by node type.""" + +def analyze_validator_effectiveness( + workflows: List[Workflow] +) -> List[ValidatorEffectivenessAnalysis]: + """Assess whether all 3 validators are necessary.""" + +def detect_validator_redundancy( + validator_analyses: List[ValidatorEffectivenessAnalysis] +) -> List[str]: + """Identify validators with >90% overlap in caught issues.""" +``` + +**Root Cause & Risk**: +```python +def identify_root_causes( + failure_instances: List[FailureInstance], + node_stats: List[NodeFailureStats] +) -> Dict[str, List[str]]: + """ + Categorize root 
causes: + - Prompt issues (validation failures) + - Logic bugs (repeated errors) + - External failures (API timeouts, import errors) + """ + +def assess_quality_risks_at_scale( + results: FailureAnalysisResults, + current_volume: int, + projected_volume: int +) -> List[str]: + """Generate risk warnings for scaling.""" +``` + +**Main Entry Point**: +```python +def analyze_failures(workflows: List[Workflow]) -> FailureAnalysisResults: + """ + Perform complete failure pattern analysis. + Main entry point for Phase 3C. + """ +``` + +### Testing Strategy (Phase 3C) + +**Test File**: `test_analyze_failures.py` (~56 tests) + +**Test Classes**: +1. `TestFailureDetection` - Detect failures from status/error fields +2. `TestRetryDetection` - Heuristic-based retry sequence detection +3. `TestNodeFailureAnalysis` - Node-level statistics +4. `TestValidatorEffectiveness` - Validator redundancy analysis +5. `TestRootCauseAnalysis` - Error categorization +6. `TestQualityRiskAssessment` - Risk identification +7. `TestFailureAnalysisIntegration` - End-to-end tests +8. 
`TestCSVExport` - CSV formatting + +**Key Test Cases**: +- Detect single and multiple failures in workflow +- Classify errors using regex patterns +- Detect retry sequences with 2, 3, 5+ attempts +- Respect time window for retry detection +- Calculate retry success rates correctly +- Identify redundant validators (overlap detection) +- Generate appropriate risk warnings +- Handle workflows with 0% and 100% success rates + +### CSV Exports + +```python +def export_node_failures_csv(node_stats: List[NodeFailureStats]) -> str: + """Export node failure statistics to CSV.""" + +def export_validator_effectiveness_csv( + analyses: List[ValidatorEffectivenessAnalysis] +) -> str: + """Export validator effectiveness to CSV.""" + +def export_retry_analysis_csv(retry_sequences: List[RetrySequence]) -> str: + """Export retry sequence analysis to CSV.""" +``` + +--- + +## Integration with Verification Tool + +### Extend verify_analysis_report.py + +Add two new verification functions: + +```python +def verify_cost_analysis( + dataset: TraceDataset, + expected: Optional[Dict[str, Any]] = None +) -> CostAnalysisResults: + """ + Verify Phase 3B cost calculations. + + Displays: + - Cost per workflow (avg, median, range) + - Top 3 cost drivers + - Scaling projections (10x, 100x, 1000x) + - Cache effectiveness if available + """ + +def verify_failure_analysis( + dataset: TraceDataset, + expected: Optional[Dict[str, Any]] = None +) -> FailureAnalysisResults: + """ + Verify Phase 3C failure calculations. 
+ + Displays: + - Overall success rate + - Top 5 nodes by failure rate + - Retry analysis (sequences detected, success rate) + - Validator effectiveness + """ +``` + +Update `main()` to accept `--phases` argument: +```python +parser.add_argument("--phases", type=str, default="all", + help="Phases to verify: 3a, 3b, 3c, or all") +``` + +--- + +## Implementation Sequence (TDD Workflow) + +### Phase 3B: Cost Analysis (8-12 hours) + +**Step 1: Setup** (30 min) +- Create `analyze_cost.py` with pricing constants and imports +- Create `test_analyze_cost.py` with test structure +- Add initial docstrings + +**Step 2: Token Extraction** (2-3 hours) +1. Write test: `test_extract_token_usage_from_outputs()` +2. Implement `extract_token_usage()` to pass +3. Write test: `test_extract_token_usage_with_cache_data()` +4. Extend implementation for cache tokens +5. Write test: `test_extract_token_usage_missing_data()` +6. Handle None return gracefully +7. **CRITICAL**: Test with REAL trace data first to confirm field locations + +**Step 3: Cost Calculation** (2-3 hours) +1. Write test: `test_calculate_trace_cost_basic()` +2. Implement basic calculation (input + output tokens) +3. Write test: `test_calculate_trace_cost_with_cache()` +4. Add cache cost logic +5. Write test: `test_calculate_workflow_cost()` +6. Implement workflow-level aggregation + +**Step 4: Aggregation & Analysis** (2-3 hours) +1. Write tests for `aggregate_node_costs()` +2. Implement node-level aggregation with sorting +3. Write tests for `calculate_cache_effectiveness()` +4. Implement cache savings calculation +5. Write tests for `project_scaling_costs()` +6. Implement scaling projections + +**Step 5: Main Analysis & Export** (1-2 hours) +1. Write integration test for `analyze_costs()` +2. Implement main orchestration function +3. Add data quality checks and warnings +4. Implement CSV export functions +5. Test CSV format + +**Step 6: Verification Integration** (1 hour) +1. 
Add `verify_cost_analysis()` to verify_analysis_report.py +2. Test verification with sample data +3. Update main() with --phases argument + +### Phase 3C: Failure Analysis (10-14 hours) + +**Step 1: Setup** (30 min) +- Create `analyze_failures.py` with error patterns and constants +- Create `test_analyze_failures.py` with test structure + +**Step 2: Failure Detection** (2-3 hours) +1. Write tests for `detect_failures()` +2. Implement status-based detection +3. Write tests for `classify_error()` +4. Implement regex-based classification + +**Step 3: Retry Detection** (3-4 hours) +1. Write tests for simple retry detection (2 attempts) +2. Implement basic retry detection +3. Write tests for complex scenarios (5+ attempts, time windows) +4. Refine detection heuristics +5. Write tests for `calculate_retry_success_rate()` +6. Implement success rate calculation + +**Step 4: Node Failure Analysis** (2-3 hours) +1. Write tests for `analyze_node_failures()` +2. Implement aggregation and statistics +3. Test sorting by failure_rate +4. Test error type aggregation + +**Step 5: Validator Analysis** (2-3 hours) +1. Write tests for `analyze_validator_effectiveness()` +2. Implement effectiveness metrics +3. Write tests for `detect_validator_redundancy()` +4. Implement overlap detection logic + +**Step 6: Root Cause & Integration** (2-3 hours) +1. Write tests for `identify_root_causes()` +2. Implement categorization logic +3. Write tests for `assess_quality_risks_at_scale()` +4. Implement risk warning generation +5. Write integration test for `analyze_failures()` +6. Implement main function and CSV exports + +**Step 7: Verification Integration** (1 hour) +1. Add `verify_failure_analysis()` to verify_analysis_report.py +2. 
Test verification with sample data + +--- + +## Data Quality Handling + +### Edge Cases to Handle + +**Phase 3B**: +- Missing token data → Skip trace, add to data_quality_notes +- Zero-cost traces → Include in analysis (valid for non-LLM nodes) +- Missing cache data → Skip cache analysis, report "N/A" +- Incomplete workflows → Partial cost calculated, note in quality report + +**Phase 3C**: +- No failures → Report 100% success rate (valid result) +- Missing error messages → Classify as "unknown" +- Ambiguous retries → Use time window + node name heuristic +- Missing validators → Note in analysis, skip redundancy check + +### Data Quality Reporting + +Both modules include `data_quality_notes` field in results: + +```python +data_quality_notes: List[str] = [ + "Token data available for 95/100 workflows", + "Cache data not available - cache analysis skipped", + "Retry detection based on heuristics (no definitive markers)", +] +``` + +--- + +## Critical Pre-Implementation Steps + +### BEFORE starting implementation, MUST complete: + +1. **Verify Token Data Structure** (30 min) + ```python + # Inspect actual trace structure + import json + with open("trace_export_1000.json") as f: + data = json.load(f) + + # Find LLM trace and print structure + trace = [t for t in data["traces"] if t["name"] == "ChatGoogleGenerativeAI"][0] + print(json.dumps(trace["outputs"], indent=2)) + ``` + +2. **Create Client-Specific Pricing Script** (15 min) + - Research current Gemini 1.5 Pro pricing (Dec 2025) + - Create `scripts/analyze_with_gemini_pricing.py` in client repo + - Script instantiates PricingConfig with Gemini rates + - Calls analyze_costs() with custom pricing + +3. **Document Field Locations** (15 min) + - Record exact path to usage_metadata + - Record exact path to cache_read tokens + - Create example trace structure in docstrings + +--- + +## Success Criteria + +### Phase 3B Complete When: +- ✅ All 45+ tests passing +- ✅ Can answer: "What does a workflow cost?" 
→ $__ +- ✅ Can answer: "What's the biggest cost driver?" → [node name] +- ✅ Can answer: "Can it scale to 100x volume?" → YES/NO with projection +- ✅ Cache effectiveness calculated (or "N/A" if unavailable) +- ✅ CSV exports generated for all metrics +- ✅ Verification tool confirms calculations + +### Phase 3C Complete When: +- ✅ All 56+ tests passing +- ✅ Can answer: "What's the success rate?" → ___% +- ✅ Can answer: "Where do failures happen most?" → [node name + rate] +- ✅ Can answer: "Are retries effective?" → ___% success rate +- ✅ Can answer: "Are all 3 validators necessary?" → YES/NO with data +- ✅ CSV exports generated for failures, retries, validators +- ✅ Quality risks identified for scaling +- ✅ Verification tool confirms calculations + +--- + +## Deliverables + +### Code Deliverables +1. `analyze_cost.py` (~700 lines) with full documentation +2. `test_analyze_cost.py` (~900 lines, 45+ tests passing) +3. `analyze_failures.py` (~800 lines) with full documentation +4. `test_analyze_failures.py` (~1000 lines, 56+ tests passing) +5. Extended `verify_analysis_report.py` with 3B/3C verification +6. Updated README.md with Phase 3B/3C usage examples + +### Analysis Outputs (for client Assessment) +1. Cost analysis CSV exports: + - Node cost breakdown + - Scaling projections + - Cache effectiveness (if available) +2. Failure analysis CSV exports: + - Node failure statistics + - Retry analysis + - Validator effectiveness +3. 
Verification reports confirming all calculations + +--- + +## Repository Separation Strategy + +**Generalized Tools** (export-langsmith-data repo): +- `analyze_cost.py` - Generic cost analysis for any LangSmith traces +- `analyze_failures.py` - Generic failure analysis for any traces +- All test files - Reusable test patterns +- `verify_analysis_report.py` - Verification framework + +**Client-Specific Analysis** (client-project/Assessment/data): +- Actual trace data files (.json) +- Generated CSV reports +- Client-specific interpretation and findings +- Phase 3B/3C markdown reports referencing data +- **Custom pricing script** (`scripts/analyze_with_gemini_pricing.py`): + - Imports `PricingConfig` from generalized tool + - Creates config with Gemini-specific rates + - Runs analysis with client's trace data + +This separation allows: +- Reuse of analysis tools across projects +- Client confidentiality (data stays in Assessment repo) +- Test-first development in open repo +- Easy updates to pricing models and error patterns + +--- + +## Risk Mitigation + +| Risk | Mitigation | +|------|------------| +| Token data not in expected location | Inspect real data FIRST, implement flexible extraction | +| Cache data unavailable | Make cache analysis optional, graceful degradation | +| Retry detection false positives | Tune time window (5 min), manual spot-checking | +| Pricing model outdated | Externalize constants, document update procedure | +| Large datasets cause memory issues | Process in batches if needed (unlikely with n=10-1000) | +| Test coverage gaps | Strict TDD, aim for >90% coverage | + +--- + +## Critical Files + +**To Read** (for reference patterns): +1. `analyze_traces.py` +2. `test_analyze_traces.py` +3. `verify_analysis_report.py` + +**To Create**: +1. `analyze_cost.py` +2. `test_analyze_cost.py` +3. `analyze_failures.py` +4. `test_analyze_failures.py` + +**To Modify**: +1. `verify_analysis_report.py` + +**Data Files** (for inspection): +1. 
`trace_export_1000.json` (newly exported) +2. `../client-project/Assessment/data/trace_export_1000.json` +3. `../client-project/Assessment/data/trace_export_complete_workflows.json` + +--- + +## Next Steps After Approval + +1. **Data Investigation** (30 min) - Inspect token field locations in real traces +2. **Pricing Research** (15 min) - Confirm Gemini 1.5 Pro current pricing +3. **Setup** (30 min) - Create files, initial structure, constants +4. **Phase 3B Implementation** (8-12 hours) - TDD workflow +5. **Phase 3C Implementation** (10-14 hours) - TDD workflow +6. **Verification & Documentation** (2-3 hours) - Final testing, README updates +7. **Generate Reports** (1-2 hours) - Run on actual data, export CSVs for Assessment + +**Total Estimated Time**: 18-26 hours for Phase 3B and 3C implementation (steps 4-5); roughly 22-32 hours including investigation, setup, verification, and reporting (steps 1-3 and 6-7) diff --git a/test_analyze_cost.py b/test_analyze_cost.py new file mode 100644 index 0000000..62ad903 --- /dev/null +++ b/test_analyze_cost.py @@ -0,0 +1,1046 @@ +""" +Test suite for Phase 3B: Cost Analysis + +Following TDD methodology - tests written FIRST, then implementation. +Tests for cost analysis functionality including token extraction, +cost calculation, and scaling projections.
+ +Author: Generated with Claude Code (PDCA Framework) +Date: 2025-12-09 +""" + +import pytest +from datetime import datetime, timezone +from analyze_cost import ( + PricingConfig, + TokenUsage, + extract_token_usage, + calculate_trace_cost, +) +from analyze_traces import Trace + + +class TestPricingConfig: + """Test PricingConfig dataclass validation.""" + + def test_pricing_config_creation_valid(self): + """Test creating valid pricing config.""" + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.001, + output_tokens_per_1k=0.002, + cache_read_per_1k=0.0001, + ) + assert pricing.model_name == "Test Model" + assert pricing.input_tokens_per_1k == 0.001 + assert pricing.output_tokens_per_1k == 0.002 + assert pricing.cache_read_per_1k == 0.0001 + + def test_pricing_config_without_cache(self): + """Test pricing config without cache pricing.""" + pricing = PricingConfig( + model_name="No Cache Model", + input_tokens_per_1k=0.001, + output_tokens_per_1k=0.002, + ) + assert pricing.cache_read_per_1k is None + + def test_pricing_config_negative_input_price_raises(self): + """Test that negative input price raises ValueError.""" + with pytest.raises(ValueError, match="non-negative"): + PricingConfig( + model_name="Bad Model", + input_tokens_per_1k=-0.001, + output_tokens_per_1k=0.002, + ) + + def test_pricing_config_negative_output_price_raises(self): + """Test that negative output price raises ValueError.""" + with pytest.raises(ValueError, match="non-negative"): + PricingConfig( + model_name="Bad Model", + input_tokens_per_1k=0.001, + output_tokens_per_1k=-0.002, + ) + + def test_pricing_config_negative_cache_price_raises(self): + """Test that negative cache price raises ValueError.""" + with pytest.raises(ValueError, match="non-negative"): + PricingConfig( + model_name="Bad Model", + input_tokens_per_1k=0.001, + output_tokens_per_1k=0.002, + cache_read_per_1k=-0.0001, + ) + + +class TestTokenExtraction: + """Test token usage extraction from 
traces.""" + + def test_extract_token_usage_from_outputs(self): + """Test extracting tokens from trace.outputs['usage_metadata'].""" + trace = Trace( + id="test-1", + name="ChatGoogleGenerativeAI", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="llm", + parent_id=None, + child_ids=[], + inputs={}, + outputs={ + "usage_metadata": { + "input_tokens": 1000, + "output_tokens": 500, + "total_tokens": 1500, + } + }, + error=None, + ) + + result = extract_token_usage(trace) + + assert result is not None + assert result.input_tokens == 1000 + assert result.output_tokens == 500 + assert result.total_tokens == 1500 + assert result.cached_tokens is None + + def test_extract_token_usage_with_cache_data(self): + """Test extracting tokens including cache_read data.""" + trace = Trace( + id="test-2", + name="ChatGoogleGenerativeAI", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="llm", + parent_id=None, + child_ids=[], + inputs={}, + outputs={ + "usage_metadata": { + "input_tokens": 1000, + "output_tokens": 500, + "total_tokens": 1500, + "input_token_details": { + "cache_read": 800, + }, + } + }, + error=None, + ) + + result = extract_token_usage(trace) + + assert result is not None + assert result.input_tokens == 1000 + assert result.output_tokens == 500 + assert result.total_tokens == 1500 + assert result.cached_tokens == 800 + + def test_extract_token_usage_missing_data_returns_none(self): + """Test that missing token data returns None.""" + trace = Trace( + id="test-3", + name="non_llm_node", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="chain", + parent_id=None, + 
child_ids=[], + inputs={}, + outputs={}, + error=None, + ) + + result = extract_token_usage(trace) + + assert result is None + + def test_extract_token_usage_from_inputs_fallback(self): + """Test fallback to extracting from inputs if not in outputs.""" + trace = Trace( + id="test-4", + name="ChatGoogleGenerativeAI", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="llm", + parent_id=None, + child_ids=[], + inputs={ + "usage_metadata": { + "input_tokens": 2000, + "output_tokens": 1000, + "total_tokens": 3000, + } + }, + outputs={}, + error=None, + ) + + result = extract_token_usage(trace) + + assert result is not None + assert result.input_tokens == 2000 + assert result.output_tokens == 1000 + + +class TestCostCalculation: + """Test cost calculation functions.""" + + def test_calculate_trace_cost_basic(self): + """Test basic cost calculation without cache.""" + token_usage = TokenUsage( + input_tokens=1000, + output_tokens=500, + total_tokens=1500, + cached_tokens=None, + ) + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, # $1.25 per 1M + output_tokens_per_1k=0.005, # $5.00 per 1M + ) + + result = calculate_trace_cost(token_usage, pricing) + + # Expected: (1000 * 0.00125 / 1000) + (500 * 0.005 / 1000) + # = 0.00125 + 0.0025 = 0.00375 + assert result.trace_id is not None + assert result.input_cost == pytest.approx(0.00125, abs=0.00001) + assert result.output_cost == pytest.approx(0.0025, abs=0.00001) + assert result.cache_cost == 0.0 + assert result.total_cost == pytest.approx(0.00375, abs=0.00001) + + def test_calculate_trace_cost_with_cache(self): + """Test cost calculation with cache reads.""" + token_usage = TokenUsage( + input_tokens=1000, + output_tokens=500, + total_tokens=1500, + cached_tokens=800, + ) + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + 
output_tokens_per_1k=0.005, + cache_read_per_1k=0.0003125, # $0.3125 per 1M + ) + + result = calculate_trace_cost(token_usage, pricing) + + # Expected cache cost: 800 * 0.0003125 / 1000 = 0.00025 + assert result.cache_cost == pytest.approx(0.00025, abs=0.00001) + # Total: 0.00125 + 0.0025 + 0.00025 = 0.00400 + assert result.total_cost == pytest.approx(0.00400, abs=0.00001) + + def test_calculate_trace_cost_zero_tokens(self): + """Test handling of zero token usage.""" + token_usage = TokenUsage( + input_tokens=0, + output_tokens=0, + total_tokens=0, + cached_tokens=None, + ) + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + ) + + result = calculate_trace_cost(token_usage, pricing) + + assert result.input_cost == 0.0 + assert result.output_cost == 0.0 + assert result.total_cost == 0.0 + + +class TestWorkflowCostAnalysis: + """Test workflow-level cost analysis.""" + + def test_calculate_workflow_cost_single_trace(self): + """Test calculating cost for workflow with one LLM trace.""" + from analyze_traces import Workflow + + # Create root trace (no cost) + root = Trace( + id="root-1", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=["child-1"], + inputs={}, + outputs={}, + error=None, + ) + + # Create child LLM trace with cost + child = Trace( + id="child-1", + name="ChatGoogleGenerativeAI", + start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 2, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="llm", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={ + "usage_metadata": { + "input_tokens": 1000, + "output_tokens": 500, + "total_tokens": 1500, + } + }, + error=None, + ) + + workflow = Workflow( + root_trace=root, + 
nodes={"ChatGoogleGenerativeAI": [child]}, + all_traces=[root, child], + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + ) + + from analyze_cost import calculate_workflow_cost + + result = calculate_workflow_cost(workflow, pricing) + + assert result is not None + assert result.workflow_id == "root-1" + assert len(result.node_costs) == 1 + assert result.total_cost == pytest.approx(0.00375, abs=0.00001) + + def test_calculate_workflow_cost_no_token_data(self): + """Test workflow with no token data returns a zero-cost result, not None.""" + from analyze_traces import Workflow + + root = Trace( + id="root-2", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=[], + inputs={}, + outputs={}, + error=None, + ) + + workflow = Workflow( + root_trace=root, + nodes={}, + all_traces=[root], + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + ) + + from analyze_cost import calculate_workflow_cost + + result = calculate_workflow_cost(workflow, pricing) + + # Should return result with zero cost, not None + assert result is not None + assert result.total_cost == 0.0 + assert len(result.node_costs) == 0 + + +class TestScalingProjections: + """Test scaling cost projections.""" + + def test_project_scaling_costs_basic(self): + """Test basic scaling projections at 10x, 100x, 1000x.""" + from analyze_cost import project_scaling_costs, SCALING_FACTORS + + avg_cost_per_workflow = 0.01 # $0.01 per workflow + current_workflow_count = 100 + + result = project_scaling_costs( + avg_cost_per_workflow=avg_cost_per_workflow, + current_workflow_count=current_workflow_count, + scaling_factors=SCALING_FACTORS, + ) + + assert result is not None + assert "1x" in result + assert "10x" in
result + assert "100x" in result + assert "1000x" in result + + # Verify 1x (current) + assert result["1x"].scale_factor == 1 + assert result["1x"].workflow_count == 100 + assert result["1x"].total_cost == pytest.approx(1.0, abs=0.01) # 100 * $0.01 + + # Verify 10x + assert result["10x"].scale_factor == 10 + assert result["10x"].workflow_count == 1000 + assert result["10x"].total_cost == pytest.approx(10.0, abs=0.01) + + # Verify 100x + assert result["100x"].scale_factor == 100 + assert result["100x"].workflow_count == 10000 + assert result["100x"].total_cost == pytest.approx(100.0, abs=0.01) + + # Verify 1000x + assert result["1000x"].scale_factor == 1000 + assert result["1000x"].workflow_count == 100000 + assert result["1000x"].total_cost == pytest.approx(1000.0, abs=0.01) + + def test_project_scaling_costs_with_monthly(self): + """Test that monthly costs are calculated correctly.""" + from analyze_cost import project_scaling_costs + + avg_cost_per_workflow = 0.05 + current_workflow_count = 50 + monthly_estimate = 500 # Assume 500 workflows per month + + result = project_scaling_costs( + avg_cost_per_workflow=avg_cost_per_workflow, + current_workflow_count=current_workflow_count, + scaling_factors=[1, 10], + monthly_workflow_estimate=monthly_estimate, + ) + + # At 1x: 500 workflows/month * $0.05 = $25/month + assert result["1x"].cost_per_month_30days is not None + assert result["1x"].cost_per_month_30days == pytest.approx(25.0, abs=0.01) + + # At 10x: 5000 workflows/month * $0.05 = $250/month + assert result["10x"].cost_per_month_30days is not None + assert result["10x"].cost_per_month_30days == pytest.approx(250.0, abs=0.01) + + def test_project_scaling_costs_zero_cost(self): + """Test handling of zero cost per workflow.""" + from analyze_cost import project_scaling_costs + + result = project_scaling_costs( + avg_cost_per_workflow=0.0, + current_workflow_count=100, + scaling_factors=[1, 10], + ) + + assert result["1x"].total_cost == 0.0 + assert 
result["10x"].total_cost == 0.0 + + +class TestNodeCostAggregation: + """Test node-level cost aggregation across workflows.""" + + def test_aggregate_node_costs_multiple_workflows(self): + """Test aggregating costs by node type across multiple workflows.""" + from analyze_cost import ( + aggregate_node_costs, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + ) + + # Create mock workflow analyses + workflow1_costs = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0, + total_cost=0.003, + token_usage=TokenUsage(1000, 500, 1500), + ), + CostBreakdown( + trace_id="2", + trace_name="Validator", + input_cost=0.0005, + output_cost=0.001, + cache_cost=0.0, + total_cost=0.0015, + token_usage=TokenUsage(500, 250, 750), + ), + ] + + workflow2_costs = [ + CostBreakdown( + trace_id="3", + trace_name="ChatModel", + input_cost=0.002, + output_cost=0.003, + cache_cost=0.0, + total_cost=0.005, + token_usage=TokenUsage(2000, 1000, 3000), + ), + ] + + workflows = [ + WorkflowCostAnalysis("wf1", 0.0045, workflow1_costs, 2250), + WorkflowCostAnalysis("wf2", 0.005, workflow2_costs, 3000), + ] + + result = aggregate_node_costs(workflows) + + assert result is not None + assert len(result) == 2 # ChatModel and Validator + + # Should be sorted by total cost descending + assert result[0].node_name == "ChatModel" + assert result[0].execution_count == 2 + assert result[0].total_cost == pytest.approx(0.008, abs=0.0001) + assert result[0].avg_cost_per_execution == pytest.approx(0.004, abs=0.0001) + + assert result[1].node_name == "Validator" + assert result[1].execution_count == 1 + assert result[1].total_cost == pytest.approx(0.0015, abs=0.0001) + + def test_aggregate_node_costs_empty_workflows(self): + """Test aggregating with no workflows.""" + from analyze_cost import aggregate_node_costs + + result = aggregate_node_costs([]) + + assert result is not None + assert len(result) == 0 + + +class TestMainAnalysisFunction: 
+ """Test main analyze_costs() orchestration function.""" + + def test_analyze_costs_integration(self): + """Test complete cost analysis workflow.""" + from analyze_cost import analyze_costs, PricingConfig + from analyze_traces import Workflow, Trace + from datetime import datetime, timezone + + # Create minimal workflow for testing + root = Trace( + id="root-1", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=["child-1"], + inputs={}, + outputs={}, + error=None, + ) + + child = Trace( + id="child-1", + name="ChatModel", + start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 2, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="llm", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={ + "usage_metadata": { + "input_tokens": 1000, + "output_tokens": 500, + "total_tokens": 1500, + } + }, + error=None, + ) + + workflow = Workflow( + root_trace=root, + nodes={"ChatModel": [child]}, + all_traces=[root, child], + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.001, + output_tokens_per_1k=0.002, + ) + + result = analyze_costs([workflow], pricing) + + assert result is not None + assert result.total_workflows_analyzed == 1 + assert result.avg_cost_per_workflow > 0 + assert len(result.node_summaries) == 1 + assert result.node_summaries[0].node_name == "ChatModel" + assert len(result.scaling_projections) > 0 + assert "1x" in result.scaling_projections + + +class TestCacheEffectiveness: + """Test cache effectiveness analysis functions.""" + + def test_calculate_cache_hit_rate_all_cached(self): + """Test cache hit rate when all traces have cache data.""" + from analyze_cost import ( + calculate_cache_hit_rate, + WorkflowCostAnalysis, + CostBreakdown, + 
TokenUsage, + ) + + # Create workflow analysis with all traces having cache data + costs_with_cache = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0001, + total_cost=0.0031, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=800), + ), + CostBreakdown( + trace_id="2", + trace_name="Validator", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0002, + total_cost=0.0032, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=900), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.0063, costs_with_cache, 3000)] + + result = calculate_cache_hit_rate(workflows) + + # All 2 traces have cache data = 100% + assert result == pytest.approx(100.0, abs=0.01) + + def test_calculate_cache_hit_rate_partial_cached(self): + """Test cache hit rate when only some traces have cache data.""" + from analyze_cost import ( + calculate_cache_hit_rate, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + ) + + # Mix of cached and non-cached traces + costs_mixed = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0001, + total_cost=0.0031, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=800), + ), + CostBreakdown( + trace_id="2", + trace_name="Validator", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0, + total_cost=0.003, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=None), + ), + CostBreakdown( + trace_id="3", + trace_name="Parser", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0, + total_cost=0.003, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=None), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.0091, costs_mixed, 4500)] + + result = calculate_cache_hit_rate(workflows) + + # 1 out of 3 traces have cache data = 33.33% + assert result == pytest.approx(33.33, abs=0.01) + + def test_calculate_cache_hit_rate_no_traces(self): + """Test cache hit rate with no traces returns 0.""" 
+ from analyze_cost import calculate_cache_hit_rate + + result = calculate_cache_hit_rate([]) + + assert result == 0.0 + + def test_calculate_cache_hit_rate_no_cache_data(self): + """Test cache hit rate when no traces have cache data.""" + from analyze_cost import ( + calculate_cache_hit_rate, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + ) + + costs_no_cache = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0, + total_cost=0.003, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=None), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.003, costs_no_cache, 1500)] + + result = calculate_cache_hit_rate(workflows) + + # 0 out of 1 traces have cache data = 0% + assert result == 0.0 + + def test_calculate_cache_savings_with_cache(self): + """Test calculating cost savings from cache usage.""" + from analyze_cost import ( + calculate_cache_savings, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + PricingConfig, + ) + + # Create pricing config + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, # $1.25 per 1M input tokens + output_tokens_per_1k=0.005, # $5.00 per 1M output tokens + cache_read_per_1k=0.0003125, # $0.3125 per 1M cache read tokens + ) + + # Trace with 1000 input tokens, 800 cached + # Without cache: 1000 * 0.00125 / 1000 = $0.00125 + # With cache: 800 * 0.0003125 / 1000 = $0.00025 + # Savings: $0.001 per trace + costs_with_cache = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.00125, + output_cost=0.0025, + cache_cost=0.00025, + total_cost=0.004, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=800), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.004, costs_with_cache, 1500)] + + result = calculate_cache_savings(workflows, pricing) + + # Savings: (800 tokens * input_price) - (800 tokens * cache_price) + # = (800 * 0.00125 / 1000) - (800 * 0.0003125 / 1000) + # = 0.001 - 0.00025 = 
0.00075 + assert result == pytest.approx(0.00075, abs=0.00001) + + def test_calculate_cache_savings_no_cache(self): + """Test that no cache usage results in zero savings.""" + from analyze_cost import ( + calculate_cache_savings, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + PricingConfig, + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + ) + + costs_no_cache = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.00125, + output_cost=0.0025, + cache_cost=0.0, + total_cost=0.00375, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=None), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.00375, costs_no_cache, 1500)] + + result = calculate_cache_savings(workflows, pricing) + + # No cache usage = no savings + assert result == 0.0 + + def test_calculate_cache_savings_multiple_traces(self): + """Test calculating savings across multiple traces.""" + from analyze_cost import ( + calculate_cache_savings, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + PricingConfig, + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + cache_read_per_1k=0.0003125, + ) + + # Two traces with cache, one without + costs_mixed = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.00125, + output_cost=0.0025, + cache_cost=0.00025, + total_cost=0.004, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=800), + ), + CostBreakdown( + trace_id="2", + trace_name="Validator", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0002, + total_cost=0.0032, + token_usage=TokenUsage(800, 400, 1200, cached_tokens=600), + ), + CostBreakdown( + trace_id="3", + trace_name="Parser", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.0, + total_cost=0.003, + token_usage=TokenUsage(800, 400, 1200, cached_tokens=None), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.0102, 
costs_mixed, 3900)] + + result = calculate_cache_savings(workflows, pricing) + + # Trace 1: (800 * 0.00125 / 1000) - (800 * 0.0003125 / 1000) = 0.00075 + # Trace 2: (600 * 0.00125 / 1000) - (600 * 0.0003125 / 1000) = 0.0005625 + # Trace 3: 0 (no cache) + # Total: 0.00075 + 0.0005625 = 0.0013125 + assert result == pytest.approx(0.0013125, abs=0.00001) + + def test_calculate_cache_savings_no_cache_pricing(self): + """Test that missing cache pricing returns zero savings.""" + from analyze_cost import ( + calculate_cache_savings, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + PricingConfig, + ) + + # Pricing without cache_read_per_1k + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + ) + + costs_with_cache = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.00125, + output_cost=0.0025, + cache_cost=0.0, + total_cost=0.00375, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=800), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.00375, costs_with_cache, 1500)] + + result = calculate_cache_savings(workflows, pricing) + + # No cache pricing configured = can't calculate savings + assert result == 0.0 + + def test_compare_cached_vs_fresh_costs_with_cache(self): + """Test comparing costs with cache vs without cache.""" + from analyze_cost import ( + compare_cached_vs_fresh_costs, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + PricingConfig, + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + cache_read_per_1k=0.0003125, + ) + + # Two traces: one with cache, one without + costs = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.00125, + output_cost=0.0025, + cache_cost=0.00025, + total_cost=0.004, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=800), + ), + CostBreakdown( + trace_id="2", + trace_name="Validator", + input_cost=0.001, + 
output_cost=0.002, + cache_cost=0.0, + total_cost=0.003, + token_usage=TokenUsage(800, 400, 1200, cached_tokens=None), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.007, costs, 2700)] + + result = compare_cached_vs_fresh_costs(workflows, pricing) + + # Cost with cache: 0.004 + 0.003 = 0.007 + assert result.cost_with_cache == pytest.approx(0.007, abs=0.0001) + + # Cost without cache: trace 1 would be 0.00125 + 0.0025 + (800 * 0.00125 / 1000) + # = 0.00125 + 0.0025 + 0.001 = 0.00475 + # trace 2 stays same: 0.003 + # Total: 0.00775 + assert result.cost_without_cache == pytest.approx(0.00775, abs=0.0001) + + # Savings: 0.00775 - 0.007 = 0.00075 + assert result.total_savings == pytest.approx(0.00075, abs=0.00001) + + # Savings percent: (0.00075 / 0.00775) * 100 = 9.68% + assert result.savings_percent == pytest.approx(9.68, abs=0.01) + + assert result.traces_analyzed == 2 + assert result.traces_with_cache == 1 + + def test_compare_cached_vs_fresh_costs_no_cache(self): + """Test comparison when no traces use cache.""" + from analyze_cost import ( + compare_cached_vs_fresh_costs, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + PricingConfig, + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + ) + + costs = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.00125, + output_cost=0.0025, + cache_cost=0.0, + total_cost=0.00375, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=None), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.00375, costs, 1500)] + + result = compare_cached_vs_fresh_costs(workflows, pricing) + + # No cache usage, so costs are identical + assert result.cost_with_cache == pytest.approx(0.00375, abs=0.0001) + assert result.cost_without_cache == pytest.approx(0.00375, abs=0.0001) + assert result.total_savings == 0.0 + assert result.savings_percent == 0.0 + assert result.traces_analyzed == 1 + assert result.traces_with_cache == 0 + + 
def test_compare_cached_vs_fresh_costs_all_cached(self): + """Test comparison when all traces use cache.""" + from analyze_cost import ( + compare_cached_vs_fresh_costs, + WorkflowCostAnalysis, + CostBreakdown, + TokenUsage, + PricingConfig, + ) + + pricing = PricingConfig( + model_name="Test Model", + input_tokens_per_1k=0.00125, + output_tokens_per_1k=0.005, + cache_read_per_1k=0.0003125, + ) + + costs = [ + CostBreakdown( + trace_id="1", + trace_name="ChatModel", + input_cost=0.00125, + output_cost=0.0025, + cache_cost=0.00025, + total_cost=0.004, + token_usage=TokenUsage(1000, 500, 1500, cached_tokens=800), + ), + CostBreakdown( + trace_id="2", + trace_name="Validator", + input_cost=0.001, + output_cost=0.002, + cache_cost=0.00015, + total_cost=0.00315, + token_usage=TokenUsage(800, 400, 1200, cached_tokens=480), + ), + ] + + workflows = [WorkflowCostAnalysis("wf1", 0.00715, costs, 2700)] + + result = compare_cached_vs_fresh_costs(workflows, pricing) + + # Both traces use cache + assert result.traces_analyzed == 2 + assert result.traces_with_cache == 2 + + # Savings should be positive + assert result.total_savings > 0 + assert result.savings_percent > 0 + assert result.cost_without_cache > result.cost_with_cache + + +# Run tests with: pytest test_analyze_cost.py -v diff --git a/test_analyze_failures.py b/test_analyze_failures.py new file mode 100644 index 0000000..0a5aca7 --- /dev/null +++ b/test_analyze_failures.py @@ -0,0 +1,426 @@ +""" +Test suite for Phase 3C: Failure Pattern Analysis + +Following TDD methodology - tests written FIRST, then implementation. +Tests for failure detection, retry analysis, and quality risk assessment. 
+ +Author: Generated with Claude Code (PDCA Framework) +Date: 2025-12-09 +""" + +import pytest +from datetime import datetime, timezone +from analyze_failures import ( + detect_failures, + classify_error, + detect_retry_sequences, + calculate_retry_success_rate, + RetrySequence, +) +from analyze_traces import Trace, Workflow + + +class TestFailureDetection: + """Test failure detection from traces.""" + + def test_detect_failures_single_failure(self): + """Test detecting a single failure in workflow.""" + root = Trace( + id="root-1", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=["child-1"], + inputs={}, + outputs={}, + error=None, + ) + + child = Trace( + id="child-1", + name="Validator", + start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 2, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="error", + run_type="chain", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={}, + error="Validation failed: invalid spec", + ) + + workflow = Workflow( + root_trace=root, + nodes={"Validator": [child]}, + all_traces=[root, child], + ) + + result = detect_failures(workflow) + + assert result is not None + assert len(result) == 1 + assert result[0].trace_id == "child-1" + assert result[0].trace_name == "Validator" + assert result[0].error_type == "validation_failure" + + def test_detect_failures_no_failures(self): + """Test workflow with no failures.""" + root = Trace( + id="root-2", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=[], + inputs={}, + outputs={}, + error=None, + ) + + workflow = Workflow( + 
root_trace=root, + nodes={}, + all_traces=[root], + ) + + result = detect_failures(workflow) + + assert result is not None + assert len(result) == 0 + + def test_classify_error_validation_failure(self): + """Test classifying validation error.""" + error_msg = "Validation failed: invalid specification" + result = classify_error(error_msg) + assert result == "validation_failure" + + def test_classify_error_api_timeout(self): + """Test classifying timeout error.""" + error_msg = "Request timed out after 30 seconds" + result = classify_error(error_msg) + assert result == "api_timeout" + + def test_classify_error_import_error(self): + """Test classifying import error.""" + error_msg = "Import failed: module not found" + result = classify_error(error_msg) + assert result == "import_error" + + def test_classify_error_llm_error(self): + """Test classifying LLM error.""" + error_msg = "Model generation failed: token limit exceeded" + result = classify_error(error_msg) + assert result == "llm_error" + + def test_classify_error_unknown(self): + """Test classifying unknown error.""" + error_msg = "Something went wrong" + result = classify_error(error_msg) + assert result == "unknown" + + def test_classify_error_none(self): + """Test classifying None error.""" + result = classify_error(None) + assert result == "unknown" + + +class TestRetryDetection: + """Test retry sequence detection.""" + + def test_detect_retry_sequences_two_attempts(self): + """Test detecting retry sequence with 2 attempts.""" + root = Trace( + id="root-1", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=["attempt-1", "attempt-2"], + inputs={}, + outputs={}, + error=None, + ) + + attempt1 = Trace( + id="attempt-1", + name="ChatModel", + start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + 
end_time=datetime(2025, 1, 1, 12, 1, 30, tzinfo=timezone.utc), + duration_seconds=30.0, + status="error", + run_type="llm", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={}, + error="Timeout", + ) + + attempt2 = Trace( + id="attempt-2", + name="ChatModel", + start_time=datetime(2025, 1, 1, 12, 1, 35, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 2, 5, tzinfo=timezone.utc), + duration_seconds=30.0, + status="success", + run_type="llm", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={}, + error=None, + ) + + workflow = Workflow( + root_trace=root, + nodes={"ChatModel": [attempt1, attempt2]}, + all_traces=[root, attempt1, attempt2], + ) + + result = detect_retry_sequences(workflow) + + assert result is not None + assert len(result) == 1 + assert result[0].node_name == "ChatModel" + assert result[0].attempt_count == 2 + assert result[0].final_status == "success" + assert result[0].total_duration_seconds == 60.0 + + def test_detect_retry_sequences_no_retries(self): + """Test workflow with no retry sequences.""" + root = Trace( + id="root-2", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=["child-1"], + inputs={}, + outputs={}, + error=None, + ) + + child = Trace( + id="child-1", + name="ChatModel", + start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 2, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="llm", + parent_id="root-2", + child_ids=[], + inputs={}, + outputs={}, + error=None, + ) + + workflow = Workflow( + root_trace=root, + nodes={"ChatModel": [child]}, + all_traces=[root, child], + ) + + result = detect_retry_sequences(workflow) + + assert result is not None + assert len(result) == 0 + + def test_calculate_retry_success_rate_all_succeed(self): 
+ """Test retry success rate when all retries succeed.""" + retry1 = RetrySequence( + node_name="ChatModel", + workflow_id="wf-1", + attempt_count=2, + attempts=[], + final_status="success", + total_duration_seconds=60.0, + ) + + retry2 = RetrySequence( + node_name="Validator", + workflow_id="wf-2", + attempt_count=3, + attempts=[], + final_status="success", + total_duration_seconds=90.0, + ) + + result = calculate_retry_success_rate([retry1, retry2]) + + assert result is not None + assert result == pytest.approx(100.0, abs=0.01) + + def test_calculate_retry_success_rate_partial_success(self): + """Test retry success rate with partial success.""" + retry1 = RetrySequence( + node_name="ChatModel", + workflow_id="wf-1", + attempt_count=2, + attempts=[], + final_status="success", + total_duration_seconds=60.0, + ) + + retry2 = RetrySequence( + node_name="Validator", + workflow_id="wf-2", + attempt_count=3, + attempts=[], + final_status="error", + total_duration_seconds=90.0, + ) + + result = calculate_retry_success_rate([retry1, retry2]) + + assert result is not None + assert result == pytest.approx(50.0, abs=0.01) + + def test_calculate_retry_success_rate_empty_list(self): + """Test retry success rate with empty list.""" + result = calculate_retry_success_rate([]) + assert result is None + + +class TestNodeFailureAnalysis: + """Test node-level failure analysis.""" + + def test_analyze_node_failures_basic(self): + """Test basic node failure analysis.""" + from analyze_failures import analyze_node_failures + + # Create workflows with different failure patterns + root1 = Trace( + id="root-1", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=["child-1", "child-2"], + inputs={}, + outputs={}, + error=None, + ) + + # Validator that fails + child1 = Trace( + id="child-1", + 
name="Validator", + start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 2, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="error", + run_type="chain", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={}, + error="Validation failed", + ) + + # ChatModel that succeeds + child2 = Trace( + id="child-2", + name="ChatModel", + start_time=datetime(2025, 1, 1, 12, 2, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 3, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="success", + run_type="llm", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={}, + error=None, + ) + + workflow1 = Workflow( + root_trace=root1, + nodes={"Validator": [child1], "ChatModel": [child2]}, + all_traces=[root1, child1, child2], + ) + + result = analyze_node_failures([workflow1]) + + assert result is not None + assert len(result) >= 1 + + # Find Validator stats + validator_stats = next((s for s in result if s.node_name == "Validator"), None) + assert validator_stats is not None + assert validator_stats.total_executions == 1 + assert validator_stats.failure_count == 1 + assert validator_stats.failure_rate_percent == pytest.approx(100.0, abs=0.01) + + +class TestMainAnalysisFunction: + """Test main analyze_failures() orchestration function.""" + + def test_analyze_failures_integration(self): + """Test complete failure analysis workflow.""" + from analyze_failures import analyze_failures + + # Create workflow with mixed success/failure + root = Trace( + id="root-1", + name="LangGraph", + start_time=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc), + duration_seconds=300.0, + status="success", + run_type="chain", + parent_id=None, + child_ids=["child-1"], + inputs={}, + outputs={}, + error=None, + ) + + child = Trace( + id="child-1", + name="Validator", + start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), + 
end_time=datetime(2025, 1, 1, 12, 2, 0, tzinfo=timezone.utc), + duration_seconds=60.0, + status="error", + run_type="chain", + parent_id="root-1", + child_ids=[], + inputs={}, + outputs={}, + error="Validation failed", + ) + + workflow = Workflow( + root_trace=root, + nodes={"Validator": [child]}, + all_traces=[root, child], + ) + + result = analyze_failures([workflow]) + + assert result is not None + assert result.total_workflows == 1 + assert len(result.node_failure_stats) >= 1 + assert result.error_type_distribution is not None + + +# Run tests with: pytest test_analyze_failures.py -v diff --git a/test_analyze_traces.py b/test_analyze_traces.py index da26493..e233021 100644 --- a/test_analyze_traces.py +++ b/test_analyze_traces.py @@ -101,7 +101,7 @@ def test_workflow_creation(self): child = Trace( id="child-1", - name="generate_spec", + name="process_data", start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), end_time=datetime(2025, 1, 1, 12, 3, 0, tzinfo=timezone.utc), duration_seconds=120.0, @@ -116,12 +116,12 @@ def test_workflow_creation(self): # Act workflow = Workflow( - root_trace=root, nodes={"generate_spec": [child]}, all_traces=[root, child] + root_trace=root, nodes={"process_data": [child]}, all_traces=[root, child] ) # Assert assert workflow.root_trace.id == "root-1" - assert "generate_spec" in workflow.nodes + assert "process_data" in workflow.nodes assert len(workflow.all_traces) == 2 def test_workflow_total_duration_property(self): @@ -251,7 +251,7 @@ def test_load_from_json_with_hierarchical_data(self): "child_runs": [ { "id": "child-1", - "name": "generate_spec", + "name": "process_data", "start_time": "2025-01-01T12:01:00+00:00", "end_time": "2025-01-01T12:03:00+00:00", "duration_seconds": 120.0, @@ -541,7 +541,7 @@ def test_node_performance_dataclass_creation(self): # Arrange & Act node_perf = NodePerformance( - node_name="generate_spec", + node_name="process_data", execution_count=100, avg_duration_seconds=180.5, 
median_duration_seconds=175.2, @@ -551,7 +551,7 @@ def test_node_performance_dataclass_creation(self): ) # Assert - assert node_perf.node_name == "generate_spec" + assert node_perf.node_name == "process_data" assert node_perf.execution_count == 100 assert node_perf.avg_duration_seconds == 180.5 assert node_perf.avg_percent_of_workflow == 15.2 @@ -562,7 +562,7 @@ def test_bottleneck_analysis_dataclass_creation(self): # Arrange node1 = NodePerformance( - node_name="xml_transformation", + node_name="transform_output", execution_count=100, avg_duration_seconds=250.8, median_duration_seconds=245.0, @@ -572,7 +572,7 @@ def test_bottleneck_analysis_dataclass_creation(self): ) node2 = NodePerformance( - node_name="generate_spec", + node_name="process_data", execution_count=100, avg_duration_seconds=180.5, median_duration_seconds=175.2, @@ -584,14 +584,14 @@ def test_bottleneck_analysis_dataclass_creation(self): # Act analysis = BottleneckAnalysis( node_performances=[node1, node2], - primary_bottleneck="xml_transformation", - top_3_bottlenecks=["xml_transformation", "generate_spec"], + primary_bottleneck="transform_output", + top_3_bottlenecks=["transform_output", "process_data"], ) # Assert assert len(analysis.node_performances) == 2 - assert analysis.primary_bottleneck == "xml_transformation" - assert analysis.top_3_bottlenecks[0] == "xml_transformation" + assert analysis.primary_bottleneck == "transform_output" + assert analysis.top_3_bottlenecks[0] == "transform_output" def test_identify_bottlenecks_basic(self): """Test basic bottleneck identification with simple workflow.""" @@ -615,7 +615,7 @@ def test_identify_bottlenecks_basic(self): node1 = Trace( id="node-1", - name="generate_spec", + name="process_data", start_time=None, end_time=None, duration_seconds=200.0, # 33% of workflow @@ -630,7 +630,7 @@ def test_identify_bottlenecks_basic(self): node2 = Trace( id="node-2", - name="xml_transformation", + name="transform_output", start_time=None, end_time=None, 
duration_seconds=300.0, # 50% of workflow (bottleneck) @@ -661,8 +661,8 @@ def test_identify_bottlenecks_basic(self): workflow = Workflow( root_trace=root, nodes={ - "generate_spec": [node1], - "xml_transformation": [node2], + "process_data": [node1], + "transform_output": [node2], "import_step": [node3], }, all_traces=[root, node1, node2, node3], @@ -673,8 +673,8 @@ def test_identify_bottlenecks_basic(self): # Assert assert len(result.node_performances) == 3 - assert result.primary_bottleneck == "xml_transformation" - assert result.node_performances[0].node_name == "xml_transformation" + assert result.primary_bottleneck == "transform_output" + assert result.node_performances[0].node_name == "transform_output" assert result.node_performances[0].avg_duration_seconds == 300.0 assert result.node_performances[0].execution_count == 1 @@ -1105,7 +1105,7 @@ def test_verify_parallel_execution_workflows_without_validators(self): node1 = Trace( id="node-1", - name="generate_spec", + name="process_data", start_time=datetime(2025, 1, 1, 12, 1, 0, tzinfo=timezone.utc), end_time=datetime(2025, 1, 1, 12, 3, 0, tzinfo=timezone.utc), duration_seconds=120.0, @@ -1119,7 +1119,7 @@ def test_verify_parallel_execution_workflows_without_validators(self): ) workflow = Workflow( - root_trace=root, nodes={"generate_spec": [node1]}, all_traces=[root, node1] + root_trace=root, nodes={"process_data": [node1]}, all_traces=[root, node1] ) # Act @@ -1169,7 +1169,7 @@ def test_bottleneck_analysis_to_csv(self): # Arrange node1 = NodePerformance( - node_name="xml_transformation", + node_name="transform_output", execution_count=100, avg_duration_seconds=250.8, median_duration_seconds=245.0, @@ -1179,7 +1179,7 @@ def test_bottleneck_analysis_to_csv(self): ) node2 = NodePerformance( - node_name="generate_spec", + node_name="process_data", execution_count=100, avg_duration_seconds=180.5, median_duration_seconds=175.2, @@ -1190,8 +1190,8 @@ def test_bottleneck_analysis_to_csv(self): analysis = 
BottleneckAnalysis( node_performances=[node1, node2], - primary_bottleneck="xml_transformation", - top_3_bottlenecks=["xml_transformation", "generate_spec"], + primary_bottleneck="transform_output", + top_3_bottlenecks=["transform_output", "process_data"], ) # Act @@ -1202,8 +1202,8 @@ def test_bottleneck_analysis_to_csv(self): "node_name,execution_count,avg_duration_seconds,median_duration_seconds" in csv_output ) - assert "xml_transformation,100,250.8,245.0" in csv_output - assert "generate_spec,100,180.5,175.2" in csv_output + assert "transform_output,100,250.8,245.0" in csv_output + assert "process_data,100,180.5,175.2" in csv_output def test_parallel_execution_evidence_to_csv(self): """Test exporting ParallelExecutionEvidence to CSV format.""" diff --git a/test_export_langsmith_traces.py b/test_export_langsmith_traces.py index 8f461e5..d379a5c 100644 --- a/test_export_langsmith_traces.py +++ b/test_export_langsmith_traces.py @@ -430,6 +430,231 @@ def test_format_trace_data_complete_fields(self, mock_client_class): assert "duration_seconds" in trace assert trace["child_runs"] == [] + @patch("export_langsmith_traces.Client") + def test_format_trace_data_includes_token_usage(self, mock_client_class): + """Test that token usage fields are exported when present.""" + # Arrange + from datetime import datetime, timezone + + mock_client = Mock() + mock_client_class.return_value = mock_client + + # Create mock LLM Run with token usage + mock_run = Mock() + mock_run.id = "llm_run_456" + mock_run.name = "ChatGoogleGenerativeAI" + mock_run.start_time = datetime(2025, 12, 9, 10, 0, 0, tzinfo=timezone.utc) + mock_run.end_time = datetime(2025, 12, 9, 10, 2, 0, tzinfo=timezone.utc) + mock_run.status = "success" + mock_run.inputs = {"messages": []} + mock_run.outputs = {"generations": []} + mock_run.error = None + mock_run.run_type = "llm" + mock_run.child_runs = [] + # Token usage fields (as found in LangSmith API) + mock_run.total_tokens = 162286 + mock_run.prompt_tokens = 
128227 + mock_run.completion_tokens = 34059 + + exporter = LangSmithExporter(api_key="test_key") + + # Act + result = exporter.format_trace_data([mock_run]) + + # Assert + trace = result["traces"][0] + assert "total_tokens" in trace + assert "prompt_tokens" in trace + assert "completion_tokens" in trace + assert trace["total_tokens"] == 162286 + assert trace["prompt_tokens"] == 128227 + assert trace["completion_tokens"] == 34059 + + @patch("export_langsmith_traces.Client") + def test_format_trace_data_handles_missing_tokens(self, mock_client_class): + """Test that missing token fields are handled gracefully.""" + # Arrange + from datetime import datetime, timezone + + mock_client = Mock() + mock_client_class.return_value = mock_client + + # Create mock non-LLM Run without token usage + mock_run = Mock() + mock_run.id = "chain_run_789" + mock_run.name = "LangGraph" + mock_run.start_time = datetime(2025, 12, 9, 10, 0, 0, tzinfo=timezone.utc) + mock_run.end_time = datetime(2025, 12, 9, 10, 5, 0, tzinfo=timezone.utc) + mock_run.status = "success" + mock_run.inputs = {} + mock_run.outputs = {} + mock_run.error = None + mock_run.run_type = "chain" + mock_run.child_runs = [] + # No token fields (non-LLM run) + mock_run.total_tokens = None + mock_run.prompt_tokens = None + mock_run.completion_tokens = None + + exporter = LangSmithExporter(api_key="test_key") + + # Act + result = exporter.format_trace_data([mock_run]) + + # Assert + trace = result["traces"][0] + # Token fields should be present but None for non-LLM runs + assert "total_tokens" in trace + assert "prompt_tokens" in trace + assert "completion_tokens" in trace + assert trace["total_tokens"] is None + assert trace["prompt_tokens"] is None + assert trace["completion_tokens"] is None + + @patch("export_langsmith_traces.Client") + def test_format_trace_data_includes_cache_tokens_from_nested_fields( + self, mock_client_class + ): + """Test that cache token fields are extracted from nested input_token_details 
(LangSmith API structure).""" + # Arrange + from datetime import datetime, timezone + + mock_client = Mock() + mock_client_class.return_value = mock_client + + # Create mock LLM Run with cache token usage in nested input_token_details + # Use spec to prevent Mock from creating cache_read_tokens/cache_creation_tokens automatically + mock_run = Mock( + spec=[ + "id", + "name", + "start_time", + "end_time", + "status", + "inputs", + "outputs", + "error", + "run_type", + "child_runs", + "total_tokens", + "prompt_tokens", + "completion_tokens", + ] + ) + mock_run.id = "cached_llm_run_999" + mock_run.name = "ChatGoogleGenerativeAI" + mock_run.start_time = datetime(2025, 12, 11, 10, 0, 0, tzinfo=timezone.utc) + mock_run.end_time = datetime(2025, 12, 11, 10, 2, 0, tzinfo=timezone.utc) + mock_run.status = "success" + mock_run.inputs = {"messages": []} + mock_run.error = None + mock_run.run_type = "llm" + mock_run.child_runs = [] + # Standard token fields + mock_run.total_tokens = 50000 + mock_run.prompt_tokens = 10000 + mock_run.completion_tokens = 5000 + # Cache token fields in nested LangChain message structure (actual LangSmith export format) + # These are nested under outputs["generations"][0][0]["message"]["kwargs"]["usage_metadata"]["input_token_details"] + mock_run.outputs = { + "generations": [ + [ + { + "message": { + "kwargs": { + "usage_metadata": { + "input_tokens": 10000, + "output_tokens": 5000, + "total_tokens": 50000, + "input_token_details": { + "cache_read": 35000, # Tokens read from cache + "cache_creation": 0, # Tokens written to cache + }, + } + } + } + } + ] + ] + } + # Top-level cache fields not present (not in spec, so getattr will return None) + + exporter = LangSmithExporter(api_key="test_key") + + # Act + result = exporter.format_trace_data([mock_run]) + + # Assert + trace = result["traces"][0] + assert "cache_read_tokens" in trace + assert "cache_creation_tokens" in trace + assert trace["cache_read_tokens"] == 35000 + assert 
trace["cache_creation_tokens"] == 0 + # Standard tokens should still be present + assert trace["total_tokens"] == 50000 + assert trace["prompt_tokens"] == 10000 + assert trace["completion_tokens"] == 5000 + + @patch("export_langsmith_traces.Client") + def test_format_trace_data_handles_missing_cache_tokens(self, mock_client_class): + """Test that missing cache token fields are handled gracefully (older LangSmith data).""" + # Arrange + from datetime import datetime, timezone + + mock_client = Mock() + mock_client_class.return_value = mock_client + + # Create mock LLM Run WITHOUT cache token fields (older export or non-cached run) + mock_run = Mock( + spec=[ + "id", + "name", + "start_time", + "end_time", + "status", + "inputs", + "outputs", + "error", + "run_type", + "child_runs", + "total_tokens", + "prompt_tokens", + "completion_tokens", + ] + ) + mock_run.id = "old_llm_run_888" + mock_run.name = "ChatGoogleGenerativeAI" + mock_run.start_time = datetime(2025, 12, 11, 10, 0, 0, tzinfo=timezone.utc) + mock_run.end_time = datetime(2025, 12, 11, 10, 2, 0, tzinfo=timezone.utc) + mock_run.status = "success" + mock_run.inputs = {"messages": []} + mock_run.outputs = {"generations": []} + mock_run.error = None + mock_run.run_type = "llm" + mock_run.child_runs = [] + # Standard token fields present + mock_run.total_tokens = 15000 + mock_run.prompt_tokens = 10000 + mock_run.completion_tokens = 5000 + # Cache token fields NOT present (not in spec, so getattr will return None via default) + + exporter = LangSmithExporter(api_key="test_key") + + # Act + result = exporter.format_trace_data([mock_run]) + + # Assert + trace = result["traces"][0] + # Cache token fields should be present but None when not available in source data + assert "cache_read_tokens" in trace + assert "cache_creation_tokens" in trace + assert trace["cache_read_tokens"] is None + assert trace["cache_creation_tokens"] is None + # Standard tokens should still be present + assert trace["total_tokens"] == 15000 
+ assert trace["prompt_tokens"] == 10000 + assert trace["completion_tokens"] == 5000 + @patch("export_langsmith_traces.Client") def test_format_trace_data_missing_fields(self, mock_client_class): """Test safe handling of missing/null fields.""" @@ -1005,13 +1230,13 @@ def test_fetch_runs_with_children_single_run(self, mock_client_class): # Mock the full run with children from read_run child1 = Mock() child1.id = "child-1" - child1.name = "generate_spec" + child1.name = "process_data" child1.run_type = "chain" child1.child_runs = [] child2 = Mock() child2.id = "child-2" - child2.name = "xml_transformation" + child2.name = "transform_output" child2.run_type = "chain" child2.child_runs = [] @@ -1033,8 +1258,8 @@ def test_fetch_runs_with_children_single_run(self, mock_client_class): assert runs[0].id == "parent-123" assert runs[0].child_runs is not None assert len(runs[0].child_runs) == 2 - assert runs[0].child_runs[0].name == "generate_spec" - assert runs[0].child_runs[1].name == "xml_transformation" + assert runs[0].child_runs[0].name == "process_data" + assert runs[0].child_runs[1].name == "transform_output" # Verify read_run was called with load_child_runs=True mock_client.read_run.assert_called_once_with("parent-123", load_child_runs=True) @@ -1147,9 +1372,36 @@ def test_main_success_workflow(self, mock_argv, mock_client_class): mock_client_class.return_value = mock_client # Create mock runs with only essential attributes - mock_run = Mock(spec=["id", "name"]) + mock_run = Mock( + spec=[ + "id", + "name", + "start_time", + "end_time", + "status", + "inputs", + "outputs", + "error", + "run_type", + "child_runs", + "total_tokens", + "prompt_tokens", + "completion_tokens", + ] + ) mock_run.id = "run_123" mock_run.name = "test_run" + mock_run.start_time = None + mock_run.end_time = None + mock_run.status = "completed" + mock_run.inputs = {} + mock_run.outputs = {} + mock_run.error = None + mock_run.run_type = "chain" + mock_run.child_runs = [] + mock_run.total_tokens 
= None + mock_run.prompt_tokens = None + mock_run.completion_tokens = None mock_client.list_runs.return_value = [mock_run] @@ -1252,6 +1504,13 @@ def test_main_with_pagination_success( run.error = None run.run_type = "chain" run.child_runs = [] + # Token fields (None for non-LLM runs) + run.total_tokens = None + run.prompt_tokens = None + run.completion_tokens = None + # Cache token fields (None for non-cached runs) + run.cache_read_tokens = None + run.cache_creation_tokens = None all_runs.append(run) def mock_list_runs(*args, **kwargs): @@ -1338,6 +1597,9 @@ def test_main_with_include_children_flag( "error", "run_type", "child_runs", + "total_tokens", + "prompt_tokens", + "completion_tokens", ] ) full_run1.id = "run_1" @@ -1350,6 +1612,9 @@ def test_main_with_include_children_flag( full_run1.error = None full_run1.run_type = "chain" full_run1.child_runs = [] # Empty list to avoid nested Mock serialization + full_run1.total_tokens = None + full_run1.prompt_tokens = None + full_run1.completion_tokens = None full_run2 = Mock( spec=[ @@ -1363,6 +1628,9 @@ def test_main_with_include_children_flag( "error", "run_type", "child_runs", + "total_tokens", + "prompt_tokens", + "completion_tokens", ] ) full_run2.id = "run_2" @@ -1375,6 +1643,9 @@ def test_main_with_include_children_flag( full_run2.error = None full_run2.run_type = "chain" full_run2.child_runs = [] + full_run2.total_tokens = None + full_run2.prompt_tokens = None + full_run2.completion_tokens = None # Mock behaviors mock_client.list_runs.return_value = iter([flat_run1, flat_run2]) diff --git a/verify_analysis_report.py b/verify_analysis_report.py index 0d420dd..c70c280 100644 --- a/verify_analysis_report.py +++ b/verify_analysis_report.py @@ -32,6 +32,16 @@ BottleneckAnalysis, ParallelExecutionEvidence, ) +from analyze_cost import ( + analyze_costs, + PricingConfig, + EXAMPLE_PRICING_CONFIGS, + CostAnalysisResults, +) +from analyze_failures import ( + analyze_failures, + FailureAnalysisResults, +) def 
print_header(title: str) -> None: @@ -265,11 +275,11 @@ def verify_parallel_execution( def generate_summary_report( - dataset: TraceDataset, - latency_dist: LatencyDistribution, - bottleneck_analysis: BottleneckAnalysis, - parallel_evidence: ParallelExecutionEvidence, - ) -> None: + dataset: TraceDataset, + latency_dist: LatencyDistribution, + bottleneck_analysis: BottleneckAnalysis, + parallel_evidence: ParallelExecutionEvidence, +) -> None: """Generate final summary with all key findings.""" print_header("ANALYSIS SUMMARY") @@ -305,6 +315,156 @@ def generate_summary_report( ) +def verify_cost_analysis( + dataset: TraceDataset, + pricing_config: PricingConfig, + expected: Optional[Dict[str, Any]] = None, +) -> CostAnalysisResults: + """ + Verify Phase 3B cost calculations. + + Displays: + - Cost per workflow (avg, median, range) + - Top 3 cost drivers + - Scaling projections (10x, 100x, 1000x) + - Cache effectiveness if available + """ + print_header("PHASE 3B: COST ANALYSIS VERIFICATION") + + print(f"\nPricing Model: {pricing_config.model_name}") + print(f" Input tokens: ${pricing_config.input_tokens_per_1k}/1K tokens") + print(f" Output tokens: ${pricing_config.output_tokens_per_1k}/1K tokens") + if pricing_config.cache_read_per_1k: + print(f" Cache read tokens: ${pricing_config.cache_read_per_1k}/1K tokens") + + # Run cost analysis + results = analyze_costs(dataset.workflows, pricing_config) + + print_section("Workflow Cost Statistics") + print(f" Total workflows analyzed: {results.total_workflows_analyzed}") + print(f" Average cost per workflow: ${results.avg_cost_per_workflow:.4f}") + print(f" Median cost per workflow: ${results.median_cost_per_workflow:.4f}") + print(f" Cost range: ${results.min_cost:.4f} - ${results.max_cost:.4f}") + + if expected and "cost_analysis" in expected: + exp_cost = expected["cost_analysis"] + check_value( + "avg_cost_per_workflow", + results.avg_cost_per_workflow, + exp_cost.get("avg_cost_per_workflow"), + ) + + 
print_section("Cost Drivers (Top 3 Nodes)") + for i, node in enumerate(results.node_summaries[:3], 1): + print( + f" {i}. {node.node_name}: ${node.total_cost:.4f} ({node.percent_of_total_cost:.1f}%)" + ) + print( + f" Executions: {node.execution_count}, Avg: ${node.avg_cost_per_execution:.4f}" + ) + + print_section("Scaling Projections") + for scale_label in ["1x", "10x", "100x", "1000x"]: + if scale_label in results.scaling_projections: + proj = results.scaling_projections[scale_label] + print( + f" {scale_label}: {proj.workflow_count} workflows → ${proj.total_cost:.2f}" + ) + if proj.cost_per_month_30days: + print(f" Monthly (30 days): ${proj.cost_per_month_30days:.2f}") + + if results.cache_effectiveness_percent: + print_section("Cache Effectiveness") + print(f" Cache hit rate: {results.cache_effectiveness_percent:.1f}%") + if results.cache_savings_dollars: + print(f" Cost savings: ${results.cache_savings_dollars:.2f}") + + if results.data_quality_notes: + print_section("Data Quality Notes") + for note in results.data_quality_notes: + print(f" - {note}") + + return results + + +def verify_failure_analysis( + dataset: TraceDataset, expected: Optional[Dict[str, Any]] = None +) -> FailureAnalysisResults: + """ + Verify Phase 3C failure calculations. 
+ + Displays: + - Overall success rate + - Top 5 nodes by failure rate + - Retry analysis (sequences detected, success rate) + - Validator effectiveness + """ + print_header("PHASE 3C: FAILURE PATTERN ANALYSIS VERIFICATION") + + # Run failure analysis + results = analyze_failures(dataset.workflows) + + print_section("Overall Success/Failure Metrics") + print(f" Total workflows: {results.total_workflows}") + print(f" Successful: {results.successful_workflows}") + print(f" Failed: {results.failed_workflows}") + print(f" Success rate: {results.overall_success_rate_percent:.1f}%") + + if expected and "failure_analysis" in expected: + exp_fail = expected["failure_analysis"] + check_value( + "success_rate", + results.overall_success_rate_percent, + exp_fail.get("success_rate"), + ) + + print_section("Node Failure Rates (Top 5)") + for i, node in enumerate(results.node_failure_stats[:5], 1): + print(f" {i}. {node.node_name}: {node.failure_rate_percent:.1f}% failure rate") + print(f" {node.failure_count}/{node.total_executions} executions failed") + if node.retry_sequences_detected > 0: + print(f" Retry sequences detected: {node.retry_sequences_detected}") + + print_section("Error Distribution") + if results.error_type_distribution: + sorted_errors = sorted( + results.error_type_distribution.items(), + key=lambda x: x[1], + reverse=True, + ) + for error_type, count in sorted_errors[:5]: + print(f" - {error_type}: {count} occurrences") + else: + print(" No errors detected") + + print_section("Retry Analysis") + print(f" Total retry sequences: {results.total_retry_sequences}") + if results.retry_success_rate_percent is not None: + print(f" Retry success rate: {results.retry_success_rate_percent:.1f}%") + if results.avg_cost_of_retries is not None: + print(f" Avg cost of retries: ${results.avg_cost_of_retries:.4f}") + + if results.validator_analyses: + print_section("Validator Effectiveness") + for validator in results.validator_analyses: + print(f" 
{validator.validator_name}:") + print(f" Executions: {validator.total_executions}") + print(f" Pass rate: {validator.pass_rate_percent:.1f}%") + print(f" Necessary: {'Yes' if validator.is_necessary else 'No'}") + + if results.redundant_validators: + print( + f"\n Potentially redundant validators: {', '.join(results.redundant_validators)}" + ) + + if results.quality_risks_at_scale: + print_section("Quality Risks at Scale") + for risk in results.quality_risks_at_scale: + print(f" - {risk}") + + return results + + def main() -> int: """Main verification script.""" parser = argparse.ArgumentParser( @@ -318,6 +478,18 @@ def main() -> int: type=str, help="Optional JSON file with expected values for verification", ) + parser.add_argument( + "--phases", + type=str, + default="3a", + help="Phases to verify: 3a, 3b, 3c, or all (default: 3a)", + ) + parser.add_argument( + "--pricing-model", + type=str, + default="gemini_1.5_pro", + help="Pricing model for cost analysis (default: gemini_1.5_pro)", + ) args = parser.parse_args() @@ -341,14 +513,36 @@ def main() -> int: print(f"\nLoading data from: {input_path}") dataset = load_from_json(str(input_path)) - # Run all verifications - verify_dataset_info(dataset, expected) - latency_dist = verify_latency_distribution(dataset, expected) - bottleneck_analysis = verify_bottleneck_analysis(dataset, expected) - parallel_evidence = verify_parallel_execution(dataset, expected) - generate_summary_report( - dataset, latency_dist, bottleneck_analysis, parallel_evidence - ) + phases = args.phases.lower() + run_3a = phases in ["3a", "all"] + run_3b = phases in ["3b", "all"] + run_3c = phases in ["3c", "all"] + + # Run Phase 3A verifications + if run_3a: + verify_dataset_info(dataset, expected) + latency_dist = verify_latency_distribution(dataset, expected) + bottleneck_analysis = verify_bottleneck_analysis(dataset, expected) + parallel_evidence = verify_parallel_execution(dataset, expected) + generate_summary_report( + dataset, latency_dist, 
bottleneck_analysis, parallel_evidence + ) + + # Run Phase 3B verification (Cost Analysis) + if run_3b: + if args.pricing_model in EXAMPLE_PRICING_CONFIGS: + pricing_config = EXAMPLE_PRICING_CONFIGS[args.pricing_model] + else: + print( + f"\nWarning: Unknown pricing model '{args.pricing_model}', using gemini_1.5_pro" + ) + pricing_config = EXAMPLE_PRICING_CONFIGS["gemini_1.5_pro"] + + verify_cost_analysis(dataset, pricing_config, expected) + + # Run Phase 3C verification (Failure Analysis) + if run_3c: + verify_failure_analysis(dataset, expected) # Final status print_header("VERIFICATION COMPLETE")