diff --git a/docs/SCHEMA.md b/docs/SCHEMA.md index a155565..7513ee9 100644 --- a/docs/SCHEMA.md +++ b/docs/SCHEMA.md @@ -259,6 +259,9 @@ Sync triggers maintain index consistency: | 7 | add_tool_id_index | Performance index for self-joins | | 8 | add_unified_message_text | Unified message_text column, rebuilt FTS on all entry types (Issue #68) | | 9 | add_result_size_bytes | result_size_bytes column for context efficiency tracking (Issue #69) | +| 10 | backfill_compaction_and_result_size | Backfill compaction detection and result_size_bytes for existing data | +| 11 | fix_compaction_detection_user_entries | Fix compaction detection to look at user entries (not just summary) | +| 12 | fix_warmup_not_errors | Fix warmup events incorrectly marked as errors (Issue #75) | --- diff --git a/src/session_analytics/cli.py b/src/session_analytics/cli.py index 91e8d4d..c0ce7cb 100644 --- a/src/session_analytics/cli.py +++ b/src/session_analytics/cli.py @@ -9,6 +9,7 @@ from session_analytics.ingest import ( correlate_git_with_sessions, ingest_git_history, + ingest_git_history_all_projects, ingest_logs, ) from session_analytics.patterns import ( @@ -1063,6 +1064,16 @@ def cmd_git_correlate(args): print(format_output(result, args.json)) +def cmd_git_ingest_all(args): + """Ingest git history from all known projects.""" + storage = SQLiteStorage() + result = ingest_git_history_all_projects( + storage, + days=args.days, + ) + print(format_output(result, args.json)) + + def cmd_signals(args): """Show raw session signals for LLM interpretation (RFC #26, revised per RFC #17).""" storage = SQLiteStorage() @@ -1346,7 +1357,8 @@ def cmd_benchmark(args): } # Skipped tools (require specific data or modify DB): - # - ingest_logs, ingest_git_history, correlate_git_with_sessions, ingest_bus_events + # - ingest_logs, ingest_git_history, ingest_git_history_all_projects + # - correlate_git_with_sessions, ingest_bus_events # - find_related_sessions (requires valid session_id) benchmarks = [] @@ -1575,6 +1587,11 @@ def main(): sub.add_argument("--days", type=int, default=7, help="Days to correlate (default: 7)") sub.set_defaults(func=cmd_git_correlate) + # git-ingest-all + sub = subparsers.add_parser("git-ingest-all", help="Ingest git history from all known projects") + sub.add_argument("--days", type=int, default=7, help="Days of history (default: 7)") + sub.set_defaults(func=cmd_git_ingest_all) + # signals (RFC #26, revised per RFC #17 - raw data, no interpretation) sub = subparsers.add_parser("signals", help="Show raw session signals for LLM interpretation") sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") diff --git a/src/session_analytics/guide.md b/src/session_analytics/guide.md index 84a3710..b2afd14 100644 --- a/src/session_analytics/guide.md +++ b/src/session_analytics/guide.md @@ -98,7 +98,8 @@ Each session includes `classification_factors` explaining WHY it was categorized | Tool | Purpose | |------|---------| -| `ingest_git_history(days?, repo_path?)` | Parse and store git commits | +| `ingest_git_history(days?, repo_path?)` | Parse and store git commits from current repo | +| `ingest_git_history_all_projects(days?)` | Parse commits from all known projects | | `correlate_git_with_sessions(days?)` | Link commits to sessions by timing | | `get_session_commits(session_id?)` | Get commits associated with a session | @@ -258,7 +259,13 @@ match commands `make`, `make-test`, etc. using fnmatch. Git correlation requires two steps: ``` -ingest_git_history(days=30) # Parse commits from repo +# Option 1: Ingest from all known projects (recommended) +ingest_git_history_all_projects(days=30) + +# Option 2: Ingest from current repo only +ingest_git_history(days=30) + +# Then correlate and query correlate_git_with_sessions() # Link to sessions by timing get_session_commits(session_id="abc") # View results ``` diff --git a/src/session_analytics/ingest.py b/src/session_analytics/ingest.py index 8c20bd0..909dbe6 100644 --- a/src/session_analytics/ingest.py +++ b/src/session_analytics/ingest.py @@ -20,6 +20,50 @@ USER_MESSAGE_MAX_LENGTH = 2000 +def decode_project_path(encoded: str) -> Path | None: + """Decode an encoded project path back to a filesystem path. + + Claude Code encodes project paths by replacing '/' with '-' in directory names. + e.g., "-Users-evansenter-Documents-projects-dotfiles" -> "/Users/evansenter/Documents/projects/dotfiles" + + Handles paths with hyphens in directory names by trying to find valid paths. + e.g., "-Users-foo-my-project" could be "/Users/foo/my-project" or "/Users/foo-my/project" + + Returns None if the decoded path doesn't exist or isn't a directory. + """ + if not encoded: + return None + + # Split on '-' and skip empty first part (from leading '-') + parts = encoded.split("-") + if parts and parts[0] == "": + parts = parts[1:] + + if not parts: + return None + + def find_path(remaining_parts: list[str], current_path: Path) -> Path | None: + """Recursively find valid path by trying different segment combinations.""" + if not remaining_parts: + return current_path if current_path.is_dir() else None + + # Try combining 1, 2, 3... segments with hyphens + for num_parts in range(1, len(remaining_parts) + 1): + segment = "-".join(remaining_parts[:num_parts]) + candidate = current_path / segment + + if candidate.exists(): + # This segment exists, try to continue with remaining parts + result = find_path(remaining_parts[num_parts:], candidate) + if result is not None: + return result + + return None + + # Start from root + return find_path(parts, Path("/")) + + def extract_text_from_content(content) -> str | None: """Extract text content from various message content formats. @@ -402,6 +446,9 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: is_error = tr.get("is_error", False) # Issue #68: Extract tool result content tool_result_text = extract_tool_result_content(tr) + # Issue #75: Warmup exits are not real errors + if is_error and tool_result_text == "Warmup": + is_error = False events.append( Event( id=None, @@ -424,6 +471,10 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: ) else: # User message with other content types + # Issue #69: Detect compaction markers in user messages + user_entry_type = "command" if command_name else "user" + if detect_compaction(message_text): + user_entry_type = "compaction" events.append( Event( id=None, @@ -431,7 +482,7 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: timestamp=timestamp, session_id=session_id, project_path=project_path, - entry_type="command" if command_name else "user", + entry_type=user_entry_type, skill_name=command_name, # Reuse skill_name for command tracking user_message_text=user_message_text, message_text=message_text, # Issue #68: unified message text @@ -446,6 +497,10 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: ) else: # Plain text user message + # Issue #69: Detect compaction markers in user messages + user_entry_type = "command" if command_name else "user" + if detect_compaction(message_text): + user_entry_type = "compaction" events.append( Event( id=None, @@ -453,7 +508,7 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: timestamp=timestamp, session_id=session_id, project_path=project_path, - entry_type="command" if command_name else "user", + entry_type=user_entry_type, skill_name=command_name, # Reuse skill_name for command tracking user_message_text=user_message_text, message_text=message_text, # Issue #68: unified message text @@ -787,6 +842,99 @@ def ingest_git_history( } +def ingest_git_history_all_projects( + storage: SQLiteStorage, + days: int = 7, +) -> dict: + """Ingest git commit history from all known projects. + + Scans unique project paths from the events table, decodes them to filesystem + paths, and runs git ingestion on each that has a .git directory. + + Args: + storage: Storage instance + days: Number of days of history to ingest (default: 7) + + Returns: + Dict with aggregate stats: + - projects_found: Total unique project paths in events table + - projects_with_git: Projects that have a .git directory + - projects_ingested: Projects successfully processed + - projects_skipped: Projects without valid path or git dir + - projects_failed: Projects with ingestion errors + - total_commits_added: Sum of new commits across all projects + - per_project: List of results (only includes projects with git repos, + not skipped projects - use projects_skipped count for those) + """ + # Get unique project paths from events + rows = storage.execute_query( + """ + SELECT DISTINCT project_path + FROM events + WHERE project_path IS NOT NULL + """ + ) + + projects_found = len(rows) + projects_with_git = 0 + projects_ingested = 0 + projects_skipped = 0 + projects_failed = 0 + total_commits_added = 0 + per_project_results = [] + + for row in rows: + encoded_path = row["project_path"] + decoded_path = decode_project_path(encoded_path) + + if decoded_path is None: + projects_skipped += 1 + logger.debug(f"Could not decode or find path: {encoded_path}") + continue + + # Check if it's a git repo (directory) or worktree (.git file pointing to main repo) + git_path = decoded_path / ".git" + if not git_path.exists(): + projects_skipped += 1 + continue + + projects_with_git += 1 + + # Run git ingestion + result = ingest_git_history( + storage=storage, + repo_path=decoded_path, + days=days, + project_path=encoded_path, + ) + + if "error" in result: + projects_failed += 1 + logger.warning(f"Git ingestion failed for {decoded_path}: {result['error']}") + else: + projects_ingested += 1 + total_commits_added += result.get("commits_added", 0) + + per_project_results.append( + { + "project": str(decoded_path), + "commits_added": result.get("commits_added", 0), + "error": result.get("error"), + } + ) + + return { + "days": days, + "projects_found": projects_found, + "projects_with_git": projects_with_git, + "projects_ingested": projects_ingested, + "projects_skipped": projects_skipped, + "projects_failed": projects_failed, + "total_commits_added": total_commits_added, + "per_project": per_project_results, + } + + def correlate_git_with_sessions( storage: SQLiteStorage, days: int = 7, diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py index 9a93e2f..6a4a742 100644 --- a/src/session_analytics/server.py +++ b/src/session_analytics/server.py @@ -592,6 +592,26 @@ def correlate_git_with_sessions(days: int = 7) -> dict: return {"status": "ok", **result} +@mcp.tool() +def ingest_git_history_all_projects(days: int = 7) -> dict: + """Ingest git commit history from all known projects. + + Scans unique project paths from the events table, decodes them to filesystem + paths, and runs git ingestion on each that has a .git directory. + + This is more comprehensive than ingest_git_history() which only processes + the current directory. + + Args: + days: Number of days of history to ingest (default: 7) + + Returns: + Aggregate stats across all projects including total commits added + """ + result = ingest.ingest_git_history_all_projects(storage, days=days) + return {"status": "ok", **result} + + @mcp.tool() def get_session_signals(days: int = 7, min_count: int = 1) -> dict: """Get raw session signals for LLM interpretation. diff --git a/src/session_analytics/storage.py b/src/session_analytics/storage.py index cc36a0a..384bfed 100644 --- a/src/session_analytics/storage.py +++ b/src/session_analytics/storage.py @@ -174,7 +174,7 @@ class BusEvent: DEFAULT_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" # Schema version for migrations -SCHEMA_VERSION = 9 +SCHEMA_VERSION = 12 # Migration functions: dict of version -> (migration_name, migration_func) # Each migration upgrades FROM version-1 TO version @@ -502,6 +502,92 @@ def migrate_v9(conn): conn.execute("ALTER TABLE events ADD COLUMN result_size_bytes INTEGER") +@migration(10, "backfill_compaction_and_result_size") +def migrate_v10(conn): + """Backfill compaction detection and result_size_bytes for existing data. + + Issue #69 follow-up: Migration 9 added the column and detection logic for + new ingestion, but existing data wasn't backfilled. This migration: + + 1. Updates entry_type to 'compaction' for existing user/summary entries + containing "continued from a previous conversation" marker + 2. Backfills result_size_bytes for all entries with message_text + + Note: Compaction markers appear in 'user' entries (system-injected continuation + messages), not 'summary' entries as originally assumed. + + This is idempotent - safe to run multiple times. + """ + # Backfill compaction detection for existing user/summary entries + # Note: The marker appears in 'user' entries, but we check both for safety + cursor = conn.execute( + """ + UPDATE events + SET entry_type = 'compaction' + WHERE entry_type IN ('user', 'summary') + AND message_text LIKE '%continued from a previous conversation%' + """ + ) + compaction_count = cursor.rowcount + logger.info(f"Marked {compaction_count} existing entries as compaction events") + + # Backfill result_size_bytes for all entries with message_text + cursor = conn.execute( + """ + UPDATE events + SET result_size_bytes = LENGTH(message_text) + WHERE message_text IS NOT NULL + AND result_size_bytes IS NULL + """ + ) + size_count = cursor.rowcount + logger.info(f"Backfilled result_size_bytes for {size_count} entries") + + +@migration(11, "fix_compaction_detection_user_entries") +def migrate_v11(conn): + """Fix compaction detection to include user entries. + + Issue #69 bug fix: Migration 10 only looked at 'summary' entries, but + compaction markers appear in 'user' entries (system-injected continuation + messages). This migration corrects that. + + This is idempotent - safe to run multiple times. + """ + cursor = conn.execute( + """ + UPDATE events + SET entry_type = 'compaction' + WHERE entry_type = 'user' + AND message_text LIKE '%continued from a previous conversation%' + """ + ) + compaction_count = cursor.rowcount + logger.info(f"Fixed {compaction_count} user entries to compaction type") + + +@migration(12, "fix_warmup_not_errors") +def migrate_v12(conn): + """Fix warmup events incorrectly marked as errors. + + Issue #75: Warmup events (Task tool invocations with max_turns: 1) were + marked as is_error=1 because they exit early. These are not real errors - + they're intentional early terminations for model pre-warming. + + This migration clears is_error for all warmup events. + """ + cursor = conn.execute( + """ + UPDATE events + SET is_error = 0 + WHERE is_error = 1 + AND message_text = 'Warmup' + """ + ) + warmup_count = cursor.rowcount + logger.info(f"Fixed {warmup_count} warmup events from is_error=1 to is_error=0") + + class SQLiteStorage: """SQLite-backed storage for session analytics.""" diff --git a/tests/test_ingest.py b/tests/test_ingest.py index b4a9eb2..68a9953 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -8,12 +8,14 @@ from session_analytics.ingest import ( calculate_result_size, + decode_project_path, detect_compaction, extract_command_name, extract_text_from_content, extract_tool_result_content, find_log_files, ingest_file, + ingest_git_history_all_projects, parse_entry, parse_tool_use, ) @@ -1475,3 +1477,190 @@ def test_regular_summary_not_detected(self): """Regular summaries without the marker return False.""" text = "This is a summary of the recent conversation about implementing a feature." assert detect_compaction(text) is False + + +class TestDecodeProjectPath: + """Tests for decode_project_path() function.""" + + def test_none_returns_none(self): + """None input returns None.""" + assert decode_project_path(None) is None + + def test_empty_string_returns_none(self): + """Empty string returns None.""" + assert decode_project_path("") is None + + def test_simple_path(self, tmp_path): + """Simple path without hyphens in names decodes correctly.""" + # Create /tmp/xxx/foo/bar + (tmp_path / "foo" / "bar").mkdir(parents=True) + # Encode: tmp_path looks like /var/folders/xxx or /tmp/pytest-xxx + # We'll test by encoding a known path and decoding it + encoded = str(tmp_path / "foo" / "bar").replace("/", "-") + result = decode_project_path(encoded) + assert result == tmp_path / "foo" / "bar" + + def test_path_with_hyphens(self, tmp_path): + """Path with hyphens in directory names decodes correctly.""" + # Create /tmp/xxx/my-project/src + (tmp_path / "my-project" / "src").mkdir(parents=True) + encoded = str(tmp_path / "my-project" / "src").replace("/", "-") + result = decode_project_path(encoded) + assert result == tmp_path / "my-project" / "src" + + def test_nonexistent_path_returns_none(self): + """Path that doesn't exist returns None.""" + result = decode_project_path("-Users-nonexistent-path-that-does-not-exist") + assert result is None + + def test_ambiguous_path_finds_existing(self, tmp_path): + """When multiple interpretations possible, finds the existing one.""" + # Create structure that could be /a-b/c or /a/b-c + # Only create /a-b/c + (tmp_path / "a-b" / "c").mkdir(parents=True) + encoded = str(tmp_path / "a-b" / "c").replace("/", "-") + result = decode_project_path(encoded) + assert result == tmp_path / "a-b" / "c" + + def test_real_world_pattern(self, tmp_path): + """Pattern like claude-session-analytics decodes when directory exists.""" + # Simulate: /projects/claude-session-analytics + (tmp_path / "projects" / "claude-session-analytics").mkdir(parents=True) + encoded = str(tmp_path / "projects" / "claude-session-analytics").replace("/", "-") + result = decode_project_path(encoded) + assert result == tmp_path / "projects" / "claude-session-analytics" + + def test_file_not_directory_returns_none(self, tmp_path): + """Path pointing to a file (not directory) returns None.""" + # Create a file + file_path = tmp_path / "myfile.txt" + file_path.write_text("test") + encoded = str(file_path).replace("/", "-") + result = decode_project_path(encoded) + assert result is None + + +class TestIngestGitHistoryAllProjects: + """Tests for ingest_git_history_all_projects() function.""" + + def test_empty_project_list(self, storage): + """Returns zeros when no projects in database.""" + result = ingest_git_history_all_projects(storage, days=7) + + assert result["projects_found"] == 0 + assert result["projects_with_git"] == 0 + assert result["projects_ingested"] == 0 + assert result["projects_skipped"] == 0 + assert result["total_commits_added"] == 0 + assert result["per_project"] == [] + + def test_projects_without_git(self, storage, tmp_path): + """Projects without .git are skipped.""" + from datetime import datetime + + from session_analytics.storage import Event + + # Create a directory without .git + project_dir = tmp_path / "no-git-project" + project_dir.mkdir() + encoded_path = str(project_dir).replace("/", "-") + + # Add event with this project path + storage.add_event( + Event( + id=None, + uuid="test-uuid-1", + timestamp=datetime.now(), + session_id="test-session", + project_path=encoded_path, + entry_type="user", + ) + ) + + result = ingest_git_history_all_projects(storage, days=7) + + assert result["projects_found"] == 1 + assert result["projects_with_git"] == 0 + assert result["projects_skipped"] == 1 + assert result["per_project"] == [] + + def test_project_with_git(self, storage, tmp_path): + """Projects with .git are processed.""" + from datetime import datetime + from unittest.mock import patch + + from session_analytics.storage import Event + + # Create a directory with .git + project_dir = tmp_path / "my-project" + project_dir.mkdir() + (project_dir / ".git").mkdir() # Create .git directory + encoded_path = str(project_dir).replace("/", "-") + + # Add event with this project path + storage.add_event( + Event( + id=None, + uuid="test-uuid-2", + timestamp=datetime.now(), + session_id="test-session", + project_path=encoded_path, + entry_type="user", + ) + ) + + # Mock git log to return empty (no commits) + with patch("subprocess.run") as mock_run: + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = "" + + result = ingest_git_history_all_projects(storage, days=7) + + assert result["projects_found"] == 1 + assert result["projects_with_git"] == 1 + assert result["projects_ingested"] == 1 + assert result["projects_skipped"] == 0 + assert len(result["per_project"]) == 1 + assert result["per_project"][0]["project"] == str(project_dir) + + def test_decode_failure_skipped(self, storage): + """Projects that can't be decoded are skipped.""" + from datetime import datetime + + from session_analytics.storage import Event + + # Add event with invalid project path that can't be decoded + storage.add_event( + Event( + id=None, + uuid="test-uuid-3", + timestamp=datetime.now(), + session_id="test-session", + project_path="-nonexistent-path-that-does-not-exist", + entry_type="user", + ) + ) + + result = ingest_git_history_all_projects(storage, days=7) + + assert result["projects_found"] == 1 + assert result["projects_with_git"] == 0 + assert result["projects_skipped"] == 1 + assert result["per_project"] == [] + + def test_result_structure(self, storage): + """Result dict has all expected keys.""" + result = ingest_git_history_all_projects(storage, days=7) + + expected_keys = { + "days", + "projects_found", + "projects_with_git", + "projects_ingested", + "projects_skipped", + "projects_failed", + "total_commits_added", + "per_project", + } + assert set(result.keys()) == expected_keys + assert result["days"] == 7 diff --git a/tests/test_smoke_real_data.py b/tests/test_smoke_real_data.py new file mode 100644 index 0000000..eabe7ee --- /dev/null +++ b/tests/test_smoke_real_data.py @@ -0,0 +1,269 @@ +"""Smoke tests that validate assumptions against real database data. + +These tests are skipped by default (no real database) and only run when +SESSION_ANALYTICS_SMOKE_TEST=1 is set. They catch issues like: +- Compaction detection not finding entries +- result_size_bytes not being populated +- Entry type distribution anomalies +- Token count reasonableness + +Run with: SESSION_ANALYTICS_SMOKE_TEST=1 pytest tests/test_smoke_real_data.py -v +""" + +import os +from pathlib import Path + +import pytest + +# Skip all tests in this file unless smoke test env var is set +pytestmark = pytest.mark.skipif( + os.environ.get("SESSION_ANALYTICS_SMOKE_TEST") != "1", + reason="Smoke tests require SESSION_ANALYTICS_SMOKE_TEST=1 and real database", +) + + +@pytest.fixture +def real_storage(): + """Get storage instance pointing to real database.""" + from session_analytics.storage import SQLiteStorage + + db_path = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" + if not db_path.exists(): + pytest.skip("Real database not found") + return SQLiteStorage(db_path) + + +class TestCompactionDetection: + """Validate compaction detection is working.""" + + def test_compaction_entries_exist(self, real_storage): + """Compaction entries should exist if sessions have context resets.""" + rows = real_storage.execute_query( + "SELECT COUNT(*) as count FROM events WHERE entry_type = 'compaction'" + ) + compaction_count = rows[0]["count"] + + # Also check for undetected compactions (marker in user entries) + rows = real_storage.execute_query( + """ + SELECT COUNT(*) as count FROM events + WHERE entry_type = 'user' + AND message_text LIKE '%continued from a previous conversation%' + """ + ) + undetected = rows[0]["count"] + + # Fail if there are undetected compactions + assert undetected == 0, ( + f"Found {undetected} user entries with compaction marker " + f"that should have entry_type='compaction'. Run migration 11." + ) + + # Info: how many compactions were detected + print(f"\nCompaction entries detected: {compaction_count}") + + def test_compaction_marker_not_in_tool_results(self, real_storage): + """Tool results shouldn't be mis-detected as compactions.""" + # The marker text may appear in tool results (e.g., GitHub issue body) + # These should NOT be marked as compaction + rows = real_storage.execute_query( + """ + SELECT COUNT(*) as count FROM events + WHERE entry_type = 'compaction' + AND tool_name IS NOT NULL + """ + ) + tool_compactions = rows[0]["count"] + assert tool_compactions == 0, ( + f"Found {tool_compactions} compaction entries with tool_name set. " + "Compactions should only be user messages, not tool results." + ) + + +class TestResultSizeBytes: + """Validate result_size_bytes is populated.""" + + def test_result_size_populated_for_message_text(self, real_storage): + """Entries with message_text should have result_size_bytes.""" + rows = real_storage.execute_query( + """ + SELECT + COUNT(*) as total, + SUM(CASE WHEN result_size_bytes IS NOT NULL THEN 1 ELSE 0 END) as populated + FROM events + WHERE message_text IS NOT NULL + """ + ) + total = rows[0]["total"] + populated = rows[0]["populated"] + + # Allow some tolerance for entries added before migration + population_rate = populated / total if total > 0 else 0 + assert population_rate > 0.95, ( + f"Only {population_rate:.1%} of entries with message_text have result_size_bytes. " + f"Expected >95%. Run migration 10 to backfill." + ) + + def test_result_size_reasonable_values(self, real_storage): + """result_size_bytes should have reasonable values.""" + rows = real_storage.execute_query( + """ + SELECT + MIN(result_size_bytes) as min_size, + MAX(result_size_bytes) as max_size, + AVG(result_size_bytes) as avg_size + FROM events + WHERE result_size_bytes IS NOT NULL + """ + ) + min_size = rows[0]["min_size"] + max_size = rows[0]["max_size"] + avg_size = rows[0]["avg_size"] + + assert min_size >= 0, "result_size_bytes should not be negative" + assert max_size < 100_000_000, f"Suspiciously large result: {max_size} bytes" + print(f"\nResult sizes: min={min_size}, max={max_size:,}, avg={avg_size:,.0f}") + + +class TestEntryTypeDistribution: + """Validate entry type distribution looks reasonable.""" + + def test_entry_types_present(self, real_storage): + """Core entry types should be present.""" + rows = real_storage.execute_query( + """ + SELECT entry_type, COUNT(*) as count + FROM events + GROUP BY entry_type + ORDER BY count DESC + """ + ) + entry_types = {r["entry_type"]: r["count"] for r in rows} + + # These should always exist in real usage + assert "assistant" in entry_types, "No assistant entries found" + assert "tool_use" in entry_types, "No tool_use entries found" + assert "tool_result" in entry_types, "No tool_result entries found" + assert "user" in entry_types, "No user entries found" + + # Print distribution + print("\nEntry type distribution:") + for entry_type, count in entry_types.items(): + print(f" {entry_type}: {count:,}") + + def test_tool_use_tool_result_balance(self, real_storage): + """tool_use and tool_result counts should be similar.""" + rows = real_storage.execute_query( + """ + SELECT entry_type, COUNT(*) as count + FROM events + WHERE entry_type IN ('tool_use', 'tool_result') + GROUP BY entry_type + """ + ) + counts = {r["entry_type"]: r["count"] for r in rows} + + tool_use = counts.get("tool_use", 0) + tool_result = counts.get("tool_result", 0) + + if tool_use > 0: + ratio = tool_result / tool_use + # Allow some tolerance - tool_result might be slightly less + # if some tools don't return results + assert 0.8 < ratio < 1.2, ( + f"tool_use ({tool_use}) and tool_result ({tool_result}) " + f"counts are imbalanced (ratio: {ratio:.2f})" + ) + + +class TestTokenData: + """Validate token data looks reasonable.""" + + def test_tokens_on_assistant_entries(self, real_storage): + """Assistant entries should have token data.""" + rows = real_storage.execute_query( + """ + SELECT + COUNT(*) as total, + SUM(CASE WHEN input_tokens > 0 OR output_tokens > 0 THEN 1 ELSE 0 END) as with_tokens + FROM events + WHERE entry_type = 'assistant' + AND model IS NOT NULL + AND model != 'unknown' + """ + ) + total = rows[0]["total"] + with_tokens = rows[0]["with_tokens"] + + if total > 0: + token_rate = with_tokens / total + assert token_rate > 0.9, ( + f"Only {token_rate:.1%} of assistant entries have tokens. Expected >90%." + ) + + def test_token_values_reasonable(self, real_storage): + """Token values should be in reasonable ranges.""" + rows = real_storage.execute_query( + """ + SELECT + MAX(input_tokens) as max_input, + MAX(output_tokens) as max_output, + AVG(input_tokens) as avg_input, + AVG(output_tokens) as avg_output + FROM events + WHERE entry_type = 'assistant' + AND (input_tokens > 0 OR output_tokens > 0) + """ + ) + max_input = rows[0]["max_input"] or 0 + max_output = rows[0]["max_output"] or 0 + + # Claude's context window is ~200K, individual responses much smaller + assert max_input < 500_000, f"Suspiciously high input_tokens: {max_input}" + assert max_output < 100_000, f"Suspiciously high output_tokens: {max_output}" + + print(f"\nToken stats: max_input={max_input:,}, max_output={max_output:,}") + + +class TestToolIdJoins: + """Validate tool_id enables proper joins.""" + + def test_tool_result_joins_to_tool_use(self, real_storage): + """tool_result entries should join to tool_use via tool_id.""" + rows = real_storage.execute_query( + """ + SELECT COUNT(*) as count + FROM events e1 + LEFT JOIN events e2 ON e1.tool_id = e2.tool_id AND e2.entry_type = 'tool_use' + WHERE e1.entry_type = 'tool_result' + AND e1.tool_id IS NOT NULL + AND e2.id IS NULL + """ + ) + orphan_results = rows[0]["count"] + assert orphan_results == 0, ( + f"Found {orphan_results} tool_result entries that don't join to tool_use. " + "This breaks error attribution." + ) + + +class TestErrorClassification: + """Validate error data quality.""" + + def test_warmup_not_counted_as_errors(self, real_storage): + """Warmup events should not be marked as errors.""" + rows = real_storage.execute_query( + """ + SELECT COUNT(*) as count + FROM events + WHERE is_error = 1 + AND message_text = 'Warmup' + """ + ) + warmup_errors = rows[0]["count"] + + # After migration 12, warmup events should not be marked as errors + assert warmup_errors == 0, ( + f"Found {warmup_errors} warmup events marked as errors. " + "Migration 12 should have fixed this. Re-run migrations or re-ingest." + )