Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Do this:
{"error_count": 5, "error_rate": 0.25, "has_rework": True, "commit_count": 0}
```

## MCP Tools (27 total)
## MCP Tools (28 total)

### Status & Ingestion
| Tool | Purpose |
Expand Down Expand Up @@ -187,6 +187,11 @@ Do this:
| `get_projects` | Activity across all projects |
| `get_mcp_usage` | MCP server and tool usage breakdown |

### Agent Activity
| Tool | Purpose |
|------|---------|
| `get_agent_activity` | Task subagent activity vs main session (RFC #41) |

### Session Analysis
| Tool | Purpose |
|------|---------|
Expand Down Expand Up @@ -227,7 +232,7 @@ Do this:
> **Maintainer note**: This discovery flow is also documented in `src/session_analytics/guide.md`
> (exposed as MCP resource `session-analytics://guide`). Keep both in sync when updating API docs.

## CLI Commands (26 total)
## CLI Commands (27 total)

All commands support `--json` for machine-readable output:

Expand All @@ -254,6 +259,9 @@ session-analytics-cli languages # Language distribution
session-analytics-cli projects # Cross-project activity
session-analytics-cli mcp-usage # MCP server/tool usage

# Agent Activity
session-analytics-cli agents # Task subagent vs main session (RFC #41)

# Session Analysis
session-analytics-cli signals # Raw session metrics
session-analytics-cli classify # Categorize sessions
Expand Down
54 changes: 54 additions & 0 deletions src/session_analytics/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
find_related_sessions,
get_handoff_context,
get_user_journey,
query_agent_activity,
query_commands,
query_file_activity,
query_languages,
Expand Down Expand Up @@ -213,6 +214,43 @@ def _format_mcp_usage(data: dict) -> list[str]:
return lines


@_register_formatter(lambda d: "agents" in d and "main_session" in d)
def _format_agent_activity(data: dict) -> list[str]:
    """Render an agent-activity result as plain-text report lines.

    RFC #41: contrasts Task subagent activity with the main session.

    The report has three parts: a summary header (read from ``data["summary"]``
    with zero defaults), an optional "Main Session" section (only when
    ``data["main_session"]`` is truthy), and one section per entry in
    ``data["agents"]``.
    """
    totals = data.get("summary", {})
    agent_count = totals.get("agent_count", 0)
    agent_tokens = totals.get("total_agent_tokens", 0)
    agent_pct = totals.get("agent_token_percentage", 0)
    main_tokens = totals.get("total_main_tokens", 0)

    report = [
        "Agent activity breakdown (Task subagent vs main session)",
        "",
        f"Agents: {agent_count}",
        f"Agent tokens: {agent_tokens:,} ({agent_pct}%)",
        f"Main tokens: {main_tokens:,}",
        "",
    ]

    # Main session section is skipped entirely when the entry is missing/empty.
    main_stats = data.get("main_session")
    if main_stats:
        report += [
            "Main Session:",
            f" Events: {main_stats['event_count']:,}",
            f" Tokens: {main_stats['input_tokens']:,} in / {main_stats['output_tokens']:,} out",
            "",
        ]

    # One section per subagent; each section ends with a blank separator line.
    for entry in data.get("agents", []):
        report += [
            f"Agent {entry['agent_id']}:",
            f" Events: {entry['event_count']:,} ({entry['tool_use_count']:,} tool uses)",
            f" Tokens: {entry['input_tokens']:,} in / {entry['output_tokens']:,} out",
        ]
        top_tools = entry.get("top_tools")
        if top_tools:
            # Only the first three tools are shown, as "name:count" pairs.
            shown = [f"{t['tool']}:{t['count']}" for t in top_tools[:3]]
            report.append(f" Top tools: {', '.join(shown)}")
        report.append("")

    return report


@_register_formatter(lambda d: "samples" in d and "parsed_tools" in d)
def _format_sample_sequences(data: dict) -> list[str]:
lines = [
Expand Down Expand Up @@ -630,6 +668,16 @@ def cmd_mcp_usage(args):
print(format_output(result, args.json))


def cmd_agents(args):
    """Print the agent activity breakdown for the ``agents`` subcommand.

    RFC #41: Shows activity by Task subagent vs main session.

    Reads ``args.days`` and ``args.project`` as query filters and ``args.json``
    to select the output format.
    """
    db = SQLiteStorage()
    activity = query_agent_activity(db, days=args.days, project=args.project)
    rendered = format_output(activity, args.json)
    print(rendered)


def cmd_insights(args):
"""Show insights for /improve-workflow."""
storage = SQLiteStorage()
Expand Down Expand Up @@ -1081,6 +1129,12 @@ def main():
sub.add_argument("--project", help="Project path filter")
sub.set_defaults(func=cmd_mcp_usage)

# agents (RFC #41)
sub = subparsers.add_parser("agents", help="Show Task subagent activity breakdown")
sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)")
sub.add_argument("--project", help="Project path filter")
sub.set_defaults(func=cmd_agents)

args = parser.parse_args()
args.func(args)

Expand Down
6 changes: 6 additions & 0 deletions src/session_analytics/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ identify permission gaps.
|------|---------|
| `get_session_signals(days?, min_count?)` | Raw session metrics for LLM interpretation |

### Agent Activity

| Tool | Purpose |
|------|---------|
| `get_agent_activity(days?, project?)` | Task subagent activity vs main session (RFC #41) |

## Quick Start

### 1. Check status
Expand Down
161 changes: 122 additions & 39 deletions src/session_analytics/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import logging
import re
from datetime import datetime, timedelta
from pathlib import Path

Expand Down Expand Up @@ -60,6 +61,49 @@ def find_log_files(
return [f for f, _ in files]


def extract_command_name(content: str | list) -> str | None:
"""Extract command name from isMeta user message content.

User-defined commands (e.g., /status-report) are expanded as user messages
with isMeta=true. The content starts with a markdown heading like "# Status Report".

Returns:
Normalized command name (e.g., "status-report") or None if not detected.
"""
# Get the text content
text = None
if isinstance(content, str):
text = content
elif isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text = item.get("text", "")
break
elif isinstance(item, str):
text = item
break

if not text:
return None

# Look for markdown heading at the start: "# Command Name"
match = re.match(r"^#\s+(.+?)(?:\n|$)", text.strip())
if not match:
return None

# Normalize: "Status Report" -> "status-report", "I'm Lost" -> "im-lost"
# Use regex to replace non-alphanumeric chars with hyphens, then clean up
command_name = re.sub(r"[^a-z0-9]+", "-", match.group(1).strip().lower())
command_name = command_name.strip("-") # Remove leading/trailing hyphens

# Filter out common non-command headings
non_commands = {"context", "instructions", "usage", "example", "examples", "notes"}
if command_name in non_commands:
return None

return command_name


def parse_tool_use(tool_use: dict) -> dict:
"""Extract normalized fields from a tool_use block.

Expand Down Expand Up @@ -155,59 +199,75 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]:
cache_creation_tokens = usage.get("cache_creation_input_tokens")
model = message.get("model")

# RFC #41: Extract agent tracking fields
agent_id = raw.get("agentId") # Present only in agent-*.jsonl files
is_sidechain = raw.get("isSidechain", False) # True for agent/background work
version = raw.get("version") # Claude Code version

events = []

# Handle assistant entries with tool_use blocks
# RFC #41: Always create assistant event with tokens, then tool_use events without tokens
if entry_type == "assistant":
content = message.get("content", [])
tool_uses = [c for c in content if isinstance(c, dict) and c.get("type") == "tool_use"]

if tool_uses:
# Create an event for each tool_use
for tool_use in tool_uses:
parsed = parse_tool_use(tool_use)
events.append(
Event(
id=None,
uuid=f"{uuid}:{parsed['tool_id']}", # Unique per tool_use
timestamp=timestamp,
session_id=session_id,
project_path=project_path,
entry_type="tool_use",
tool_name=parsed["tool_name"],
tool_input_json=parsed["tool_input_json"],
tool_id=parsed["tool_id"],
is_error=False,
command=parsed["command"],
command_args=parsed["command_args"],
file_path=parsed["file_path"],
skill_name=parsed["skill_name"],
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_creation_tokens=cache_creation_tokens,
model=model,
git_branch=git_branch,
cwd=cwd,
)
)
else:
# Assistant message without tools
# ALWAYS create assistant event with tokens (fixes token duplication)
events.append(
Event(
id=None,
uuid=uuid,
timestamp=timestamp,
session_id=session_id,
project_path=project_path,
entry_type="assistant",
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_creation_tokens=cache_creation_tokens,
model=model,
git_branch=git_branch,
cwd=cwd,
# RFC #41: Agent tracking fields
parent_uuid=None, # Assistant events have no parent
agent_id=agent_id,
is_sidechain=is_sidechain,
version=version,
)
)

# Create tool_use events WITHOUT tokens, linked via parent_uuid
for tool_use in tool_uses:
parsed = parse_tool_use(tool_use)
events.append(
Event(
id=None,
uuid=uuid,
uuid=f"{uuid}:{parsed['tool_id']}", # Unique per tool_use
timestamp=timestamp,
session_id=session_id,
project_path=project_path,
entry_type="assistant",
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_creation_tokens=cache_creation_tokens,
entry_type="tool_use",
tool_name=parsed["tool_name"],
tool_input_json=parsed["tool_input_json"],
tool_id=parsed["tool_id"],
is_error=False,
command=parsed["command"],
command_args=parsed["command_args"],
file_path=parsed["file_path"],
skill_name=parsed["skill_name"],
# RFC #41: NO tokens on tool_use - they're on the parent assistant
input_tokens=None,
output_tokens=None,
cache_read_tokens=None,
cache_creation_tokens=None,
model=model,
git_branch=git_branch,
cwd=cwd,
# RFC #41: Link to parent assistant event
parent_uuid=uuid,
agent_id=agent_id,
is_sidechain=is_sidechain,
version=version,
)
)

Expand All @@ -230,6 +290,11 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]:
if text_parts:
user_message_text = " ".join(text_parts)[:USER_MESSAGE_MAX_LENGTH]

# Extract command name from isMeta user messages (slash command expansions)
# e.g., /status-report expands to a user message starting with "# Status Report"
is_meta = raw.get("isMeta", False)
command_name = extract_command_name(content) if is_meta else None

# Check if content is a list with tool_result blocks
if isinstance(content, list):
tool_results = [
Expand All @@ -251,6 +316,10 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]:
is_error=is_error,
git_branch=git_branch,
cwd=cwd,
# RFC #41: Agent tracking fields
agent_id=agent_id,
is_sidechain=is_sidechain,
version=version,
)
)
else:
Expand All @@ -262,10 +331,15 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]:
timestamp=timestamp,
session_id=session_id,
project_path=project_path,
entry_type="user",
entry_type="command" if command_name else "user",
skill_name=command_name, # Reuse skill_name for command tracking
user_message_text=user_message_text,
git_branch=git_branch,
cwd=cwd,
# RFC #41: Agent tracking fields
agent_id=agent_id,
is_sidechain=is_sidechain,
version=version,
)
)
else:
Expand All @@ -277,10 +351,15 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]:
timestamp=timestamp,
session_id=session_id,
project_path=project_path,
entry_type="user",
entry_type="command" if command_name else "user",
skill_name=command_name, # Reuse skill_name for command tracking
user_message_text=user_message_text,
git_branch=git_branch,
cwd=cwd,
# RFC #41: Agent tracking fields
agent_id=agent_id,
is_sidechain=is_sidechain,
version=version,
)
)

Expand All @@ -294,6 +373,10 @@ def parse_entry(raw: dict, project_path: str) -> list[Event]:
session_id=session_id if session_id else "unknown",
project_path=project_path,
entry_type="summary",
# RFC #41: Agent tracking fields
agent_id=agent_id,
is_sidechain=is_sidechain,
version=version,
)
)

Expand Down
Loading