diff --git a/src/session_analytics/cli.py b/src/session_analytics/cli.py index 05006b6..e3b7e79 100644 --- a/src/session_analytics/cli.py +++ b/src/session_analytics/cli.py @@ -22,6 +22,7 @@ sample_sequences, ) from session_analytics.queries import ( + analyze_pre_compaction_patterns, classify_sessions, detect_parallel_sessions, find_related_sessions, @@ -652,6 +653,78 @@ def _format_compactions(data: dict) -> list[str]: return lines +# Issue #81: Aggregate compactions formatter +@_register_formatter( + lambda d: d.get("aggregate") is True and "sessions" in d and "total_compaction_count" in d +) +def _format_compactions_aggregate(data: dict) -> list[str]: + total_compactions = data.get("total_compaction_count", 0) + total_sessions = data.get("total_sessions_with_compactions", 0) + shown_sessions = data.get("session_count", 0) + + lines = [ + f"Compaction summary by session - last {data.get('days', 7)} days", + "", + f"Total compactions: {total_compactions}", + f"Sessions with compactions: {total_sessions}", + ] + + if total_sessions > shown_sessions: + lines.append(f"Showing {shown_sessions} of {total_sessions} sessions") + lines.append("") + + if data.get("sessions"): + lines.append("Sessions ranked by compaction count:") + for s in data["sessions"][:15]: + lines.append( + f" {s['session_id'][:8]}... - {s['compaction_count']} compactions " + f"({s['total_summary_kb']:.0f}KB summaries)" + ) + return lines + + +# Issue #81: Pre-compaction patterns formatter +@_register_formatter(lambda d: "compactions_analyzed" in d and "patterns" in d) +def _format_pre_compaction_patterns(data: dict) -> list[str]: + lines = [ + f"Pre-compaction pattern analysis - last {data.get('days', 7)} days", + "", + f"Compactions analyzed: {data.get('compactions_analyzed', 0)}", + f"Events analyzed before each: {data.get('events_before', 50)}", + "", + ] + + patterns = data.get("patterns", {}) + if patterns: + lines.append("Detected patterns:") + lines.append(f" Avg consecutive reads: {patterns.get('avg_consecutive_reads', 0):.1f}") + lines.append(f" Avg files re-read: {patterns.get('avg_files_read_multiple_times', 0):.1f}") + lines.append(f" Avg large results (>10KB): {patterns.get('avg_large_results', 0):.1f}") + lines.append("") + + if patterns.get("tool_distribution"): + lines.append("Tool distribution before compactions:") + for t in patterns["tool_distribution"][:5]: + lines.append(f" {t['tool']}: {t['count']}") + lines.append("") + + if patterns.get("top_reread_files"): + lines.append("Most frequently re-read files:") + for f in patterns["top_reread_files"][:5]: + lines.append(f" {f['file']}: {f['read_count']}x") + lines.append("") + + recommendations = data.get("recommendations", []) + if recommendations: + lines.append("Recommendations:") + for r in recommendations: + lines.append(f" - {r}") + elif data.get("compactions_analyzed", 0) > 0: + lines.append("No antipatterns detected - context efficiency looks healthy.") + + return lines + + @_register_formatter(lambda d: "compaction_timestamp" in d and "events" in d and "event_count" in d) def _format_pre_compaction(data: dict) -> list[str]: lines = [ @@ -1163,6 +1236,7 @@ def cmd_compactions(args): days=args.days, session_id=getattr(args, "session_id", None), limit=getattr(args, "limit", 50), + aggregate=getattr(args, "aggregate", False), ) print(format_output(result, args.json)) @@ -1179,6 +1253,18 @@ def cmd_pre_compaction(args): print(format_output(result, args.json)) +def cmd_pre_compaction_patterns(args): + """Analyze patterns in events leading up to compactions.""" + storage = SQLiteStorage() + result = analyze_pre_compaction_patterns( + storage, + days=args.days, + events_before=args.events_before, + limit=args.limit, + ) + print(format_output(result, args.json)) + + def cmd_large_results(args): """Show large tool results that consume context space.""" storage = SQLiteStorage() @@ -1270,6 +1356,9 @@ def cmd_benchmark(args): from session_analytics.patterns import ( sample_sequences as patterns_sample_sequences, ) + from session_analytics.queries import ( + analyze_pre_compaction_patterns as queries_analyze_pre_compaction_patterns, + ) from session_analytics.queries import ( classify_sessions as queries_classify_sessions, ) @@ -1373,10 +1462,17 @@ def cmd_benchmark(args): "get_bus_events": lambda: queries_query_bus_events(storage, days=7, limit=10), # Issue #69: Compaction and efficiency tools "get_compaction_events": lambda: queries_get_compaction_events(storage, days=7), + "get_compaction_events_agg": lambda: queries_get_compaction_events( + storage, days=7, aggregate=True + ), "get_large_tool_results": lambda: queries_get_large_tool_results( storage, days=7, min_size_kb=10, limit=10 ), "get_session_efficiency": lambda: queries_get_session_efficiency(storage, days=7), + # Issue #81: Pre-compaction pattern analysis + "analyze_pre_compaction_patterns": lambda: queries_analyze_pre_compaction_patterns( + storage, days=7 + ), } # Skipped tools (require specific data or modify DB): @@ -1682,6 +1778,9 @@ def main(): sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") sub.add_argument("--session-id", help="Filter to specific session ID") sub.add_argument("--limit", type=int, default=50, help="Max events to return (default: 50)") + sub.add_argument( + "--aggregate", action="store_true", help="Group by session instead of individual events" + ) sub.set_defaults(func=cmd_compactions) # pre-compaction @@ -1691,6 +1790,22 @@ def main(): sub.add_argument("--limit", type=int, default=50, help="Max events to return (default: 50)") sub.set_defaults(func=cmd_pre_compaction) + # pre-compaction-patterns (Issue #81) + sub = subparsers.add_parser( + "pre-compaction-patterns", help="Analyze patterns in events before compactions" + ) + sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") + sub.add_argument( + "--events-before", + type=int, + default=50, + help="Events to analyze before each compaction (default: 50)", + ) + sub.add_argument( + "--limit", type=int, default=20, help="Max compactions to analyze (default: 20)" + ) + sub.set_defaults(func=cmd_pre_compaction_patterns) + # large-results sub = subparsers.add_parser( "large-results", help="Show large tool results consuming context space" diff --git a/src/session_analytics/guide.md b/src/session_analytics/guide.md index d8c988d..8a8adc5 100644 --- a/src/session_analytics/guide.md +++ b/src/session_analytics/guide.md @@ -131,8 +131,9 @@ Returns both core metrics (`events`, `sessions`, `errors`, `tokens`) and `effici | Tool | Purpose | |------|---------| -| `get_compaction_events(days?, session_id?, limit?)` | List compaction events (context resets) | +| `get_compaction_events(days?, session_id?, limit?, aggregate?)` | List compaction events (context resets) | | `get_pre_compaction_events(session_id, compaction_timestamp, limit?)` | Events before a compaction for analysis | +| `analyze_pre_compaction_patterns(days?, events_before?, limit?)` | Aggregated patterns before compactions (RFC #81) | | `get_large_tool_results(days?, min_size_kb?, limit?)` | Find tool results consuming context space | | `get_session_efficiency(days?, project?, limit?)` | Session efficiency metrics and burn rate | @@ -231,10 +232,12 @@ analyze_trends() → "Usage is increasing/decreasing" ### Workflow: Context Efficiency ``` -get_compaction_events() → "When did context resets happen?" -get_session_efficiency() → "Which sessions burn context fastest?" -get_large_tool_results() → "What operations consume the most space?" -get_pre_compaction_events() → "What led up to a specific reset?" +get_compaction_events() → "When did context resets happen?" +get_compaction_events(aggregate=True) → "Which sessions had most compactions?" +analyze_pre_compaction_patterns() → "What patterns precede compactions?" (RFC #81) +get_session_efficiency() → "Which sessions burn context fastest?" +get_large_tool_results() → "What operations consume the most space?" +get_pre_compaction_events() → "What led up to a specific reset?" ``` ## Reference diff --git a/src/session_analytics/queries.py b/src/session_analytics/queries.py index ae5a974..2d1ed2b 100644 --- a/src/session_analytics/queries.py +++ b/src/session_analytics/queries.py @@ -2099,6 +2099,7 @@ def get_compaction_events( days: int = 7, session_id: str | None = None, limit: int = 50, + aggregate: bool = False, ) -> dict: """List compaction events where conversation history was truncated. @@ -2110,9 +2111,10 @@ def get_compaction_events( days: Number of days to analyze (default: 7) session_id: Optional filter for specific session limit: Maximum events to return (default: 50) + aggregate: If True, group by session with counts instead of individual events Returns: - Dict with compaction events and their timestamps + Dict with compaction events and their timestamps (or session aggregates if aggregate=True) """ cutoff = get_cutoff(days=days) @@ -2132,7 +2134,63 @@ def get_compaction_events( ) total_count = count_row[0]["total"] if count_row else 0 - # Then get limited results + # Issue #81: Aggregate mode groups by session + if aggregate: + query_params = list(params) + limit_clause = "" + if limit > 0: + limit_clause = "LIMIT ?" + query_params.append(limit) + + rows = storage.execute_query( + f""" + SELECT + session_id, + project_path, + COUNT(*) as compaction_count, + MIN(timestamp) as first_compaction, + MAX(timestamp) as last_compaction, + SUM(result_size_bytes) as total_summary_bytes + FROM events + WHERE {where_clause} + GROUP BY session_id + ORDER BY compaction_count DESC + {limit_clause} + """, + tuple(query_params), + ) + + sessions = [ + { + "session_id": row["session_id"], + "project": row["project_path"], + "compaction_count": row["compaction_count"], + "first_compaction": _format_timestamp(row["first_compaction"]), + "last_compaction": _format_timestamp(row["last_compaction"]), + "total_summary_kb": round((row["total_summary_bytes"] or 0) / 1024, 1), + } + for row in rows + ] + + # Count unique sessions + session_count_row = storage.execute_query( + f"SELECT COUNT(DISTINCT session_id) as count FROM events WHERE {where_clause}", + tuple(params), + ) + total_sessions = session_count_row[0]["count"] if session_count_row else 0 + + return { + "days": days, + "session_id": session_id, + "limit": limit, + "aggregate": True, + "total_compaction_count": total_count, + "total_sessions_with_compactions": total_sessions, + "session_count": len(sessions), + "sessions": sessions, + } + + # Non-aggregate mode: return individual compaction events query_params = list(params) limit_clause = "" if limit > 0: @@ -2170,6 +2228,7 @@ def get_compaction_events( "days": days, "session_id": session_id, "limit": limit, + "aggregate": False, "total_compaction_count": total_count, "compaction_count": len(compactions), "compactions": compactions, @@ -2240,6 +2299,197 @@ def get_pre_compaction_events( } +def analyze_pre_compaction_patterns( + storage: SQLiteStorage, + days: int = 7, + events_before: int = 50, + limit: int = 20, +) -> dict: + """Analyze patterns in events leading up to compactions. + + RFC #81: Identifies antipatterns that accelerate context exhaustion: + - Consecutive reads without edits (exploration without action) + - Files read multiple times before compaction + - Large tool results that bloated context + - Tool distribution before compaction + + Args: + storage: Storage instance + days: Number of days to analyze (default: 7) + events_before: Events to analyze before each compaction (default: 50) + limit: Max compactions to analyze (default: 20) + + Returns: + Dict with aggregated patterns across analyzed compactions + """ + cutoff = get_cutoff(days=days) + + # Get recent compactions + compactions = storage.execute_query( + """ + SELECT session_id, timestamp + FROM events + WHERE timestamp >= ? + AND entry_type = 'compaction' + ORDER BY timestamp DESC + LIMIT ? + """, + (cutoff, limit), + ) + + if not compactions: + return { + "days": days, + "compactions_analyzed": 0, + "patterns": {}, + "recommendations": [], + } + + # Analyze patterns across all compactions + total_consecutive_reads = 0 + total_files_read_multiple = 0 + total_large_results = 0 + tool_counts: dict[str, int] = {} + file_read_counts: dict[str, int] = {} + large_results_by_tool: dict[str, int] = {} + + for compaction in compactions: + session_id = compaction["session_id"] + compact_time = compaction["timestamp"] + + # Get events before this compaction + events = storage.execute_query( + """ + SELECT + entry_type, + tool_name, + file_path, + result_size_bytes + FROM events + WHERE session_id = ? + AND timestamp < ? + ORDER BY timestamp DESC + LIMIT ? + """, + (session_id, compact_time, events_before), + ) + + # Count consecutive reads (no edit between reads) + consecutive_reads = 0 + max_consecutive_reads = 0 + for event in events: + if event["tool_name"] == "Read": + consecutive_reads += 1 + max_consecutive_reads = max(max_consecutive_reads, consecutive_reads) + elif event["tool_name"] == "Edit": + consecutive_reads = 0 + total_consecutive_reads += max_consecutive_reads + + # Count files read multiple times + session_file_counts: dict[str, int] = {} + for event in events: + if event["tool_name"] == "Read" and event["file_path"]: + session_file_counts[event["file_path"]] = ( + session_file_counts.get(event["file_path"], 0) + 1 + ) + multi_read_files = sum(1 for c in session_file_counts.values() if c > 1) + total_files_read_multiple += multi_read_files + + # Aggregate file reads across all compactions + for f, c in session_file_counts.items(): + file_read_counts[f] = file_read_counts.get(f, 0) + c + + # Count large results (>10KB) + for event in events: + size = event["result_size_bytes"] or 0 + if size > 10240: + total_large_results += 1 + tool = event["tool_name"] or "unknown" + large_results_by_tool[tool] = large_results_by_tool.get(tool, 0) + 1 + + # Tool distribution + for event in events: + if event["tool_name"]: + tool_counts[event["tool_name"]] = tool_counts.get(event["tool_name"], 0) + 1 + + # Calculate averages + compactions_analyzed = len(compactions) + avg_consecutive_reads = total_consecutive_reads / compactions_analyzed + avg_files_read_multiple = total_files_read_multiple / compactions_analyzed + avg_large_results = total_large_results / compactions_analyzed + + # Top files read multiple times + top_reread_files = sorted( + [(f, c) for f, c in file_read_counts.items() if c > 1], + key=lambda x: x[1], + reverse=True, + )[:10] + + # Tool distribution sorted by count + tool_distribution = sorted( + tool_counts.items(), + key=lambda x: x[1], + reverse=True, + ) + + # Generate recommendations based on findings + recommendations = [] + if avg_consecutive_reads > 5: + recommendations.append( + f"High consecutive reads ({avg_consecutive_reads:.1f} avg) - " + "consider reading fewer files or using grep to target specific content" + ) + if avg_files_read_multiple > 2: + recommendations.append( + f"Files re-read frequently ({avg_files_read_multiple:.1f} avg) - " + "consider keeping file content in context or summarizing key sections" + ) + if avg_large_results > 3: + recommendations.append( + f"Many large tool results ({avg_large_results:.1f} avg) - " + "use offset/limit parameters or grep to reduce result size" + ) + + # Check for Read-heavy tool distribution + read_count = tool_counts.get("Read", 0) + edit_count = tool_counts.get("Edit", 0) + if edit_count == 0 and read_count > 10: + # All reads, no edits - pure exploration that consumed context + recommendations.append( + f"Pure exploration pattern ({read_count} reads, 0 edits) - " + "context consumed without productive editing" + ) + elif read_count > 0 and edit_count > 0: + read_edit_ratio = read_count / edit_count + if read_edit_ratio > 5: + recommendations.append( + f"High read:edit ratio ({read_edit_ratio:.1f}:1) - " + "may indicate over-exploration before action" + ) + + return { + "days": days, + "events_before": events_before, + "compactions_analyzed": compactions_analyzed, + "patterns": { + "avg_consecutive_reads": round(avg_consecutive_reads, 1), + "avg_files_read_multiple_times": round(avg_files_read_multiple, 1), + "avg_large_results": round(avg_large_results, 1), + "tool_distribution": [ + {"tool": tool, "count": count} for tool, count in tool_distribution + ], + "top_reread_files": [{"file": f, "read_count": c} for f, c in top_reread_files], + "large_results_by_tool": [ + {"tool": tool, "count": count} + for tool, count in sorted( + large_results_by_tool.items(), key=lambda x: x[1], reverse=True + ) + ], + }, + "recommendations": recommendations, + } + + def get_large_tool_results( storage: SQLiteStorage, days: int = 7, diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py index ab6d314..b4ee586 100644 --- a/src/session_analytics/server.py +++ b/src/session_analytics/server.py @@ -843,6 +843,7 @@ def get_compaction_events( days: int = 7, session_id: str | None = None, limit: int = 50, + aggregate: bool = False, ) -> dict: """List compaction events (context resets) across sessions. @@ -853,12 +854,16 @@ def get_compaction_events( days: Number of days to analyze (default: 7) session_id: Filter to specific session limit: Maximum events to return (default: 50) + aggregate: If True, group by session with counts instead of individual events Returns: List of compaction events with timestamps and session info + (or session aggregates if aggregate=True) """ queries.ensure_fresh_data(storage, days=days) - result = queries.get_compaction_events(storage, days=days, session_id=session_id, limit=limit) + result = queries.get_compaction_events( + storage, days=days, session_id=session_id, limit=limit, aggregate=aggregate + ) return {"status": "ok", **result} @@ -891,6 +896,35 @@ def get_pre_compaction_events( return {"status": "ok", **result} +@mcp.tool() +def analyze_pre_compaction_patterns( + days: int = 7, + events_before: int = 50, + limit: int = 20, +) -> dict: + """Analyze patterns in events leading up to compactions. + + RFC #81: Identifies antipatterns that accelerate context exhaustion: + - Consecutive reads without edits (exploration without action) + - Files read multiple times before compaction + - Large tool results that bloated context + - Tool distribution before compaction + + Args: + days: Number of days to analyze (default: 7) + events_before: Events to analyze before each compaction (default: 50) + limit: Max compactions to analyze (default: 20) + + Returns: + Dict with aggregated patterns and recommendations + """ + queries.ensure_fresh_data(storage, days=days) + result = queries.analyze_pre_compaction_patterns( + storage, days=days, events_before=events_before, limit=limit + ) + return {"status": "ok", **result} + + @mcp.tool() def get_large_tool_results( days: int = 7, diff --git a/tests/test_queries.py b/tests/test_queries.py index 8f7cd1d..30e5407 100644 --- a/tests/test_queries.py +++ b/tests/test_queries.py @@ -2402,6 +2402,62 @@ def test_filters_by_session_id(self, storage): assert result["compaction_count"] == 1 assert result["compactions"][0]["session_id"] == "s1" + def test_aggregate_mode_groups_by_session(self, storage): + """Test that aggregate=True groups compactions by session.""" + from session_analytics.queries import get_compaction_events + + now = datetime.now() + events = [ + # Session 1: 3 compactions + Event( + id=None, + uuid="c1", + timestamp=now - timedelta(hours=1), + session_id="s1", + entry_type="compaction", + result_size_bytes=1024, + ), + Event( + id=None, + uuid="c2", + timestamp=now - timedelta(hours=2), + session_id="s1", + entry_type="compaction", + result_size_bytes=2048, + ), + Event( + id=None, + uuid="c3", + timestamp=now - timedelta(hours=3), + session_id="s1", + entry_type="compaction", + result_size_bytes=1024, + ), + # Session 2: 1 compaction + Event( + id=None, + uuid="c4", + timestamp=now - timedelta(hours=1), + session_id="s2", + entry_type="compaction", + result_size_bytes=512, + ), + ] + storage.add_events_batch(events) + + result = get_compaction_events(storage, days=7, aggregate=True) + + assert result["aggregate"] is True + assert result["total_compaction_count"] == 4 + assert result["total_sessions_with_compactions"] == 2 + assert result["session_count"] == 2 + assert len(result["sessions"]) == 2 + # Sessions sorted by compaction count descending + assert result["sessions"][0]["session_id"] == "s1" + assert result["sessions"][0]["compaction_count"] == 3 + assert result["sessions"][1]["session_id"] == "s2" + assert result["sessions"][1]["compaction_count"] == 1 + class TestGetPreCompactionEvents: """Tests for get_pre_compaction_events().""" @@ -2530,6 +2586,134 @@ def test_respects_limit_parameter(self, storage): assert len(result["events"]) == 3 +class TestAnalyzePreCompactionPatterns: + """Tests for analyze_pre_compaction_patterns().""" + + def test_returns_empty_when_no_compactions(self, storage): + """Test that empty result is returned when no compactions exist.""" + from session_analytics.queries import analyze_pre_compaction_patterns + + result = analyze_pre_compaction_patterns(storage, days=7) + + assert result["compactions_analyzed"] == 0 + assert result["patterns"] == {} + assert result["recommendations"] == [] + + def test_detects_consecutive_reads(self, storage): + """Test that consecutive reads are detected as a pattern.""" + from session_analytics.queries import analyze_pre_compaction_patterns + + now = datetime.now() + compaction_time = now - timedelta(hours=1) + events = [ + # Compaction event + Event( + id=None, + uuid="c1", + timestamp=compaction_time, + session_id="s1", + entry_type="compaction", + ), + # 10 consecutive reads before compaction (no edits) + *[ + Event( + id=None, + uuid=f"r{i}", + timestamp=compaction_time - timedelta(minutes=i + 1), + session_id="s1", + entry_type="tool_use", + tool_name="Read", + file_path=f"/test/file{i}.py", + ) + for i in range(10) + ], + ] + storage.add_events_batch(events) + + result = analyze_pre_compaction_patterns(storage, days=7) + + assert result["compactions_analyzed"] == 1 + assert result["patterns"]["avg_consecutive_reads"] >= 10 + assert "tool_distribution" in result["patterns"] + + def test_detects_files_read_multiple_times(self, storage): + """Test that files read multiple times are detected.""" + from session_analytics.queries import analyze_pre_compaction_patterns + + now = datetime.now() + compaction_time = now - timedelta(hours=1) + events = [ + # Compaction event + Event( + id=None, + uuid="c1", + timestamp=compaction_time, + session_id="s1", + entry_type="compaction", + ), + # Same file read 5 times + *[ + Event( + id=None, + uuid=f"r{i}", + timestamp=compaction_time - timedelta(minutes=i + 1), + session_id="s1", + entry_type="tool_use", + tool_name="Read", + file_path="/test/same_file.py", + ) + for i in range(5) + ], + ] + storage.add_events_batch(events) + + result = analyze_pre_compaction_patterns(storage, days=7) + + assert result["compactions_analyzed"] == 1 + assert result["patterns"]["avg_files_read_multiple_times"] >= 1 + # Check that the re-read file appears in top_reread_files + reread_files = [f["file"] for f in result["patterns"]["top_reread_files"]] + assert "/test/same_file.py" in reread_files + + def test_generates_recommendations_for_antipatterns(self, storage): + """Test that recommendations are generated when antipatterns detected.""" + from session_analytics.queries import analyze_pre_compaction_patterns + + now = datetime.now() + compaction_time = now - timedelta(hours=1) + events = [ + # Compaction event + Event( + id=None, + uuid="c1", + timestamp=compaction_time, + session_id="s1", + entry_type="compaction", + ), + # 15 consecutive reads (threshold is 5) + *[ + Event( + id=None, + uuid=f"r{i}", + timestamp=compaction_time - timedelta(minutes=i + 1), + session_id="s1", + entry_type="tool_use", + tool_name="Read", + file_path=f"/test/file{i}.py", + ) + for i in range(15) + ], + ] + storage.add_events_batch(events) + + result = analyze_pre_compaction_patterns(storage, days=7) + + # Should have recommendations about consecutive reads + assert len(result["recommendations"]) >= 1 + # At least one recommendation should mention reads + assert any("read" in r.lower() for r in result["recommendations"]) + + class TestGetLargeToolResults: """Tests for get_large_tool_results().""" diff --git a/tests/test_server.py b/tests/test_server.py index 7db5e72..286acd2 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -2,6 +2,7 @@ from session_analytics.server import ( analyze_failures, + analyze_pre_compaction_patterns, analyze_trends, classify_sessions, correlate_git_with_sessions, @@ -364,3 +365,44 @@ def test_classify_sessions_efficiency(): assert "files_read_multiple_times" in efficiency assert "burn_rate" in efficiency assert efficiency["burn_rate"] in ["high", "medium", "low"] + + +# Issue #81: Compaction aggregation and pre-compaction patterns + + +def test_get_compaction_events_aggregate(): + """Test that get_compaction_events aggregate mode returns session-level data.""" + result = get_compaction_events.fn(days=7, limit=10, aggregate=True) + assert result["status"] == "ok" + assert result["aggregate"] is True + assert "total_compaction_count" in result + assert "total_sessions_with_compactions" in result + assert "session_count" in result + assert "sessions" in result + assert isinstance(result["sessions"], list) + # If sessions exist, verify structure + if result["sessions"]: + session = result["sessions"][0] + assert "session_id" in session + assert "compaction_count" in session + assert "first_compaction" in session + assert "last_compaction" in session + assert "total_summary_kb" in session + + +def test_analyze_pre_compaction_patterns(): + """Test that analyze_pre_compaction_patterns returns pattern data.""" + result = analyze_pre_compaction_patterns.fn(days=7, events_before=50, limit=20) + assert result["status"] == "ok" + assert "compactions_analyzed" in result + assert "patterns" in result + assert "recommendations" in result + assert isinstance(result["recommendations"], list) + # If patterns exist, verify structure + if result.get("compactions_analyzed", 0) > 0: + patterns = result["patterns"] + assert "avg_consecutive_reads" in patterns + assert "avg_files_read_multiple_times" in patterns + assert "avg_large_results" in patterns + assert "tool_distribution" in patterns + assert isinstance(patterns["tool_distribution"], list)