diff --git a/CLAUDE.md b/CLAUDE.md index 2cdca86..aabfd46 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -154,7 +154,7 @@ Do this: {"error_count": 5, "error_rate": 0.25, "has_rework": True, "commit_count": 0} ``` -## MCP Tools (27 total) +## MCP Tools (28 total) ### Status & Ingestion | Tool | Purpose | @@ -187,6 +187,11 @@ Do this: | `get_projects` | Activity across all projects | | `get_mcp_usage` | MCP server and tool usage breakdown | +### Agent Activity +| Tool | Purpose | +|------|---------| +| `get_agent_activity` | Task subagent activity vs main session (RFC #41) | + ### Session Analysis | Tool | Purpose | |------|---------| @@ -227,7 +232,7 @@ Do this: > **Maintainer note**: This discovery flow is also documented in `src/session_analytics/guide.md` > (exposed as MCP resource `session-analytics://guide`). Keep both in sync when updating API docs. -## CLI Commands (26 total) +## CLI Commands (27 total) All commands support `--json` for machine-readable output: @@ -254,6 +259,9 @@ session-analytics-cli languages # Language distribution session-analytics-cli projects # Cross-project activity session-analytics-cli mcp-usage # MCP server/tool usage +# Agent Activity +session-analytics-cli agents # Task subagent vs main session (RFC #41) + # Session Analysis session-analytics-cli signals # Raw session metrics session-analytics-cli classify # Categorize sessions diff --git a/src/session_analytics/cli.py b/src/session_analytics/cli.py index afc84e2..4bd619a 100644 --- a/src/session_analytics/cli.py +++ b/src/session_analytics/cli.py @@ -24,6 +24,7 @@ find_related_sessions, get_handoff_context, get_user_journey, + query_agent_activity, query_commands, query_file_activity, query_languages, @@ -213,6 +214,43 @@ def _format_mcp_usage(data: dict) -> list[str]: return lines +@_register_formatter(lambda d: "agents" in d and "main_session" in d) +def _format_agent_activity(data: dict) -> list[str]: + """Format agent activity breakdown. 
+ + RFC #41: Shows activity by Task subagent vs main session. + """ + summary = data.get("summary", {}) + lines = [ + "Agent activity breakdown (Task subagent vs main session)", + "", + f"Agents: {summary.get('agent_count', 0)}", + f"Agent tokens: {summary.get('total_agent_tokens', 0):,} ({summary.get('agent_token_percentage', 0)}%)", + f"Main tokens: {summary.get('total_main_tokens', 0):,}", + "", + ] + + # Main session stats + main = data.get("main_session") + if main: + lines.append("Main Session:") + lines.append(f" Events: {main['event_count']:,}") + lines.append(f" Tokens: {main['input_tokens']:,} in / {main['output_tokens']:,} out") + lines.append("") + + # Per-agent stats + for agent in data.get("agents", []): + lines.append(f"Agent {agent['agent_id']}:") + lines.append(f" Events: {agent['event_count']:,} ({agent['tool_use_count']:,} tool uses)") + lines.append(f" Tokens: {agent['input_tokens']:,} in / {agent['output_tokens']:,} out") + if agent.get("top_tools"): + tools_str = ", ".join(f"{t['tool']}:{t['count']}" for t in agent["top_tools"][:3]) + lines.append(f" Top tools: {tools_str}") + lines.append("") + + return lines + + @_register_formatter(lambda d: "samples" in d and "parsed_tools" in d) def _format_sample_sequences(data: dict) -> list[str]: lines = [ @@ -630,6 +668,16 @@ def cmd_mcp_usage(args): print(format_output(result, args.json)) +def cmd_agents(args): + """Show agent activity breakdown. + + RFC #41: Shows activity by Task subagent vs main session. 
+ """ + storage = SQLiteStorage() + result = query_agent_activity(storage, days=args.days, project=args.project) + print(format_output(result, args.json)) + + def cmd_insights(args): """Show insights for /improve-workflow.""" storage = SQLiteStorage() @@ -1081,6 +1129,12 @@ def main(): sub.add_argument("--project", help="Project path filter") sub.set_defaults(func=cmd_mcp_usage) + # agents (RFC #41) + sub = subparsers.add_parser("agents", help="Show Task subagent activity breakdown") + sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") + sub.add_argument("--project", help="Project path filter") + sub.set_defaults(func=cmd_agents) + args = parser.parse_args() args.func(args) diff --git a/src/session_analytics/guide.md b/src/session_analytics/guide.md index 5b97ecd..a9f8585 100644 --- a/src/session_analytics/guide.md +++ b/src/session_analytics/guide.md @@ -80,6 +80,12 @@ identify permission gaps. |------|---------| | `get_session_signals(days?, min_count?)` | Raw session metrics for LLM interpretation | +### Agent Activity + +| Tool | Purpose | +|------|---------| +| `get_agent_activity(days?, project?)` | Task subagent activity vs main session (RFC #41) | + ## Quick Start ### 1. Check status diff --git a/src/session_analytics/ingest.py b/src/session_analytics/ingest.py index 2f9fcfb..224e552 100644 --- a/src/session_analytics/ingest.py +++ b/src/session_analytics/ingest.py @@ -2,6 +2,7 @@ import json import logging +import re from datetime import datetime, timedelta from pathlib import Path @@ -60,6 +61,49 @@ def find_log_files( return [f for f, _ in files] +def extract_command_name(content: str | list) -> str | None: + """Extract command name from isMeta user message content. + + User-defined commands (e.g., /status-report) are expanded as user messages + with isMeta=true. The content starts with a markdown heading like "# Status Report". + + Returns: + Normalized command name (e.g., "status-report") or None if not detected. 
+ """ + # Get the text content + text = None + if isinstance(content, str): + text = content + elif isinstance(content, list): + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text = item.get("text", "") + break + elif isinstance(item, str): + text = item + break + + if not text: + return None + + # Look for markdown heading at the start: "# Command Name" + match = re.match(r"^#\s+(.+?)(?:\n|$)", text.strip()) + if not match: + return None + + # Normalize: "Status Report" -> "status-report", "I'm Lost" -> "i-m-lost" + # Use regex to replace non-alphanumeric chars with hyphens, then clean up + command_name = re.sub(r"[^a-z0-9]+", "-", match.group(1).strip().lower()) + command_name = command_name.strip("-") # Remove leading/trailing hyphens + + # Filter out common non-command headings + non_commands = {"context", "instructions", "usage", "example", "examples", "notes"} + if command_name in non_commands: + return None + + return command_name + + def parse_tool_use(tool_use: dict) -> dict: """Extract normalized fields from a tool_use block. 
@@ -155,59 +199,75 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: cache_creation_tokens = usage.get("cache_creation_input_tokens") model = message.get("model") + # RFC #41: Extract agent tracking fields + agent_id = raw.get("agentId") # Present only in agent-*.jsonl files + is_sidechain = raw.get("isSidechain", False) # True for agent/background work + version = raw.get("version") # Claude Code version + events = [] # Handle assistant entries with tool_use blocks + # RFC #41: Always create assistant event with tokens, then tool_use events without tokens if entry_type == "assistant": content = message.get("content", []) tool_uses = [c for c in content if isinstance(c, dict) and c.get("type") == "tool_use"] - if tool_uses: - # Create an event for each tool_use - for tool_use in tool_uses: - parsed = parse_tool_use(tool_use) - events.append( - Event( - id=None, - uuid=f"{uuid}:{parsed['tool_id']}", # Unique per tool_use - timestamp=timestamp, - session_id=session_id, - project_path=project_path, - entry_type="tool_use", - tool_name=parsed["tool_name"], - tool_input_json=parsed["tool_input_json"], - tool_id=parsed["tool_id"], - is_error=False, - command=parsed["command"], - command_args=parsed["command_args"], - file_path=parsed["file_path"], - skill_name=parsed["skill_name"], - input_tokens=input_tokens, - output_tokens=output_tokens, - cache_read_tokens=cache_read_tokens, - cache_creation_tokens=cache_creation_tokens, - model=model, - git_branch=git_branch, - cwd=cwd, - ) - ) - else: - # Assistant message without tools + # ALWAYS create assistant event with tokens (fixes token duplication) + events.append( + Event( + id=None, + uuid=uuid, + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="assistant", + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read_tokens, + cache_creation_tokens=cache_creation_tokens, + model=model, + git_branch=git_branch, + cwd=cwd, + # RFC #41: 
Agent tracking fields + parent_uuid=None, # Assistant events have no parent + agent_id=agent_id, + is_sidechain=is_sidechain, + version=version, + ) + ) + + # Create tool_use events WITHOUT tokens, linked via parent_uuid + for tool_use in tool_uses: + parsed = parse_tool_use(tool_use) events.append( Event( id=None, - uuid=uuid, + uuid=f"{uuid}:{parsed['tool_id']}", # Unique per tool_use timestamp=timestamp, session_id=session_id, project_path=project_path, - entry_type="assistant", - input_tokens=input_tokens, - output_tokens=output_tokens, - cache_read_tokens=cache_read_tokens, - cache_creation_tokens=cache_creation_tokens, + entry_type="tool_use", + tool_name=parsed["tool_name"], + tool_input_json=parsed["tool_input_json"], + tool_id=parsed["tool_id"], + is_error=False, + command=parsed["command"], + command_args=parsed["command_args"], + file_path=parsed["file_path"], + skill_name=parsed["skill_name"], + # RFC #41: NO tokens on tool_use - they're on the parent assistant + input_tokens=None, + output_tokens=None, + cache_read_tokens=None, + cache_creation_tokens=None, model=model, git_branch=git_branch, cwd=cwd, + # RFC #41: Link to parent assistant event + parent_uuid=uuid, + agent_id=agent_id, + is_sidechain=is_sidechain, + version=version, ) ) @@ -230,6 +290,11 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: if text_parts: user_message_text = " ".join(text_parts)[:USER_MESSAGE_MAX_LENGTH] + # Extract command name from isMeta user messages (slash command expansions) + # e.g., /status-report expands to a user message starting with "# Status Report" + is_meta = raw.get("isMeta", False) + command_name = extract_command_name(content) if is_meta else None + # Check if content is a list with tool_result blocks if isinstance(content, list): tool_results = [ @@ -251,6 +316,10 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: is_error=is_error, git_branch=git_branch, cwd=cwd, + # RFC #41: Agent tracking fields + agent_id=agent_id, + 
is_sidechain=is_sidechain, + version=version, ) ) else: @@ -262,10 +331,15 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: timestamp=timestamp, session_id=session_id, project_path=project_path, - entry_type="user", + entry_type="command" if command_name else "user", + skill_name=command_name, # Reuse skill_name for command tracking user_message_text=user_message_text, git_branch=git_branch, cwd=cwd, + # RFC #41: Agent tracking fields + agent_id=agent_id, + is_sidechain=is_sidechain, + version=version, ) ) else: @@ -277,10 +351,15 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: timestamp=timestamp, session_id=session_id, project_path=project_path, - entry_type="user", + entry_type="command" if command_name else "user", + skill_name=command_name, # Reuse skill_name for command tracking user_message_text=user_message_text, git_branch=git_branch, cwd=cwd, + # RFC #41: Agent tracking fields + agent_id=agent_id, + is_sidechain=is_sidechain, + version=version, ) ) @@ -294,6 +373,10 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]: session_id=session_id if session_id else "unknown", project_path=project_path, entry_type="summary", + # RFC #41: Agent tracking fields + agent_id=agent_id, + is_sidechain=is_sidechain, + version=version, ) ) diff --git a/src/session_analytics/queries.py b/src/session_analytics/queries.py index 258061a..57a041a 100644 --- a/src/session_analytics/queries.py +++ b/src/session_analytics/queries.py @@ -161,12 +161,27 @@ def query_tool_frequency( tools = [{"tool": row["tool_name"], "count": row["count"]} for row in rows] + # Get command count (slash commands from ~/.claude/commands) + # These are tracked separately as entry_type='command', not tool_name + cmd_where, cmd_params = build_where_clause( + cutoff=cutoff, + project=project, + extra_conditions=["entry_type = 'command'"], + ) + cmd_rows = storage.execute_query( + f"SELECT COUNT(*) as count FROM events WHERE {cmd_where}", + cmd_params, + ) + 
command_count = cmd_rows[0]["count"] if cmd_rows else 0 + # Add breakdowns if expand=True + command_breakdown = [] if expand: # Build breakdown queries with same filters skill_breakdown = _get_skill_breakdown(storage, cutoff, project) task_breakdown = _get_task_breakdown(storage, cutoff, project) bash_breakdown = _get_bash_breakdown(storage, cutoff, project) + command_breakdown = _get_command_breakdown(storage, cutoff, project) # Attach breakdowns to respective tools for tool in tools: @@ -177,6 +192,20 @@ def query_tool_frequency( elif tool["tool"] == "Bash" and bash_breakdown: tool["breakdown"] = bash_breakdown + # Insert Command entry in sorted position (by count) + if command_count > 0: + command_entry = {"tool": "Command", "count": command_count} + if command_breakdown: + command_entry["breakdown"] = command_breakdown + # Find insertion point to maintain sorted order + insert_idx = 0 + for i, t in enumerate(tools): + if t["count"] < command_count: + insert_idx = i + break + insert_idx = i + 1 + tools.insert(insert_idx, command_entry) + return { "days": days, "project": project, @@ -211,6 +240,32 @@ def _get_skill_breakdown( return [{"name": row["skill_name"], "count": row["count"]} for row in rows] +def _get_command_breakdown( + storage: SQLiteStorage, + cutoff: datetime, + project: str | None = None, +) -> list[dict]: + """Get Command usage breakdown by command name (slash commands from ~/.claude/commands).""" + where_clause, params = build_where_clause( + cutoff=cutoff, + project=project, + extra_conditions=["entry_type = 'command'", "skill_name IS NOT NULL"], + ) + + rows = storage.execute_query( + f""" + SELECT skill_name as command_name, COUNT(*) as count + FROM events + WHERE {where_clause} + GROUP BY skill_name + ORDER BY count DESC + """, + params, + ) + + return [{"name": row["command_name"], "count": row["count"]} for row in rows] + + def _get_task_breakdown( storage: SQLiteStorage, cutoff: datetime, @@ -1586,3 +1641,127 @@ def query_mcp_usage( 
"total_mcp_calls": total, "servers": result_servers, } + + +def query_agent_activity( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, +) -> dict: + """Query activity breakdown by Task subagent. + + RFC #41: Tracks agent activity from Task tool invocations, + distinguishing work done by agents vs main session. + + Args: + storage: Storage instance + days: Number of days to analyze + project: Optional project path filter + + Returns: + Dict with agent activity breakdown including: + - Main session stats (agent_id IS NULL) + - Per-agent stats (agent_id IS NOT NULL) + - Token usage, event counts, tool usage per agent + """ + cutoff = get_cutoff(days=days) + where_clause, params = build_where_clause( + cutoff=cutoff, + project=project, + ) + + # Query aggregated stats per agent_id (NULL = main session) + rows = storage.execute_query( + f""" + SELECT + agent_id, + COUNT(*) as event_count, + SUM(CASE WHEN entry_type = 'tool_use' THEN 1 ELSE 0 END) as tool_use_count, + SUM(COALESCE(input_tokens, 0)) as input_tokens, + SUM(COALESCE(output_tokens, 0)) as output_tokens, + SUM(COALESCE(cache_read_tokens, 0)) as cache_read_tokens, + SUM(CASE WHEN is_sidechain = 1 THEN 1 ELSE 0 END) as sidechain_events, + MIN(timestamp) as first_seen, + MAX(timestamp) as last_seen + FROM events + WHERE {where_clause} + GROUP BY agent_id + ORDER BY input_tokens DESC + """, + params, + ) + + agents = [] + main_session_stats = None + + for row in rows: + agent_data = { + "agent_id": row["agent_id"], + "event_count": row["event_count"], + "tool_use_count": row["tool_use_count"], + "input_tokens": row["input_tokens"], + "output_tokens": row["output_tokens"], + "cache_read_tokens": row["cache_read_tokens"], + "sidechain_events": row["sidechain_events"], + "first_seen": _format_timestamp(row["first_seen"]), + "last_seen": _format_timestamp(row["last_seen"]), + } + + if row["agent_id"] is None: + main_session_stats = agent_data + else: + agents.append(agent_data) + + # Get top 
tools per agent (for agents with activity) + agent_ids = [a["agent_id"] for a in agents] + if agent_ids: + placeholders = ",".join(["?"] * len(agent_ids)) + tool_rows = storage.execute_query( + f""" + SELECT + agent_id, + tool_name, + COUNT(*) as count + FROM events + WHERE {where_clause} + AND agent_id IN ({placeholders}) + AND tool_name IS NOT NULL + GROUP BY agent_id, tool_name + ORDER BY agent_id, count DESC + """, + params + agent_ids, + ) + + # Group top 5 tools per agent + agent_tools: dict[str, list] = {} + for row in tool_rows: + aid = row["agent_id"] + if aid not in agent_tools: + agent_tools[aid] = [] + if len(agent_tools[aid]) < 5: + agent_tools[aid].append({"tool": row["tool_name"], "count": row["count"]}) + + # Attach tools to agents + for agent in agents: + agent["top_tools"] = agent_tools.get(agent["agent_id"], []) + + # Calculate totals + total_agent_tokens = sum(a["input_tokens"] for a in agents) + total_main_tokens = main_session_stats["input_tokens"] if main_session_stats else 0 + + return { + "days": days, + "main_session": main_session_stats, + "agents": agents, + "summary": { + "agent_count": len(agents), + "total_agent_events": sum(a["event_count"] for a in agents), + "total_agent_tokens": total_agent_tokens, + "total_main_tokens": total_main_tokens, + "agent_token_percentage": ( + round(total_agent_tokens / (total_agent_tokens + total_main_tokens) * 100, 1) + if (total_agent_tokens + total_main_tokens) > 0 + else 0 + ), + }, + } diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py index 35b4d0c..cb5ebf1 100644 --- a/src/session_analytics/server.py +++ b/src/session_analytics/server.py @@ -695,6 +695,28 @@ def get_mcp_usage(days: int = 7, project: str | None = None) -> dict: return {"status": "ok", **result} +@mcp.tool() +def get_agent_activity(days: int = 7, project: str | None = None) -> dict: + """Get activity breakdown by Task subagent. 
+ + RFC #41: Tracks agent activity from Task tool invocations, + distinguishing work done by agents vs main session. + + Args: + days: Number of days to analyze (default: 7) + project: Optional project path filter + + Returns: + Dict with agent activity breakdown including: + - Main session stats (agent_id IS NULL) + - Per-agent stats with token usage and top tools + - Summary with agent vs main session token percentage + """ + queries.ensure_fresh_data(storage, days=days, project=project) + result = queries.query_agent_activity(storage, days=days, project=project) + return {"status": "ok", **result} + + def create_app(): """Create the ASGI app for uvicorn.""" # stateless_http=True allows resilience to server restarts diff --git a/src/session_analytics/storage.py b/src/session_analytics/storage.py index 90c86c1..247dc67 100644 --- a/src/session_analytics/storage.py +++ b/src/session_analytics/storage.py @@ -70,6 +70,12 @@ class Event: # we implement heuristic detection (e.g., stderr patterns, "Exit code: N" in output). exit_code: int | None = None # For failure detection (Bash commands) + # RFC #41: Agent tracking and token deduplication + parent_uuid: str | None = None # Links tool_use events to their assistant event + agent_id: str | None = None # Agent ID from agent-*.jsonl files (Task subagents) + is_sidechain: bool = False # True for agent/background work + version: str | None = None # Claude Code version from entry + @dataclass class Session: @@ -142,7 +148,7 @@ def __post_init__(self): DEFAULT_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" # Schema version for migrations -SCHEMA_VERSION = 4 +SCHEMA_VERSION = 5 # Migration functions: dict of version -> (migration_name, migration_func) # Each migration upgrades FROM version-1 TO version @@ -291,6 +297,44 @@ def migrate_v4(conn): ) +@migration(5, "add_agent_tracking") +def migrate_v5(conn): + """Add columns for RFC #41: Agent tracking and token deduplication. 
+ + Adds: + - parent_uuid: Links tool_use events to their parent assistant event + - agent_id: Agent ID from agent-*.jsonl files (Task subagents) + - is_sidechain: Boolean for agent/background work + - version: Claude Code version from entry + + This migration supports the new event hierarchy where: + - assistant events have tokens (not duplicated) + - tool_use events link to parent via parent_uuid (no tokens) + """ + # Check existing columns + existing_cols = {row[1] for row in conn.execute("PRAGMA table_info(events)")} + + # Add parent_uuid for event hierarchy + if "parent_uuid" not in existing_cols: + conn.execute("ALTER TABLE events ADD COLUMN parent_uuid TEXT") + + # Add agent_id for Task subagent tracking + if "agent_id" not in existing_cols: + conn.execute("ALTER TABLE events ADD COLUMN agent_id TEXT") + + # Add is_sidechain for agent/background work + if "is_sidechain" not in existing_cols: + conn.execute("ALTER TABLE events ADD COLUMN is_sidechain INTEGER DEFAULT 0") + + # Add version for Claude Code version tracking + if "version" not in existing_cols: + conn.execute("ALTER TABLE events ADD COLUMN version TEXT") + + # Add indexes for efficient querying + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_parent_uuid ON events(parent_uuid)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_agent_id ON events(agent_id)") + + class SQLiteStorage: """SQLite-backed storage for session analytics.""" @@ -433,11 +477,17 @@ def _init_db(self): user_message_text TEXT, exit_code INTEGER, + -- RFC #41: Agent tracking and token deduplication + parent_uuid TEXT, + agent_id TEXT, + is_sidechain INTEGER DEFAULT 0, + version TEXT, + UNIQUE(session_id, uuid) ) """) - # Indexes for common queries + # Indexes for common queries (columns that exist in initial schema) conn.execute("CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp)") conn.execute("CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id)") conn.execute("CREATE INDEX IF NOT 
EXISTS idx_events_tool ON events(tool_name)") @@ -571,10 +621,22 @@ def _init_db(self): ON events(id) WHERE user_message_text IS NOT NULL """) - # Run any pending migrations + # Run migrations AFTER all tables are created + # Only existing databases need migrations - fresh databases have full schema current_version = self._get_schema_version(conn) - if current_version < SCHEMA_VERSION: + if current_version > 0 and current_version < SCHEMA_VERSION: self._run_migrations(conn, current_version) + elif current_version == 0: + # Fresh database - just set the version, no migrations needed + conn.execute( + "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", + (SCHEMA_VERSION,), + ) + + # RFC #41: Create indexes for agent tracking columns + # These run AFTER migrations so columns exist on both fresh and migrated DBs + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_parent_uuid ON events(parent_uuid)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_agent_id ON events(agent_id)") # Event operations @@ -588,8 +650,9 @@ def add_event(self, event: Event) -> Event: tool_name, tool_input_json, tool_id, is_error, command, command_args, file_path, skill_name, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, - git_branch, cwd, user_message_text, exit_code - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + git_branch, cwd, user_message_text, exit_code, + parent_uuid, agent_id, is_sidechain, version + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( event.uuid, @@ -614,6 +677,10 @@ def add_event(self, event: Event) -> Event: event.cwd, event.user_message_text, event.exit_code, + event.parent_uuid, + event.agent_id, + 1 if event.is_sidechain else 0, + event.version, ), ) event.id = cursor.lastrowid @@ -629,8 +696,9 @@ def add_events_batch(self, events: list[Event]) -> int: tool_name, tool_input_json, tool_id, is_error, command, command_args, file_path, skill_name, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, - git_branch, cwd, user_message_text, exit_code - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + git_branch, cwd, user_message_text, exit_code, + parent_uuid, agent_id, is_sidechain, version + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ ( @@ -656,6 +724,10 @@ def add_events_batch(self, events: list[Event]) -> int: e.cwd, e.user_message_text, e.exit_code, + e.parent_uuid, + e.agent_id, + 1 if e.is_sidechain else 0, + e.version, ) for e in events ], @@ -748,6 +820,11 @@ def get_col(name: str, default=None): cwd=row["cwd"], user_message_text=get_col("user_message_text"), exit_code=get_col("exit_code"), + # RFC #41: Agent tracking + parent_uuid=get_col("parent_uuid"), + agent_id=get_col("agent_id"), + is_sidechain=bool(get_col("is_sidechain", 0)), + version=get_col("version"), ) # Session operations diff --git a/tests/test_ingest.py b/tests/test_ingest.py index e34513e..6d85364 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -7,6 +7,7 @@ import pytest from session_analytics.ingest import ( + extract_command_name, find_log_files, ingest_file, parse_entry, @@ -136,6 +137,41 @@ def test_parse_mcp_tool(self): assert result["tool_name"] == "mcp__event-bus__register_session" +class TestExtractCommandName: + """Tests for extracting command names from isMeta entries.""" + + def test_extract_from_markdown_heading(self): + """Test extracting command from markdown heading.""" + content 
= "# Status Report\n\nGenerate repo status summary..." + assert extract_command_name(content) == "status-report" + + def test_extract_multi_word_command(self): + """Test extracting multi-word commands with special chars normalized.""" + content = "# I'm Lost\n\nShow current workflow position..." + # Apostrophes and other non-alphanumeric chars are normalized to hyphens + assert extract_command_name(content) == "i-m-lost" + + def test_extract_from_list_content(self): + """Test extracting from list content with text blocks.""" + content = [{"type": "text", "text": "# PR Review\n\nReview code..."}] + assert extract_command_name(content) == "pr-review" + + def test_no_heading_returns_none(self): + """Test that content without heading returns None.""" + content = "Just a regular message" + assert extract_command_name(content) is None + + def test_non_command_heading_filtered(self): + """Test that common non-command headings are filtered.""" + content = "# Context\n\nSome context..." + assert extract_command_name(content) is None + + def test_empty_content_returns_none(self): + """Test that empty content returns None.""" + assert extract_command_name("") is None + assert extract_command_name([]) is None + + class TestParseEntry: """Tests for entry parsing.""" @@ -156,7 +192,11 @@ def test_parse_user_message(self): assert events[0].session_id == "session-1" def test_parse_assistant_with_tool(self): - """Test parsing an assistant message with tool_use.""" + """Test parsing an assistant message with tool_use. + + RFC #41: Now creates both an assistant event (with tokens) and tool_use events + (without tokens) to fix token duplication. 
+ """ entry = { "type": "assistant", "uuid": "assistant-1", @@ -176,11 +216,24 @@ def test_parse_assistant_with_tool(self): }, } events = parse_entry(entry, "test-project") - assert len(events) == 1 - assert events[0].entry_type == "tool_use" - assert events[0].tool_name == "Bash" - assert events[0].command == "ls" + + # RFC #41: Should create 2 events: 1 assistant + 1 tool_use + assert len(events) == 2 + + # First event is assistant with tokens + assert events[0].entry_type == "assistant" + assert events[0].uuid == "assistant-1" assert events[0].input_tokens == 100 + assert events[0].output_tokens == 50 + assert events[0].parent_uuid is None # Assistant has no parent + + # Second event is tool_use WITHOUT tokens, linked to parent + assert events[1].entry_type == "tool_use" + assert events[1].tool_name == "Bash" + assert events[1].command == "ls" + assert events[1].input_tokens is None # No tokens on tool_use + assert events[1].output_tokens is None + assert events[1].parent_uuid == "assistant-1" # Links to parent def test_parse_tool_result(self): """Test parsing a tool_result entry.""" @@ -222,6 +275,42 @@ def test_skip_malformed_entry(self): events = parse_entry(entry, "test-project") assert len(events) == 0 + def test_parse_ismeta_command(self): + """Test parsing an isMeta user message (slash command expansion).""" + entry = { + "type": "user", + "uuid": "cmd-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "isMeta": True, + "message": { + "role": "user", + "content": [{"type": "text", "text": "# Status Report\n\nGenerate status..."}], + }, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "command" + assert events[0].skill_name == "status-report" + + def test_parse_ismeta_without_command_heading(self): + """Test that isMeta without valid command heading stays as user.""" + entry = { + "type": "user", + "uuid": "msg-1", + "sessionId": "session-1", + "timestamp": 
"2025-01-01T12:00:00.000Z", + "isMeta": True, + "message": { + "role": "user", + "content": "Just some meta text without a heading", + }, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "user" + assert events[0].skill_name is None + def test_user_message_text_truncation_at_boundary(self): """Test that user_message_text is truncated at USER_MESSAGE_MAX_LENGTH (2000 chars).""" from session_analytics.ingest import USER_MESSAGE_MAX_LENGTH @@ -281,13 +370,17 @@ class TestIngestFile: """Tests for file ingestion.""" def test_ingest_file(self, storage, sample_logs_dir): - """Test ingesting a JSONL file.""" + """Test ingesting a JSONL file. + + RFC #41: Assistant with tool_use now creates 2 events (assistant + tool_use), + so 3 entries → 4 events (1 user + 2 from assistant + 1 tool_result). + """ project_dir = sample_logs_dir / "-test-project" jsonl_file = project_dir / "test-session.jsonl" result = ingest_file(jsonl_file, storage) assert result["entries_processed"] == 3 - assert result["events_added"] == 3 + assert result["events_added"] == 4 # RFC #41: assistant creates 2 events now assert result["skipped"] is False def test_incremental_ingestion(self, storage, sample_logs_dir): @@ -346,7 +439,10 @@ class TestIngestLogs: """Tests for full ingestion flow.""" def test_ingest_logs(self, storage, sample_logs_dir): - """Test full ingestion flow.""" + """Test full ingestion flow. + + RFC #41: Assistant with tool_use creates 2 events, so 3 entries → 4 events. 
+ """ # Use find_log_files with explicit logs_dir from session_analytics.ingest import ingest_file as do_ingest_file from session_analytics.ingest import update_session_stats @@ -356,7 +452,7 @@ def test_ingest_logs(self, storage, sample_logs_dir): # Ingest the file result = do_ingest_file(files[0], storage) - assert result["events_added"] == 3 + assert result["events_added"] == 4 # RFC #41: assistant creates 2 events # Update session stats sessions = update_session_stats(storage) @@ -886,3 +982,274 @@ def test_batch_correlation_error_logged(self, storage, caplog): assert result["correlation_errors"] == 1 assert result["commits_correlated"] == 0 assert "Failed to batch correlate" in caplog.text + + +class TestRFC41AgentTracking: + """Tests for RFC #41: Agent tracking and token deduplication. + + These tests verify: + - Assistant messages with tools create both assistant + tool_use events + - Tokens are only on assistant events (not duplicated to tool_use) + - Agent tracking fields (agentId, isSidechain, version) are captured + - parent_uuid links tool_use events to their parent assistant + """ + + def test_parse_assistant_creates_both_events(self): + """Assistant with tools creates assistant + tool_use events. + + RFC #41: Previously only tool_use events were created, leading to + token duplication when multiple tools were in one message. 
+ """ + entry = { + "type": "assistant", + "uuid": "multi-tool-assist", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "message": { + "model": "claude-opus-4-5", + "content": [ + { + "type": "tool_use", + "id": "tool-1", + "name": "Bash", + "input": {"command": "ls"}, + }, + { + "type": "tool_use", + "id": "tool-2", + "name": "Read", + "input": {"file_path": "/x.py"}, + }, + ], + "usage": {"input_tokens": 100, "output_tokens": 50}, + }, + } + events = parse_entry(entry, "test-project") + + # Should create 3 events: 1 assistant + 2 tool_use + assert len(events) == 3 + + # First event is assistant with tokens + assert events[0].entry_type == "assistant" + assert events[0].uuid == "multi-tool-assist" + assert events[0].input_tokens == 100 + assert events[0].output_tokens == 50 + assert events[0].parent_uuid is None + + # Tool events have NO tokens, linked via parent_uuid + assert events[1].entry_type == "tool_use" + assert events[1].tool_name == "Bash" + assert events[1].input_tokens is None + assert events[1].output_tokens is None + assert events[1].parent_uuid == "multi-tool-assist" + + assert events[2].entry_type == "tool_use" + assert events[2].tool_name == "Read" + assert events[2].input_tokens is None + assert events[2].parent_uuid == "multi-tool-assist" + + def test_parse_assistant_without_tools(self): + """Assistant without tools creates single assistant event with tokens.""" + entry = { + "type": "assistant", + "uuid": "text-only-assist", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "message": { + "model": "claude-opus-4-5", + "content": [{"type": "text", "text": "Hello, how can I help?"}], + "usage": {"input_tokens": 100, "output_tokens": 50}, + }, + } + events = parse_entry(entry, "test-project") + + # Should create only 1 event (assistant) + assert len(events) == 1 + assert events[0].entry_type == "assistant" + assert events[0].input_tokens == 100 + assert events[0].output_tokens == 50 + assert 
events[0].parent_uuid is None + + def test_parse_agent_entry(self): + """Test agentId extraction from agent file entries.""" + entry = { + "type": "assistant", + "uuid": "agent-assist-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "agentId": "a07519c", + "isSidechain": True, + "version": "2.0.76", + "message": { + "model": "claude-opus-4-5", + "content": [{"type": "text", "text": "Agent response"}], + "usage": {"input_tokens": 50, "output_tokens": 25}, + }, + } + events = parse_entry(entry, "test-project") + + assert len(events) == 1 + assert events[0].agent_id == "a07519c" + assert events[0].is_sidechain is True + assert events[0].version == "2.0.76" + + def test_parse_main_session_no_agent(self): + """Main session entries have no agentId.""" + entry = { + "type": "assistant", + "uuid": "main-assist-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "isSidechain": False, + "version": "2.0.76", + "message": { + "model": "claude-opus-4-5", + "content": [{"type": "text", "text": "Main response"}], + "usage": {"input_tokens": 50, "output_tokens": 25}, + }, + } + events = parse_entry(entry, "test-project") + + assert events[0].agent_id is None + assert events[0].is_sidechain is False + assert events[0].version == "2.0.76" + + def test_agent_fields_propagate_to_tool_uses(self): + """Agent tracking fields propagate from assistant to tool_use events.""" + entry = { + "type": "assistant", + "uuid": "agent-with-tools", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "agentId": "b123456", + "isSidechain": True, + "version": "2.0.80", + "message": { + "model": "claude-opus-4-5", + "content": [ + { + "type": "tool_use", + "id": "tool-1", + "name": "Edit", + "input": {"file_path": "/x.py"}, + }, + ], + "usage": {"input_tokens": 200, "output_tokens": 100}, + }, + } + events = parse_entry(entry, "test-project") + + assert len(events) == 2 + + # Assistant event has agent fields + assert 
events[0].agent_id == "b123456" + assert events[0].is_sidechain is True + assert events[0].version == "2.0.80" + + # Tool event inherits agent fields + assert events[1].agent_id == "b123456" + assert events[1].is_sidechain is True + assert events[1].version == "2.0.80" + assert events[1].parent_uuid == "agent-with-tools" + + def test_token_deduplication_on_ingest(self, storage, tmp_path): + """Verify tokens are not duplicated when ingesting multi-tool messages. + + RFC #41: Before this fix, a message with 3 tool_uses would count + tokens 3x (once per tool). Now tokens are only on the assistant event. + """ + import json + + from session_analytics.ingest import ingest_file + + # Create JSONL with assistant having 3 tool_uses + jsonl_content = json.dumps( + { + "type": "assistant", + "uuid": "dedup-1", + "sessionId": "dedup-session", + "timestamp": "2025-01-01T12:00:00.000Z", + "message": { + "model": "claude-opus-4-5", + "content": [ + { + "type": "tool_use", + "id": "t1", + "name": "Bash", + "input": {"command": "ls"}, + }, + { + "type": "tool_use", + "id": "t2", + "name": "Read", + "input": {"file_path": "/x"}, + }, + { + "type": "tool_use", + "id": "t3", + "name": "Edit", + "input": {"file_path": "/y"}, + }, + ], + "usage": {"input_tokens": 900, "output_tokens": 300}, + }, + } + ) + + project_dir = tmp_path / "-test-project" + project_dir.mkdir() + (project_dir / "test.jsonl").write_text(jsonl_content) + + ingest_file(project_dir / "test.jsonl", storage) + + # Query total tokens - should be 900, not 2700 (3x duplication) + rows = storage.execute_query("SELECT SUM(input_tokens) as total FROM events") + assert rows[0]["total"] == 900 + + # Query output tokens too + rows = storage.execute_query("SELECT SUM(output_tokens) as total FROM events") + assert rows[0]["total"] == 300 + + def test_user_entry_gets_agent_fields(self): + """User entries also capture agent tracking fields.""" + entry = { + "type": "user", + "uuid": "user-in-agent", + "sessionId": "session-1", + 
"timestamp": "2025-01-01T12:00:00.000Z", + "agentId": "c789012", + "isSidechain": True, + "version": "2.0.76", + "message": {"role": "user", "content": "User message in agent context"}, + } + events = parse_entry(entry, "test-project") + + assert len(events) == 1 + assert events[0].agent_id == "c789012" + assert events[0].is_sidechain is True + assert events[0].version == "2.0.76" + + def test_tool_result_gets_agent_fields(self): + """Tool result entries capture agent tracking fields.""" + entry = { + "type": "user", + "uuid": "result-in-agent", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "agentId": "d345678", + "isSidechain": True, + "version": "2.0.76", + "message": { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "tool-1", "content": "result"}, + ], + }, + } + events = parse_entry(entry, "test-project") + + assert len(events) == 1 + assert events[0].entry_type == "tool_result" + assert events[0].agent_id == "d345678" + assert events[0].is_sidechain is True + assert events[0].version == "2.0.76" diff --git a/tests/test_queries.py b/tests/test_queries.py index f0ed296..10ae3f0 100644 --- a/tests/test_queries.py +++ b/tests/test_queries.py @@ -5,6 +5,7 @@ from session_analytics.queries import ( ensure_fresh_data, get_cutoff, + query_agent_activity, query_commands, query_file_activity, query_languages, @@ -1522,3 +1523,278 @@ def test_comparison_after_normalization(self): normalized_naive = normalize_datetime(naive_dt) assert normalized_aware == normalized_naive + + +class TestQueryAgentActivity: + """Tests for query_agent_activity(). + + RFC #41: Tracks agent activity from Task tool invocations, + distinguishing work done by agents vs main session. 
+ """ + + def test_main_session_only(self, storage): + """Test with only main session events (no agent_id).""" + now = datetime.now() + storage.add_event( + Event( + id=None, + uuid="main-1", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="assistant", + input_tokens=100, + output_tokens=50, + agent_id=None, # Main session + ) + ) + storage.add_event( + Event( + id=None, + uuid="main-2", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="tool_use", + tool_name="Read", + input_tokens=None, # tool_use has no tokens + agent_id=None, + ) + ) + + result = query_agent_activity(storage, days=1) + + assert result["days"] == 1 + assert result["main_session"] is not None + assert result["main_session"]["event_count"] == 2 + assert result["main_session"]["input_tokens"] == 100 + assert result["agents"] == [] + assert result["summary"]["agent_count"] == 0 + assert result["summary"]["agent_token_percentage"] == 0 + + def test_agent_and_main_session(self, storage): + """Test with both main session and agent events.""" + now = datetime.now() + + # Main session events + storage.add_event( + Event( + id=None, + uuid="main-1", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="assistant", + input_tokens=200, + output_tokens=100, + agent_id=None, + ) + ) + + # Agent events + storage.add_event( + Event( + id=None, + uuid="agent-1", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="assistant", + input_tokens=300, + output_tokens=150, + agent_id="a123456", + is_sidechain=True, + ) + ) + storage.add_event( + Event( + id=None, + uuid="agent-2", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="tool_use", + tool_name="Bash", + agent_id="a123456", + is_sidechain=True, + ) + ) + + result = query_agent_activity(storage, days=1) + + # Check main session + assert result["main_session"]["event_count"] == 1 + assert 
result["main_session"]["input_tokens"] == 200 + + # Check agent + assert len(result["agents"]) == 1 + agent = result["agents"][0] + assert agent["agent_id"] == "a123456" + assert agent["event_count"] == 2 + assert agent["tool_use_count"] == 1 + assert agent["input_tokens"] == 300 + assert agent["sidechain_events"] == 2 # Both agent events have is_sidechain=True + + # Check summary + assert result["summary"]["agent_count"] == 1 + assert result["summary"]["total_agent_tokens"] == 300 + assert result["summary"]["total_main_tokens"] == 200 + # 300 / (300 + 200) = 60% + assert result["summary"]["agent_token_percentage"] == 60.0 + + def test_multiple_agents(self, storage): + """Test with multiple agents.""" + now = datetime.now() + + # Agent A + storage.add_event( + Event( + id=None, + uuid="agent-a-1", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="assistant", + input_tokens=400, + agent_id="agent-a", + ) + ) + + # Agent B + storage.add_event( + Event( + id=None, + uuid="agent-b-1", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="assistant", + input_tokens=100, + agent_id="agent-b", + ) + ) + + result = query_agent_activity(storage, days=1) + + # Agents should be ordered by input_tokens DESC + assert len(result["agents"]) == 2 + assert result["agents"][0]["agent_id"] == "agent-a" + assert result["agents"][0]["input_tokens"] == 400 + assert result["agents"][1]["agent_id"] == "agent-b" + assert result["agents"][1]["input_tokens"] == 100 + + assert result["summary"]["agent_count"] == 2 + assert result["summary"]["total_agent_tokens"] == 500 + + def test_top_tools_per_agent(self, storage): + """Test that top tools are calculated per agent.""" + now = datetime.now() + + # Agent with multiple tool uses + storage.add_event( + Event( + id=None, + uuid="agent-assist", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="assistant", + input_tokens=100, + agent_id="agent-1", + ) + ) + 
for i, tool in enumerate(["Read", "Read", "Read", "Edit", "Bash"]): + storage.add_event( + Event( + id=None, + uuid=f"agent-tool-{i}", + timestamp=now, + session_id="s1", + project_path="test-project", + entry_type="tool_use", + tool_name=tool, + agent_id="agent-1", + ) + ) + + result = query_agent_activity(storage, days=1) + + assert len(result["agents"]) == 1 + agent = result["agents"][0] + assert "top_tools" in agent + assert len(agent["top_tools"]) == 3 # Read, Edit, Bash + + # Read should be first (count=3) + assert agent["top_tools"][0]["tool"] == "Read" + assert agent["top_tools"][0]["count"] == 3 + + def test_project_filter(self, storage): + """Test project filter works.""" + now = datetime.now() + + # Project A events + storage.add_event( + Event( + id=None, + uuid="project-a", + timestamp=now, + session_id="s1", + project_path="project-a", + entry_type="assistant", + input_tokens=100, + agent_id="agent-1", + ) + ) + + # Project B events + storage.add_event( + Event( + id=None, + uuid="project-b", + timestamp=now, + session_id="s2", + project_path="project-b", + entry_type="assistant", + input_tokens=200, + agent_id="agent-2", + ) + ) + + result = query_agent_activity(storage, days=1, project="project-a") + + # Should only see agent-1 from project-a + assert len(result["agents"]) == 1 + assert result["agents"][0]["agent_id"] == "agent-1" + + def test_empty_results(self, storage): + """Test with no matching events.""" + result = query_agent_activity(storage, days=1) + + assert result["main_session"] is None + assert result["agents"] == [] + assert result["summary"]["agent_count"] == 0 + assert result["summary"]["agent_token_percentage"] == 0 + + def test_zero_division_protection(self, storage): + """Test that percentage calculation handles zero tokens.""" + now = datetime.now() + + # Event with zero tokens + storage.add_event( + Event( + id=None, + uuid="zero-tokens", + timestamp=now, + session_id="s1", + project_path="test-project", + 
entry_type="tool_use", + tool_name="Read", + input_tokens=None, # No tokens + agent_id=None, + ) + ) + + result = query_agent_activity(storage, days=1) + + # Should not raise ZeroDivisionError + assert result["summary"]["agent_token_percentage"] == 0 diff --git a/tests/test_storage.py b/tests/test_storage.py index cdf8b64..f2cc54c 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -834,3 +834,151 @@ def test_session_context_switch_default(self, storage): ("session-default",), ) assert rows[0]["context_switch_count"] == 0 + + +class TestAgentTrackingFields: + """Tests for RFC #41: Agent tracking and token deduplication. + + These tests verify the new fields for tracking Task subagent activity: + - parent_uuid: Links tool_use events to their assistant event + - agent_id: Agent ID from agent-*.jsonl files + - is_sidechain: Boolean for agent/background work + - version: Claude Code version + """ + + def test_event_with_agent_fields(self, storage): + """Test storing and retrieving event with agent tracking fields.""" + event = Event( + id=None, + uuid="agent-event-1", + timestamp=datetime.now(), + session_id="session-1", + entry_type="assistant", + parent_uuid=None, + agent_id="a123456", + is_sidechain=True, + version="2.0.76", + input_tokens=100, + ) + storage.add_event(event) + + events = storage.get_events_in_range(session_id="session-1") + assert len(events) == 1 + assert events[0].agent_id == "a123456" + assert events[0].is_sidechain is True + assert events[0].version == "2.0.76" + + def test_tool_use_with_parent_uuid(self, storage): + """Test storing tool_use event linked to parent via parent_uuid.""" + # First add assistant event + assistant = Event( + id=None, + uuid="parent-assist-1", + timestamp=datetime.now(), + session_id="session-1", + entry_type="assistant", + input_tokens=100, + ) + storage.add_event(assistant) + + # Add tool_use event referencing parent + tool = Event( + id=None, + uuid="child-tool-1", + timestamp=datetime.now(), + 
session_id="session-1", + entry_type="tool_use", + parent_uuid="parent-assist-1", + tool_name="Bash", + input_tokens=None, # No tokens on tool_use (deduplication) + ) + storage.add_event(tool) + + events = storage.get_events_in_range(session_id="session-1") + tool_event = [e for e in events if e.entry_type == "tool_use"][0] + assert tool_event.parent_uuid == "parent-assist-1" + assert tool_event.input_tokens is None + + def test_event_with_null_agent_fields(self, storage): + """Test event with NULL agent fields (main session events).""" + event = Event( + id=None, + uuid="main-event-1", + timestamp=datetime.now(), + session_id="session-1", + entry_type="assistant", + # agent_id and parent_uuid are None by default + is_sidechain=False, # Main session + ) + storage.add_event(event) + + events = storage.get_events_in_range(session_id="session-1") + assert len(events) == 1 + assert events[0].agent_id is None + assert events[0].parent_uuid is None + assert events[0].is_sidechain is False + + def test_is_sidechain_default(self, storage): + """Test that is_sidechain defaults to False.""" + event = Event( + id=None, + uuid="default-sidechain", + timestamp=datetime.now(), + session_id="session-1", + entry_type="assistant", + ) + storage.add_event(event) + + events = storage.get_events_in_range(session_id="session-1") + assert events[0].is_sidechain is False + + def test_batch_add_with_agent_fields(self, storage): + """Test batch adding events with agent tracking fields.""" + events = [ + Event( + id=None, + uuid="batch-agent-1", + timestamp=datetime.now(), + session_id="session-1", + entry_type="assistant", + agent_id="a123", + is_sidechain=True, + version="2.0.76", + ), + Event( + id=None, + uuid="batch-tool-1", + timestamp=datetime.now(), + session_id="session-1", + entry_type="tool_use", + parent_uuid="batch-agent-1", + agent_id="a123", + is_sidechain=True, + tool_name="Read", + ), + ] + count = storage.add_events_batch(events) + assert count == 2 + + stored = 
storage.get_events_in_range(session_id="session-1") + assert len(stored) == 2 + + # Verify assistant event + assistant = [e for e in stored if e.entry_type == "assistant"][0] + assert assistant.agent_id == "a123" + + # Verify tool_use event links to parent + tool = [e for e in stored if e.entry_type == "tool_use"][0] + assert tool.parent_uuid == "batch-agent-1" + + def test_index_on_parent_uuid(self, storage): + """Verify that idx_events_parent_uuid index exists.""" + rows = storage.execute_query("PRAGMA index_list(events)") + indexes = {row[1] for row in rows} + assert "idx_events_parent_uuid" in indexes + + def test_index_on_agent_id(self, storage): + """Verify that idx_events_agent_id index exists.""" + rows = storage.execute_query("PRAGMA index_list(events)") + indexes = {row[1] for row in rows} + assert "idx_events_agent_id" in indexes