diff --git a/CLAUDE.md b/CLAUDE.md index cae57a6..8131e12 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -22,6 +22,11 @@ Queryable analytics for Claude Code session logs, exposed as an MCP server and C cp ~/.claude/contrib/analytics/data.db ~/.claude/contrib/analytics/data.db.backup-$(date +%Y%m%d-%H%M%S) ``` +### When adding new columns: +1. **Always backup first** (see above) +2. **Backfill existing data** when possible - new columns should be populated for historical records, not just future ingestion +3. For backfill: either re-ingest from JSONL (`ingest_logs(force=True)` after clearing) or write UPDATE queries in the migration + --- ## Design Philosophy diff --git a/docs/SCHEMA.md b/docs/SCHEMA.md index b9ff784..a155565 100644 --- a/docs/SCHEMA.md +++ b/docs/SCHEMA.md @@ -80,6 +80,9 @@ CREATE TABLE events ( is_sidechain INTEGER DEFAULT 0, version TEXT, -- Claude Code version + -- Context efficiency (Issue #69) + result_size_bytes INTEGER, -- Size of message_text in bytes for context analysis + UNIQUE(session_id, uuid) -- UUID unique within each session ) ``` @@ -89,6 +92,8 @@ CREATE TABLE events ( - Token columns only populated on `entry_type='assistant'` to avoid double-counting - `message_text` enables FTS via `events_fts` virtual table for all entry types - `tool_input_json` preserves full parameters for drill-down queries +- `entry_type='compaction'` marks context resets (detected from summary text containing "continued from a previous conversation") +- `result_size_bytes` enables context burn rate analysis ### sessions @@ -253,6 +258,7 @@ Sync triggers maintain index consistency: | 6 | add_event_bus_integration | bus_events table | | 7 | add_tool_id_index | Performance index for self-joins | | 8 | add_unified_message_text | Unified message_text column, rebuilt FTS on all entry types (Issue #68) | +| 9 | add_result_size_bytes | result_size_bytes column for context efficiency tracking (Issue #69) | --- diff --git a/src/session_analytics/cli.py b/src/session_analytics/cli.py index 9b9f432..91e8d4d 100644 --- a/src/session_analytics/cli.py +++ b/src/session_analytics/cli.py @@ -24,7 +24,11 @@ classify_sessions, detect_parallel_sessions, find_related_sessions, + get_compaction_events, get_handoff_context, + get_large_tool_results, + get_pre_compaction_events, + get_session_efficiency, get_user_journey, query_agent_activity, query_bus_events, @@ -610,6 +614,98 @@ def format_metric(name: str, metric: dict) -> str: return lines +# Issue #69: Compaction and efficiency formatters + + +@_register_formatter(lambda d: "compaction_count" in d and "compactions" in d) +def _format_compactions(data: dict) -> list[str]: + # Count unique sessions + unique_sessions = len({c["session_id"] for c in data.get("compactions", [])}) + lines = [ + f"Compaction events (context resets) - last {data.get('days', 7)} days", + "", + f"Total compactions: {data['compaction_count']}", + f"Sessions affected: {unique_sessions}", + "", + ] + if data.get("compactions"): + lines.append("Recent compactions:") + for c in data["compactions"][:10]: + lines.append(f" {c['timestamp']} - session {c['session_id'][:8]}...") + return lines + + +@_register_formatter(lambda d: "compaction_timestamp" in d and "events" in d and "event_count" in d) +def _format_pre_compaction(data: dict) -> list[str]: + lines = [ + f"Events before compaction at {data['compaction_timestamp']}", + f"Session: {data['session_id']}", + "", + f"Events found: {data['event_count']}", + "", + ] + if data.get("events"): + lines.append("Events (most recent first):") + for e in data["events"]: + tool = e.get("tool") or e.get("type", "unknown") + size_info = "" + if e.get("size_bytes"): + size_kb = e["size_bytes"] / 1024 + size_info = f" ({size_kb:.1f}KB)" + identifier = e.get("file") or e.get("command") or "" + if identifier: + identifier = f" - {identifier[:40]}" + error_mark = " [ERR]" if e.get("error") else "" + lines.append(f" {e['timestamp']} {tool}{size_info}{identifier}{error_mark}") + return lines + + +@_register_formatter(lambda d: "large_results" in d and "result_count" in d) +def _format_large_results(data: dict) -> list[str]: + # Calculate total from tool_breakdown + total_mb = sum(t.get("total_mb", 0) for t in data.get("tool_breakdown", [])) + lines = [ + f"Large tool results (>= {data.get('min_size_kb', 10)}KB) - last {data.get('days', 7)} days", + "", + f"Total large results: {data['result_count']}", + f"Total size: {total_mb:.2f}MB", + "", + ] + if data.get("large_results"): + lines.append("Top results by size:") + for r in data["large_results"][:10]: + identifier = r.get("file") or r.get("command") or "N/A" + lines.append(f" {r['tool']}: {r['size_kb']:.1f}KB - {identifier[:50]}") + return lines + + +@_register_formatter( + lambda d: "sessions" in d + and "session_count" in d + and any("efficiency_signals" in s for s in d.get("sessions", [])) +) +def _format_efficiency(data: dict) -> list[str]: + lines = [ + f"Session efficiency - last {data.get('days', 7)} days", + "", + f"Sessions analyzed: {data.get('session_count', 0)}", + "", + "Sessions by context usage:", + ] + for s in data.get("sessions", [])[:10]: + signals = s.get("efficiency_signals", {}) + total_mb = signals.get("total_result_mb", 0) + compactions = signals.get("compaction_count", 0) + burn_rate = signals.get("burn_rate_tokens_per_event", 0) + read_edit = signals.get("read_to_edit_ratio", 0) + multi_read = signals.get("files_read_multiple_times", 0) + lines.append( + f" {s['session_id'][:8]}...: {total_mb:.2f}MB, {compactions} compactions, " + f"{burn_rate:.0f} tok/ev, R/E:{read_edit:.1f}, multi-read:{multi_read}" + ) + return lines + + def format_output(data: dict, json_output: bool = False) -> str: """Format output as JSON or human-readable.""" if json_output: @@ -1024,6 +1120,55 @@ def cmd_session_commits(args): print(format_output(result, args.json)) +# Issue #69: Compaction and efficiency commands + + +def cmd_compactions(args): + """Show compaction events (context resets).""" + storage = SQLiteStorage() + result = get_compaction_events( + storage, + days=args.days, + session_id=getattr(args, "session_id", None), + ) + print(format_output(result, args.json)) + + +def cmd_pre_compaction(args): + """Show events before a compaction event.""" + storage = SQLiteStorage() + result = get_pre_compaction_events( + storage, + session_id=args.session_id, + compaction_timestamp=args.timestamp, + limit=args.limit, + ) + print(format_output(result, args.json)) + + +def cmd_large_results(args): + """Show large tool results that consume context space.""" + storage = SQLiteStorage() + result = get_large_tool_results( + storage, + days=args.days, + min_size_kb=args.min_size, + limit=args.limit, + ) + print(format_output(result, args.json)) + + +def cmd_efficiency(args): + """Show session context efficiency metrics.""" + storage = SQLiteStorage() + result = get_session_efficiency( + storage, + days=args.days, + project=getattr(args, "project", None), + ) + print(format_output(result, args.json)) + + def _benchmark_tool(tool_name: str, tool_func: callable, iterations: int = 3) -> dict: """Benchmark a single MCP tool with multiple iterations. @@ -1097,9 +1242,18 @@ def cmd_benchmark(args): from session_analytics.queries import ( detect_parallel_sessions as queries_detect_parallel_sessions, ) + from session_analytics.queries import ( + get_compaction_events as queries_get_compaction_events, + ) from session_analytics.queries import ( get_handoff_context as queries_get_handoff_context, ) + from session_analytics.queries import ( + get_large_tool_results as queries_get_large_tool_results, + ) + from session_analytics.queries import ( + get_session_efficiency as queries_get_session_efficiency, + ) from session_analytics.queries import ( get_user_journey as queries_get_user_journey, ) @@ -1183,6 +1337,12 @@ def cmd_benchmark(args): "get_mcp_usage": lambda: queries_query_mcp_usage(storage, days=7), "get_agent_activity": lambda: queries_query_agent_activity(storage, days=7), "get_bus_events": lambda: queries_query_bus_events(storage, days=7, limit=10), + # Issue #69: Compaction and efficiency tools + "get_compaction_events": lambda: queries_get_compaction_events(storage, days=7), + "get_large_tool_results": lambda: queries_get_large_tool_results( + storage, days=7, min_size_kb=10, limit=10 + ), + "get_session_efficiency": lambda: queries_get_session_efficiency(storage, days=7), } # Skipped tools (require specific data or modify DB): @@ -1474,6 +1634,36 @@ def main(): sub.add_argument("--limit", type=int, default=100, help="Max events to return (default: 100)") sub.set_defaults(func=cmd_bus_events) + # Issue #69: Compaction and efficiency commands + + # compactions + sub = subparsers.add_parser("compactions", help="Show compaction events (context resets)") + sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") + sub.add_argument("--session-id", help="Filter to specific session ID") + sub.set_defaults(func=cmd_compactions) + + # pre-compaction + sub = subparsers.add_parser("pre-compaction", help="Show events before a compaction event") + sub.add_argument("session_id", help="Session ID to analyze") + sub.add_argument("timestamp", help="ISO timestamp of the compaction event") + sub.add_argument("--limit", type=int, default=50, help="Max events to return (default: 50)") + sub.set_defaults(func=cmd_pre_compaction) + + # large-results + sub = subparsers.add_parser( + "large-results", help="Show large tool results consuming context space" + ) + sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") + sub.add_argument("--min-size", type=int, default=10, help="Minimum size in KB (default: 10)") + sub.add_argument("--limit", type=int, default=50, help="Max results to return (default: 50)") + sub.set_defaults(func=cmd_large_results) + + # efficiency + sub = subparsers.add_parser("efficiency", help="Show session context efficiency metrics") + sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") + sub.add_argument("--project", help="Project path filter") + sub.set_defaults(func=cmd_efficiency) + # benchmark (Issue #63) sub = subparsers.add_parser("benchmark", help="Benchmark all MCP tool response times") sub.add_argument( diff --git a/src/session_analytics/guide.md b/src/session_analytics/guide.md index 56e4a53..84a3710 100644 --- a/src/session_analytics/guide.md +++ b/src/session_analytics/guide.md @@ -115,6 +115,22 @@ Each session includes `classification_factors` explaining WHY it was categorized |------|---------| | `get_agent_activity(days?, project?)` | Task subagent activity vs main session (RFC #41) | +### Context Efficiency Analysis + +| Tool | Purpose | +|------|---------| +| `get_compaction_events(days?, session_id?)` | List compaction events (context resets) | +| `get_pre_compaction_events(session_id, compaction_timestamp, limit?)` | Events before a compaction for analysis | +| `get_large_tool_results(days?, min_size_kb?, limit?)` | Find tool results consuming context space | +| `get_session_efficiency(days?, project?)` | Session efficiency metrics and burn rate | + +**Context efficiency** helps identify why sessions hit context limits: +- **Compactions**: Context resets when Claude summarizes conversation +- **Large results**: Tool outputs consuming significant context space +- **Burn rate**: How fast sessions consume their context budget +- **Read/Edit ratio**: High ratio suggests inefficient exploration (should use Task/Explore) +- **Files read multiple times**: Redundant reads indicate opportunity to cache context + ### Event-Bus Integration | Tool | Purpose | @@ -200,6 +216,15 @@ analyze_failures() → "These commands tend to fail" analyze_trends() → "Usage is increasing/decreasing" ``` +### Workflow: Context Efficiency + +``` +get_compaction_events() → "When did context resets happen?" +get_session_efficiency() → "Which sessions burn context fastest?" +get_large_tool_results() → "What operations consume the most space?" +get_pre_compaction_events() → "What led up to a specific reset?" +``` + ## Reference ### Session Categories diff --git a/src/session_analytics/ingest.py b/src/session_analytics/ingest.py index 7d502ec..8c20bd0 100644 --- a/src/session_analytics/ingest.py +++ b/src/session_analytics/ingest.py @@ -84,6 +84,28 @@ def extract_tool_result_content(tool_result: dict) -> str | None: return None +def calculate_result_size(text: str | None) -> int | None: + """Calculate the byte size of text content. + + Issue #69: Tracks context window consumption for efficiency analysis. + Uses UTF-8 encoding to get actual byte size (not character count). + """ + if text is None: + return None + return len(text.encode("utf-8")) + + +def detect_compaction(text: str | None) -> bool: + """Detect if summary content indicates a compaction event. + + Issue #69: Compaction occurs when Claude Code truncates conversation history. + The summary message contains the marker phrase indicating context was compressed. + """ + if not text: + return False + return "continued from a previous conversation" in text.lower() + + def find_log_files( logs_dir: Path = DEFAULT_LOGS_DIR, days: int = 7, @@ -298,6 +320,7 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: git_branch=git_branch, cwd=cwd, message_text=assistant_text, # Issue #68: unified message text + result_size_bytes=calculate_result_size(assistant_text), # Issue #69 # RFC #41: Agent tracking fields parent_uuid=None, # Assistant events have no parent agent_id=agent_id, @@ -392,6 +415,7 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: git_branch=git_branch, cwd=cwd, message_text=tool_result_text, # Issue #68: full tool result + result_size_bytes=calculate_result_size(tool_result_text), # Issue #69 # RFC #41: Agent tracking fields agent_id=agent_id, is_sidechain=is_sidechain, @@ -411,6 +435,7 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: skill_name=command_name, # Reuse skill_name for command tracking user_message_text=user_message_text, message_text=message_text, # Issue #68: unified message text + result_size_bytes=calculate_result_size(message_text), # Issue #69 git_branch=git_branch, cwd=cwd, # RFC #41: Agent tracking fields @@ -432,6 +457,7 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: skill_name=command_name, # Reuse skill_name for command tracking user_message_text=user_message_text, message_text=message_text, # Issue #68: unified message text + result_size_bytes=calculate_result_size(message_text), # Issue #69 git_branch=git_branch, cwd=cwd, # RFC #41: Agent tracking fields @@ -447,6 +473,9 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: summary_content = message.get("content", "") if message else raw.get("summary", "") summary_text = extract_text_from_content(summary_content) + # Issue #69: Detect compaction events + is_compaction = detect_compaction(summary_text) + events.append( Event( id=None, @@ -454,8 +483,9 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: timestamp=timestamp if timestamp else datetime.now(), session_id=session_id if session_id else "unknown", project_path=project_path, - entry_type="summary", + entry_type="compaction" if is_compaction else "summary", # Issue #69 message_text=summary_text, # Issue #68: unified message text + result_size_bytes=calculate_result_size(summary_text), # Issue #69 # RFC #41: Agent tracking fields agent_id=agent_id, is_sidechain=is_sidechain, diff --git a/src/session_analytics/queries.py b/src/session_analytics/queries.py index 27c6ad6..cef61cd 100644 --- a/src/session_analytics/queries.py +++ b/src/session_analytics/queries.py @@ -2028,3 +2028,328 @@ def query_error_details( "tool_totals": tool_totals, "total_errors": sum(tool_totals.values()), } + + +# Issue #69: Context efficiency queries + + +def get_compaction_events( + storage: SQLiteStorage, + days: int = 7, + session_id: str | None = None, +) -> dict: + """List compaction events where conversation history was truncated. + + Compaction events are summaries with "continued from a previous conversation" + marker, indicating Claude Code compacted the context window. + + Args: + storage: Storage instance + days: Number of days to analyze (default: 7) + session_id: Optional filter for specific session + + Returns: + Dict with compaction events and their timestamps + """ + cutoff = get_cutoff(days=days) + + where_parts = ["timestamp >= ?", "entry_type = 'compaction'"] + params: list = [cutoff] + + if session_id: + where_parts.append("session_id = ?") + params.append(session_id) + + where_clause = " AND ".join(where_parts) + + rows = storage.execute_query( + f""" + SELECT + timestamp, + session_id, + project_path, + result_size_bytes, + message_text + FROM events + WHERE {where_clause} + ORDER BY timestamp DESC + """, + tuple(params), + ) + + compactions = [ + { + "timestamp": _format_timestamp(row["timestamp"]), + "session_id": row["session_id"], + "project": row["project_path"], + "summary_size_bytes": row["result_size_bytes"], + "summary_preview": (row["message_text"] or "")[:200], + } + for row in rows + ] + + return { + "days": days, + "session_id": session_id, + "compaction_count": len(compactions), + "compactions": compactions, + } + + +def get_pre_compaction_events( + storage: SQLiteStorage, + session_id: str, + compaction_timestamp: str, + limit: int = 50, +) -> dict: + """Get events before a compaction to understand what was summarized. + + Shows the N events immediately before a compaction event occurred, + revealing what context was compressed. Events are ordered by timestamp + descending (most recent first) so the events closest to the compaction + appear first. + + Args: + storage: Storage instance + session_id: Session containing the compaction + compaction_timestamp: ISO timestamp of compaction event + limit: Max events to return before compaction (default: 50) + + Returns: + Dict with pre-compaction events, ordered by timestamp descending + """ + compact_time = datetime.fromisoformat(compaction_timestamp) + + rows = storage.execute_query( + """ + SELECT + timestamp, + entry_type, + tool_name, + command, + file_path, + is_error, + result_size_bytes + FROM events + WHERE session_id = ? + AND timestamp < ? + ORDER BY timestamp DESC + LIMIT ? + """, + (session_id, compact_time, limit), + ) + + events = [ + { + "timestamp": _format_timestamp(row["timestamp"]), + "type": row["entry_type"], + "tool": row["tool_name"], + "command": row["command"], + "file": row["file_path"], + "error": bool(row["is_error"]), + "size_bytes": row["result_size_bytes"], + } + for row in rows + ] + + return { + "session_id": session_id, + "compaction_timestamp": compaction_timestamp, + "event_count": len(events), + "events": events, + } + + +def get_large_tool_results( + storage: SQLiteStorage, + days: int = 7, + min_size_kb: int = 10, + limit: int = 50, +) -> dict: + """Find tool calls with large outputs (bloat detection). + + Identifies tool results consuming significant context space, + indicating opportunities for pagination or output filtering. + + Args: + storage: Storage instance + days: Number of days to analyze (default: 7) + min_size_kb: Minimum size in KB to report (default: 10) + limit: Max results to return (default: 50) + + Returns: + Dict with large tool results by tool and size + """ + cutoff = get_cutoff(days=days) + min_size_bytes = min_size_kb * 1024 + + rows = storage.execute_query( + """ + SELECT + e1.timestamp, + e1.session_id, + e1.project_path, + e2.tool_name, + e2.command, + e2.file_path, + e1.result_size_bytes + FROM events e1 + JOIN events e2 ON e1.tool_id = e2.tool_id AND e2.entry_type = 'tool_use' + WHERE e1.timestamp >= ? + AND e1.entry_type = 'tool_result' + AND e1.result_size_bytes >= ? + ORDER BY e1.result_size_bytes DESC + LIMIT ? + """, + (cutoff, min_size_bytes, limit), + ) + + results = [ + { + "timestamp": _format_timestamp(row["timestamp"]), + "session_id": row["session_id"], + "project": row["project_path"], + "tool": row["tool_name"], + "command": row["command"], + "file": row["file_path"], + "size_kb": round(row["result_size_bytes"] / 1024, 1), + } + for row in rows + ] + + # Aggregate by tool + tool_totals: dict[str, int] = {} + for row in rows: + tool = row["tool_name"] + if tool: + tool_totals[tool] = tool_totals.get(tool, 0) + row["result_size_bytes"] + + tool_breakdown = [ + {"tool": tool, "total_mb": round(size / 1024 / 1024, 2)} + for tool, size in sorted(tool_totals.items(), key=lambda x: x[1], reverse=True) + ] + + return { + "days": days, + "min_size_kb": min_size_kb, + "result_count": len(results), + "tool_breakdown": tool_breakdown, + "large_results": results, + } + + +def get_session_efficiency( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, +) -> dict: + """Analyze session efficiency: burn rate, compactions, read patterns. + + Provides raw efficiency signals: + - Token burn rate (tokens per event) + - Compaction frequency (how often context fills up) + - Read-heavy patterns (large tool results consuming context) + - Assistant verbosity (output tokens per response) + - Read-to-edit ratio (high ratio suggests inefficient exploration) + - Files read multiple times (redundant reads) + + Args: + storage: Storage instance + days: Number of days to analyze (default: 7) + project: Optional project filter + + Returns: + Dict with efficiency metrics per session + """ + cutoff = get_cutoff(days=days) + where_clause, params = build_where_clause(cutoff=cutoff, project=project) + + # Get session-level efficiency metrics + rows = storage.execute_query( + f""" + SELECT + session_id, + project_path, + COUNT(*) as total_events, + SUM(CASE WHEN entry_type = 'compaction' THEN 1 ELSE 0 END) as compaction_count, + SUM(COALESCE(input_tokens, 0)) as input_tokens, + SUM(COALESCE(output_tokens, 0)) as output_tokens, + SUM(COALESCE(result_size_bytes, 0)) as total_result_bytes, + SUM(CASE WHEN entry_type = 'assistant' THEN 1 ELSE 0 END) as assistant_count, + SUM(CASE WHEN entry_type = 'tool_result' AND result_size_bytes > 10240 THEN 1 ELSE 0 END) as large_result_count, + SUM(CASE WHEN tool_name = 'Read' THEN 1 ELSE 0 END) as read_count, + SUM(CASE WHEN tool_name = 'Edit' THEN 1 ELSE 0 END) as edit_count, + MIN(timestamp) as first_seen, + MAX(timestamp) as last_seen + FROM events + WHERE {where_clause} + GROUP BY session_id + HAVING COUNT(*) >= 10 + ORDER BY compaction_count DESC, input_tokens DESC + LIMIT 50 + """, + params, + ) + + # Get files read multiple times per session + session_ids = [row["session_id"] for row in rows] + files_read_multiple: dict[str, int] = {} + if session_ids: + placeholders = ",".join("?" * len(session_ids)) + multi_read_rows = storage.execute_query( + f""" + SELECT session_id, COUNT(*) as multi_read_files + FROM ( + SELECT session_id, file_path, COUNT(*) as read_count + FROM events + WHERE session_id IN ({placeholders}) + AND tool_name = 'Read' + AND file_path IS NOT NULL + GROUP BY session_id, file_path + HAVING COUNT(*) > 1 + ) + GROUP BY session_id + """, + tuple(session_ids), + ) + files_read_multiple = {r["session_id"]: r["multi_read_files"] for r in multi_read_rows} + + sessions = [] + for row in rows: + total_events = row["total_events"] or 1 + input_tokens = row["input_tokens"] or 0 + output_tokens = row["output_tokens"] or 0 + assistant_count = row["assistant_count"] or 1 + read_count = row["read_count"] or 0 + edit_count = row["edit_count"] or 1 # Avoid division by zero + + sessions.append( + { + "session_id": row["session_id"], + "project": row["project_path"], + "first_seen": _format_timestamp(row["first_seen"]), + "last_seen": _format_timestamp(row["last_seen"]), + "efficiency_signals": { + "compaction_count": row["compaction_count"], + "burn_rate_tokens_per_event": round(input_tokens / total_events, 1), + "avg_assistant_tokens": round(output_tokens / assistant_count, 1), + "total_result_mb": round(row["total_result_bytes"] / 1024 / 1024, 2), + "large_result_count": row["large_result_count"], + "has_compaction": row["compaction_count"] > 0, + "read_to_edit_ratio": round(read_count / edit_count, 2), + "files_read_multiple_times": files_read_multiple.get(row["session_id"], 0), + }, + "totals": { + "events": total_events, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + }, + } + ) + + return { + "days": days, + "project": project, + "session_count": len(sessions), + "sessions": sessions, + } diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py index de02e05..9a93e2f 100644 --- a/src/session_analytics/server.py +++ b/src/session_analytics/server.py @@ -806,6 +806,111 @@ def get_bus_events( return {"status": "ok", **result} +# Issue #69: Compaction detection and context efficiency tools + + +@mcp.tool() +def get_compaction_events( + days: int = 7, + session_id: str | None = None, +) -> dict: + """List compaction events (context resets) across sessions. + + Compactions occur when Claude's context window fills and is summarized. + This helps identify sessions that hit context limits. + + Args: + days: Number of days to analyze (default: 7) + session_id: Filter to specific session + + Returns: + List of compaction events with timestamps and session info + """ + queries.ensure_fresh_data(storage, days=days) + result = queries.get_compaction_events(storage, days=days, session_id=session_id) + return {"status": "ok", **result} + + +@mcp.tool() +def get_pre_compaction_events( + session_id: str, + compaction_timestamp: str, + limit: int = 50, +) -> dict: + """Get events that occurred before a compaction event. + + Use this to understand what was happening in the session + leading up to a context reset. + + Args: + session_id: The session to analyze + compaction_timestamp: ISO timestamp of the compaction event + limit: Maximum events to return (default: 50) + + Returns: + Events before the compaction, ordered by timestamp descending (most recent first) + """ + queries.ensure_fresh_data(storage, days=7) + result = queries.get_pre_compaction_events( + storage, + session_id=session_id, + compaction_timestamp=compaction_timestamp, + limit=limit, + ) + return {"status": "ok", **result} + + +@mcp.tool() +def get_large_tool_results( + days: int = 7, + min_size_kb: int = 10, + limit: int = 50, +) -> dict: + """Find tool results that consumed significant context space. + + Helps identify bloat patterns - large file reads, verbose command + outputs, or other operations that accelerate context exhaustion. + + Args: + days: Number of days to analyze (default: 7) + min_size_kb: Minimum result size in KB to include (default: 10) + limit: Maximum results to return (default: 50) + + Returns: + Large tool results with size, tool name, and parameters + """ + queries.ensure_fresh_data(storage, days=days) + result = queries.get_large_tool_results( + storage, days=days, min_size_kb=min_size_kb, limit=limit + ) + return {"status": "ok", **result} + + +@mcp.tool() +def get_session_efficiency( + days: int = 7, + project: str | None = None, +) -> dict: + """Analyze context efficiency and burn rate across sessions. + + Calculates metrics like: + - Total context bytes consumed per session + - Average result size + - Compaction count (context resets) + - Efficiency ratio (output/input bytes) + + Args: + days: Number of days to analyze (default: 7) + project: Optional project path filter + + Returns: + Session efficiency metrics sorted by total bytes consumed + """ + queries.ensure_fresh_data(storage, days=days) + result = queries.get_session_efficiency(storage, days=days, project=project) + return {"status": "ok", **result} + + def create_app(): """Create the ASGI app for uvicorn.""" # stateless_http=True allows resilience to server restarts diff --git a/src/session_analytics/storage.py b/src/session_analytics/storage.py index 93fae89..cc36a0a 100644 --- a/src/session_analytics/storage.py +++ b/src/session_analytics/storage.py @@ -81,6 +81,9 @@ class Event: is_sidechain: bool = False # True for agent/background work version: str | None = None # Claude Code version from entry + # Issue #69: Context efficiency tracking + result_size_bytes: int | None = None # Size of message_text in bytes + @dataclass class Session: @@ -171,7 +174,7 @@ class BusEvent: DEFAULT_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" # Schema version for migrations -SCHEMA_VERSION = 8 +SCHEMA_VERSION = 9 # Migration functions: dict of version -> (migration_name, migration_func) # Each migration upgrades FROM version-1 TO version @@ -478,6 +481,27 @@ def migrate_v8(conn): """) +@migration(9, "add_result_size_bytes") +def migrate_v9(conn): + """Add result_size_bytes column for context efficiency tracking. + + Issue #69: Tracks the byte size of message_text for all entry types to enable: + - Compaction event analysis (large summaries indicate context overflow) + - Large tool result detection (bloat identification) + - Session burn rate calculation (context consumption per event) + + Compaction events are detected during ingestion by checking summary text + for "continued from a previous conversation" marker. Such summaries get + entry_type='compaction' instead of 'summary'. + + Column will be NULL for existing data until re-ingestion. + """ + existing_cols = {row[1] for row in conn.execute("PRAGMA table_info(events)")} + + if "result_size_bytes" not in existing_cols: + conn.execute("ALTER TABLE events ADD COLUMN result_size_bytes INTEGER") + + class SQLiteStorage: """SQLite-backed storage for session analytics.""" @@ -627,6 +651,9 @@ def _init_db(self): is_sidechain INTEGER DEFAULT 0, version TEXT, + -- Issue #69: Context efficiency tracking + result_size_bytes INTEGER, + UNIQUE(session_id, uuid) ) """) @@ -822,8 +849,8 @@ def add_event(self, event: Event) -> Event: command, command_args, file_path, skill_name, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, git_branch, cwd, user_message_text, message_text, exit_code, - parent_uuid, agent_id, is_sidechain, version - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + parent_uuid, agent_id, is_sidechain, version, result_size_bytes + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( event.uuid, @@ -853,6 +880,7 @@ def add_event(self, event: Event) -> Event: event.agent_id, 1 if event.is_sidechain else 0, event.version, + event.result_size_bytes, ), ) event.id = cursor.lastrowid @@ -869,8 +897,8 @@ def add_events_batch(self, events: list[Event]) -> int: command, command_args, file_path, skill_name, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, git_branch, cwd, user_message_text, message_text, exit_code, - parent_uuid, agent_id, is_sidechain, version - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + parent_uuid, agent_id, is_sidechain, version, result_size_bytes + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ ( @@ -901,6 +929,7 @@ def add_events_batch(self, events: list[Event]) -> int: e.agent_id, 1 if e.is_sidechain else 0, e.version, + e.result_size_bytes, ) for e in events ], @@ -999,6 +1028,8 @@ def get_col(name: str, default=None): agent_id=get_col("agent_id"), is_sidechain=bool(get_col("is_sidechain", 0)), version=get_col("version"), + # Issue #69: Context efficiency tracking + result_size_bytes=get_col("result_size_bytes"), ) # Session operations diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 2670a0d..b4a9eb2 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -7,6 +7,8 @@ import pytest from session_analytics.ingest import ( + calculate_result_size, + detect_compaction, extract_command_name, extract_text_from_content, extract_tool_result_content, @@ -1398,3 +1400,78 @@ def test_tool_result_gets_agent_fields(self): assert events[0].agent_id == "d345678" assert events[0].is_sidechain is True assert events[0].version == "2.0.76" + + +class TestCalculateResultSize: + """Tests for calculate_result_size() helper.""" + + def test_none_returns_none(self): + """None input returns None.""" + assert calculate_result_size(None) is None + + def test_empty_string_returns_zero(self): + """Empty string returns 0 bytes.""" + assert calculate_result_size("") == 0 + + def test_ascii_string(self): + """ASCII string returns character count (1 byte each).""" + assert calculate_result_size("hello") == 5 + + def test_unicode_string_returns_byte_count(self): + """Unicode string returns UTF-8 byte count, not char count.""" + # é is 1 char but 2 bytes in UTF-8 + assert calculate_result_size("é") == 2 + # 日本語 is 3 chars but 9 bytes in UTF-8 + assert calculate_result_size("日本語") == 9 + + def test_multiline_string(self): + """Multiline string counts newlines as 1 byte each.""" + text = "line1\nline2\nline3" + assert calculate_result_size(text) == len(text.encode("utf-8")) + + def test_large_string(self): + """Large strings are calculated correctly.""" + text = "x" * 10000 + assert calculate_result_size(text) == 10000 + + +class TestDetectCompaction: + """Tests for detect_compaction() helper.""" + + def test_none_returns_false(self): + """None input returns False.""" + assert detect_compaction(None) is False + + def test_empty_string_returns_false(self): + """Empty string returns False.""" + assert detect_compaction("") is False + + def test_marker_phrase_detected(self): + """Text containing the marker phrase returns True.""" + text = "This summary continued from a previous conversation that ran out of context." + assert detect_compaction(text) is True + + def test_case_insensitive_detection(self): + """Detection is case-insensitive.""" + assert detect_compaction("CONTINUED FROM A PREVIOUS CONVERSATION") is True + assert detect_compaction("Continued From A Previous Conversation") is True + + def test_marker_at_different_positions(self): + """Marker anywhere in text is detected.""" + # At start + assert detect_compaction("continued from a previous conversation. More text.") is True + # In middle + assert detect_compaction("Summary: continued from a previous conversation here.") is True + # At end + assert detect_compaction("...continued from a previous conversation") is True + + def test_partial_match_not_detected(self): + """Similar but incomplete phrases don't match.""" + assert detect_compaction("continued from previous") is False + assert detect_compaction("from a previous conversation") is False + assert detect_compaction("continued a conversation") is False + + def test_regular_summary_not_detected(self): + """Regular summaries without the marker return False.""" + text = "This is a summary of the recent conversation about implementing a feature." + assert detect_compaction(text) is False diff --git a/tests/test_queries.py b/tests/test_queries.py index 7b06059..8f7cd1d 100644 --- a/tests/test_queries.py +++ b/tests/test_queries.py @@ -2331,3 +2331,308 @@ def test_days_filter(self, storage): # 30 days should get both errors result_30 = query_error_details(storage, days=30) assert result_30["total_errors"] == 2 + + +# Issue #69: Compaction and context efficiency tests + + +class TestGetCompactionEvents: + """Tests for get_compaction_events().""" + + def test_returns_compaction_events(self, storage): + """Test that compaction events are returned.""" + from session_analytics.queries import get_compaction_events + + now = datetime.now() + events = [ + Event( + id=None, + uuid="c1", + timestamp=now - timedelta(hours=1), + session_id="s1", + project_path="-test", + entry_type="compaction", + message_text="Summary of previous conversation...", + result_size_bytes=1024, + ), + Event( + id=None, + uuid="c2", + timestamp=now - timedelta(hours=2), + session_id="s1", + project_path="-test", + entry_type="summary", + message_text="Regular summary", + result_size_bytes=512, + ), + ] + storage.add_events_batch(events) + + result = get_compaction_events(storage, days=7) + + assert result["compaction_count"] == 1 + assert len(result["compactions"]) == 1 + assert result["compactions"][0]["session_id"] == "s1" + + def test_filters_by_session_id(self, storage): + """Test session_id filter works.""" + from session_analytics.queries import get_compaction_events + + now = datetime.now() + events = [ + Event( + id=None, + uuid="c1", + timestamp=now - timedelta(hours=1), + session_id="s1", + entry_type="compaction", + ), + Event( + id=None, + uuid="c2", + timestamp=now - timedelta(hours=2), + session_id="s2", + entry_type="compaction", + ), + ] + storage.add_events_batch(events) + + result = get_compaction_events(storage, days=7, session_id="s1") + + assert result["compaction_count"] == 1 + assert result["compactions"][0]["session_id"] == "s1" + + +class TestGetPreCompactionEvents: + """Tests for get_pre_compaction_events().""" + + def test_returns_events_before_compaction(self, storage): + """Test that events before the compaction timestamp are returned.""" + from session_analytics.queries import get_pre_compaction_events + + now = datetime.now() + compaction_time = now - timedelta(hours=1) + events = [ + # Event before compaction + Event( + id=None, + uuid="e1", + timestamp=compaction_time - timedelta(minutes=10), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Read", + file_path="/test/file.py", + ), + # Another event before compaction + Event( + id=None, + uuid="e2", + timestamp=compaction_time - timedelta(minutes=5), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Edit", + file_path="/test/file.py", + ), + # Event after compaction (should not be included) + Event( + id=None, + uuid="e3", + timestamp=compaction_time + timedelta(minutes=5), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Bash", + ), + ] + storage.add_events_batch(events) + + result = get_pre_compaction_events( + storage, + session_id="s1", + compaction_timestamp=compaction_time.isoformat(), + limit=50, + ) + + assert result["event_count"] == 2 + assert len(result["events"]) == 2 + # DESC order - most recent first + assert result["events"][0]["tool"] == "Edit" + assert result["events"][1]["tool"] == "Read" + + def test_filters_by_session_id(self, storage): + """Test that only events from the specified session are returned.""" + from session_analytics.queries import get_pre_compaction_events + + now = datetime.now() + compaction_time = now - timedelta(hours=1) + events = [ + Event( + id=None, + uuid="e1", + timestamp=compaction_time - timedelta(minutes=10), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Read", + ), + Event( + id=None, + uuid="e2", + timestamp=compaction_time - timedelta(minutes=10), + session_id="s2", + project_path="-test", + entry_type="tool_use", + tool_name="Write", + ), + ] + storage.add_events_batch(events) + + result = get_pre_compaction_events( + storage, + session_id="s1", + compaction_timestamp=compaction_time.isoformat(), + limit=50, + ) + + assert result["event_count"] == 1 + assert result["events"][0]["tool"] == "Read" + + def test_respects_limit_parameter(self, storage): + """Test that the limit parameter is respected.""" + from session_analytics.queries import get_pre_compaction_events + + now = datetime.now() + compaction_time = now - timedelta(hours=1) + events = [ + Event( + id=None, + uuid=f"e{i}", + timestamp=compaction_time - timedelta(minutes=i), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name=f"Tool{i}", + ) + for i in range(10) + ] + storage.add_events_batch(events) + + result = get_pre_compaction_events( + storage, + session_id="s1", + compaction_timestamp=compaction_time.isoformat(), + limit=3, + ) + + assert result["event_count"] == 3 + assert len(result["events"]) == 3 + + +class TestGetLargeToolResults: + """Tests for get_large_tool_results().""" + + def test_returns_large_results(self, storage): + """Test that large tool results are returned.""" + from session_analytics.queries import get_large_tool_results + + now = datetime.now() + # Need both tool_use and tool_result with matching tool_id for the JOIN + events = [ + # Large result + Event( + id=None, + uuid="lr1-use", + timestamp=now - timedelta(hours=1), + session_id="s1", + entry_type="tool_use", + tool_name="Read", + tool_id="tool-lr1", + file_path="/large/file.py", + ), + Event( + id=None, + uuid="lr1-result", + timestamp=now - timedelta(hours=1, seconds=1), + session_id="s1", + entry_type="tool_result", + tool_id="tool-lr1", + result_size_bytes=50 * 1024, # 50KB + ), + # Small result (below threshold) + Event( + id=None, + uuid="lr2-use", + timestamp=now - timedelta(hours=2), + session_id="s1", + entry_type="tool_use", + tool_name="Bash", + tool_id="tool-lr2", + command="cat", + ), + Event( + id=None, + uuid="lr2-result", + timestamp=now - timedelta(hours=2, seconds=1), + session_id="s1", + entry_type="tool_result", + tool_id="tool-lr2", + result_size_bytes=5 * 1024, # 5KB - below threshold + ), + ] + storage.add_events_batch(events) + + result = get_large_tool_results(storage, days=7, min_size_kb=10) + + assert result["result_count"] == 1 + assert len(result["large_results"]) == 1 + assert result["large_results"][0]["tool"] == "Read" + assert result["large_results"][0]["size_kb"] == 50.0 + + +class TestGetSessionEfficiency: + """Tests for get_session_efficiency().""" + + def test_returns_efficiency_metrics(self, storage): + """Test that efficiency metrics are returned.""" + from session_analytics.queries import get_session_efficiency + + now = datetime.now() + # Need 10+ events to pass HAVING clause in query + events = [] + for i in range(12): + events.append( + Event( + id=None, + uuid=f"e{i}", + timestamp=now - timedelta(hours=1, minutes=i), + session_id="s1", + entry_type="tool_result" if i % 2 == 0 else "assistant", + result_size_bytes=1000, + input_tokens=100 if i % 2 == 1 else None, + output_tokens=50 if i % 2 == 1 else None, + ) + ) + # Add a compaction event + events.append( + Event( + id=None, + uuid="compact", + timestamp=now - timedelta(hours=2), + session_id="s1", + entry_type="compaction", + result_size_bytes=5000, + ) + ) + storage.add_events_batch(events) + + result = get_session_efficiency(storage, days=7) + + assert "sessions" in result + assert len(result["sessions"]) >= 1 + # Find our session + s1_data = next((s for s in result["sessions"] if s["session_id"] == "s1"), None) + assert s1_data is not None + assert s1_data["efficiency_signals"]["compaction_count"] == 1 + assert s1_data["efficiency_signals"]["has_compaction"] is True diff --git a/tests/test_storage.py b/tests/test_storage.py index b792760..489ca10 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -1063,3 +1063,61 @@ def test_index_on_tool_id(self, storage): rows = storage.execute_query("PRAGMA index_list(events)") indexes = {row[1] for row in rows} assert "idx_events_tool_id" in indexes + + +# Issue #69: Context efficiency field tests + + +class TestResultSizeBytes: + """Tests for result_size_bytes field (Issue #69).""" + + def test_event_with_result_size_bytes(self, storage): + """Test adding event with result_size_bytes field.""" + event = Event( + id=None, + uuid="size-test-1", + timestamp=datetime.now(), + session_id="s1", + entry_type="tool_result", + result_size_bytes=1024, + ) + result = storage.add_event(event) + assert result.id is not None + + stored = storage.get_events_in_range(session_id="s1") + assert len(stored) == 1 + assert stored[0].result_size_bytes == 1024 + + def test_batch_add_with_result_size_bytes(self, storage): + """Test batch adding events with result_size_bytes.""" + events = [ + Event( + id=None, + uuid="batch-size-1", + timestamp=datetime.now(), + session_id="s1", + entry_type="tool_result", + result_size_bytes=500, + ), + Event( + id=None, + uuid="batch-size-2", + timestamp=datetime.now(), + session_id="s1", + entry_type="assistant", + result_size_bytes=2000, + ), + ] + count = storage.add_events_batch(events) + assert count == 2 + + stored = storage.get_events_in_range(session_id="s1") + sizes = {e.uuid: e.result_size_bytes for e in stored} + assert sizes["batch-size-1"] == 500 + assert sizes["batch-size-2"] == 2000 + + def test_result_size_bytes_column_exists(self, storage): + """Verify that result_size_bytes column exists in events table.""" + rows = storage.execute_query("PRAGMA table_info(events)") + columns = {row[1] for row in rows} + assert "result_size_bytes" in columns