diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..65a0653 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,18 @@ +{ + "permissions": { + "allow": [ + "Bash(chmod:*)", + "Bash(python3 -m venv:*)", + "Bash(.venv/bin/pip install:*)", + "Bash(brew list:*)", + "Bash(/opt/homebrew/bin/python3.12:*)", + "Bash(.venv/bin/ruff format:*)", + "Bash(.venv/bin/ruff check .)", + "Bash(.venv/bin/pytest tests/ -v)", + "Bash(./scripts/install-launchagent.sh:*)", + "Bash(claude mcp add:*)", + "Bash(curl:*)", + "Bash(cat:*)" + ] + } +} diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a191f7c --- /dev/null +++ b/Makefile @@ -0,0 +1,73 @@ +.PHONY: check fmt lint test clean install uninstall dev venv + +# Run all quality gates (format check, lint, tests) +check: fmt lint test + +# Check/fix formatting with ruff +fmt: + ruff format --check . + +# Run linter with ruff +lint: + ruff check . + +# Run tests +test: + pytest tests/ -v + +# Clean build artifacts +clean: + rm -rf build/ dist/ *.egg-info .pytest_cache .ruff_cache + find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true + +# Create virtual environment (requires Python 3.10+) +venv: + @if [ ! -d .venv ]; then \ + echo "Creating virtual environment..."; \ + PYTHON=$$(command -v python3.12 || command -v python3.11 || command -v python3.10 || echo "python3"); \ + $$PYTHON -m venv .venv && .venv/bin/pip install --upgrade pip; \ + fi + +# Install with dev dependencies (for development) +dev: venv + .venv/bin/pip install -e ".[dev]" + +# Full installation: venv + deps + LaunchAgent + CLI + MCP +install: venv + @echo "Installing dependencies..." + .venv/bin/pip install -e . + @echo "" + @echo "Installing LaunchAgent..." + ./scripts/install-launchagent.sh + @echo "" + @echo "Adding to Claude Code..." + @CLAUDE_CMD=$$(command -v claude || echo "$$HOME/.local/bin/claude"); \ + if [ -x "$$CLAUDE_CMD" ]; then \ + $$CLAUDE_CMD mcp add --transport http --scope user session-analytics http://localhost:8081/mcp 2>/dev/null && \ + echo "Added session-analytics to Claude Code" || \ + echo "session-analytics already configured in Claude Code"; \ + else \ + echo "Note: claude not found. Run manually:"; \ + echo " claude mcp add --transport http --scope user session-analytics http://localhost:8081/mcp"; \ + fi + @echo "" + @echo "Installation complete!" + @echo "" + @echo "Make sure ~/.local/bin is in your PATH:" + @echo ' export PATH="$$HOME/.local/bin:$$PATH"' + +# Uninstall: LaunchAgent + CLI + MCP config +uninstall: + @echo "Uninstalling..." + ./scripts/uninstall-launchagent.sh + @echo "" + @echo "Removing from Claude Code..." + @CLAUDE_CMD=$$(command -v claude || echo "$$HOME/.local/bin/claude"); \ + if [ -x "$$CLAUDE_CMD" ]; then \ + $$CLAUDE_CMD mcp remove --scope user session-analytics 2>/dev/null && \ + echo "Removed session-analytics from Claude Code" || \ + echo "session-analytics not found in Claude Code"; \ + fi + @echo "" + @echo "Uninstall complete!" + @echo "Note: venv and source code remain in place." diff --git a/README.md b/README.md new file mode 100644 index 0000000..b724c57 --- /dev/null +++ b/README.md @@ -0,0 +1,59 @@ +# Claude Session Analytics + +MCP server for queryable analytics on Claude Code session logs. + +## Overview + +Replaces `parse-session-logs.sh` with a persistent, queryable analytics layer. Parses JSONL session logs from `~/.claude/projects/` and provides: + +- **User-centric timeline**: Events across conversations, organized by timestamp +- **Rich querying**: Tool frequency, command breakdown, sequences, permission gaps +- **Persistent storage**: SQLite at `~/.claude/contrib/analytics/data.db` +- **Auto-refresh**: Queries automatically refresh stale data (>5 min old) +- **CLI access**: Full CLI for shell scripts and hooks + +## Installation + +```bash +make install +``` + +This will: +1. Create a virtual environment +2. Install dependencies +3. Set up a LaunchAgent for auto-start +4. Add the MCP server to Claude Code + +## Development + +```bash +make dev # Install dev dependencies +./scripts/dev.sh # Run in dev mode with auto-reload +``` + +## Commands + +```bash +make check # Run fmt, lint, test +make install # Install LaunchAgent + CLI +make uninstall # Remove LaunchAgent + CLI +``` + +## MCP Tools + +| Tool | Purpose | +|------|---------| +| `ingest_logs` | Refresh data from JSONL files | +| `query_timeline` | Events in time window | +| `query_tool_frequency` | Tool usage counts | +| `query_commands` | Bash command breakdown | +| `query_sequences` | Common tool patterns | +| `query_permission_gaps` | Commands needing settings.json | +| `query_sessions` | Session metadata | +| `query_tokens` | Token usage analysis | +| `get_insights` | Pre-computed patterns for /improve-workflow | +| `get_status` | Ingestion status + DB stats | + +## License + +MIT diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3194b72 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,54 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "claude-session-analytics" +version = "0.1.0" +description = "MCP server for queryable analytics on Claude Code session logs" +readme = "README.md" +requires-python = ">=3.10" +license = "MIT" +authors = [ + { name = "Evan Senter" } +] +keywords = ["mcp", "claude", "analytics", "session-logs"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [ + "fastmcp>=0.1.0", + "uvicorn>=0.30.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "ruff>=0.8.0", +] + +[project.scripts] +session-analytics = "session_analytics.server:main" + +[tool.hatch.build.targets.wheel] +packages = ["src/session_analytics"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +asyncio_mode = "auto" + +[tool.ruff] +target-version = "py310" +line-length = 100 +src = ["src", "tests"] + +[tool.ruff.lint] +select = ["E", "F", "I", "N", "W", "UP"] +ignore = ["E501"] # Line length handled by formatter diff --git a/scripts/com.evansenter.claude-session-analytics.plist b/scripts/com.evansenter.claude-session-analytics.plist new file mode 100644 index 0000000..d8421b0 --- /dev/null +++ b/scripts/com.evansenter.claude-session-analytics.plist @@ -0,0 +1,41 @@ + + + + + Label + com.evansenter.claude-session-analytics + + ProgramArguments + + __VENV_PYTHON__ + -m + session_analytics.server + + + WorkingDirectory + __PROJECT_DIR__ + + EnvironmentVariables + + PATH + /opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin + PYTHONPATH + __PROJECT_DIR__/src + + + RunAtLoad + + + KeepAlive + + + StandardOutPath + __HOME__/.claude/session-analytics.log + + StandardErrorPath + __HOME__/.claude/session-analytics.err + + ProcessType + Background + + diff --git a/scripts/dev.sh b/scripts/dev.sh new file mode 100755 index 0000000..c86e1f3 --- /dev/null +++ b/scripts/dev.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# Run session analytics in development mode (foreground, auto-reload, verbose logging) + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" +LABEL="com.evansenter.claude-session-analytics" +PLIST="$HOME/Library/LaunchAgents/$LABEL.plist" + +cd "$PROJECT_DIR" +source .venv/bin/activate + +# Stop LaunchAgent if running (to free port 8081) +LAUNCHAGENT_WAS_RUNNING=false +if launchctl list 2>/dev/null | grep -q "$LABEL"; then + echo "Stopping LaunchAgent for dev mode..." + launchctl unload "$PLIST" 2>/dev/null + LAUNCHAGENT_WAS_RUNNING=true + osascript -e 'display notification "Stopped for dev mode" with title "Session Analytics"' 2>/dev/null +fi + +# Restart LaunchAgent on exit +cleanup() { + if [[ "$LAUNCHAGENT_WAS_RUNNING" == "true" && -f "$PLIST" ]]; then + echo "" + echo "Restarting LaunchAgent..." + launchctl load "$PLIST" + osascript -e 'display notification "LaunchAgent restarted" with title "Session Analytics"' 2>/dev/null + fi +} +trap cleanup EXIT + +echo "Starting session analytics in dev mode (Ctrl+C to stop)..." +echo "Add to Claude Code: claude mcp add --transport http --scope user session-analytics http://127.0.0.1:8081/mcp" +echo "" + +# DEV_MODE enables verbose logging +DEV_MODE=1 uvicorn session_analytics.server:create_app --host 127.0.0.1 --port 8081 --reload --factory diff --git a/scripts/install-launchagent.sh b/scripts/install-launchagent.sh new file mode 100755 index 0000000..40c9398 --- /dev/null +++ b/scripts/install-launchagent.sh @@ -0,0 +1,55 @@ +#!/bin/bash +# Install the session analytics server as a macOS LaunchAgent (auto-starts on login) + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" +VENV_PYTHON="$PROJECT_DIR/.venv/bin/python" +PLIST_TEMPLATE="$SCRIPT_DIR/com.evansenter.claude-session-analytics.plist" +PLIST_DEST="$HOME/Library/LaunchAgents/com.evansenter.claude-session-analytics.plist" +LABEL="com.evansenter.claude-session-analytics" + +# Check venv exists +if [[ ! -f "$VENV_PYTHON" ]]; then + echo "Error: Virtual environment not found at $PROJECT_DIR/.venv" + echo "Run: python3 -m venv .venv && source .venv/bin/activate && pip install -e ." + exit 1 +fi + +# Create LaunchAgents directory if needed +mkdir -p "$HOME/Library/LaunchAgents" +mkdir -p "$HOME/.claude" + +# Stop existing service if running +if launchctl list | grep -q "$LABEL"; then + echo "Stopping existing service..." + launchctl unload "$PLIST_DEST" 2>/dev/null || true +fi + +# Generate plist with correct paths +echo "Installing LaunchAgent..." +sed -e "s|__VENV_PYTHON__|$VENV_PYTHON|g" \ + -e "s|__PROJECT_DIR__|$PROJECT_DIR|g" \ + -e "s|__HOME__|$HOME|g" \ + "$PLIST_TEMPLATE" > "$PLIST_DEST" + +# Load the service +echo "Starting service..." +launchctl load "$PLIST_DEST" + +# Verify it's running +sleep 1 +if launchctl list | grep -q "$LABEL"; then + echo "" + echo "Session analytics installed and running!" + echo " Logs: ~/.claude/session-analytics.log" + echo " Errors: ~/.claude/session-analytics.err" + echo "" + echo "To uninstall: $SCRIPT_DIR/uninstall-launchagent.sh" + osascript -e 'display notification "LaunchAgent installed and running" with title "Session Analytics"' 2>/dev/null +else + echo "Error: Service failed to start. Check ~/.claude/session-analytics.err" + osascript -e 'display notification "Failed to start - check logs" with title "Session Analytics" sound name "Basso"' 2>/dev/null + exit 1 +fi diff --git a/scripts/uninstall-launchagent.sh b/scripts/uninstall-launchagent.sh new file mode 100755 index 0000000..9e556ed --- /dev/null +++ b/scripts/uninstall-launchagent.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Uninstall the session analytics LaunchAgent + +set -e + +PLIST_DEST="$HOME/Library/LaunchAgents/com.evansenter.claude-session-analytics.plist" +LABEL="com.evansenter.claude-session-analytics" + +if [[ ! -f "$PLIST_DEST" ]]; then + echo "LaunchAgent not installed." + exit 0 +fi + +echo "Stopping service..." +launchctl unload "$PLIST_DEST" 2>/dev/null || true + +echo "Removing plist..." +rm -f "$PLIST_DEST" + +echo "Session analytics LaunchAgent uninstalled." + +echo "" +echo "Note: Logs remain at ~/.claude/session-analytics.log" +osascript -e 'display notification "LaunchAgent uninstalled" with title "Session Analytics"' 2>/dev/null diff --git a/src/session_analytics/__init__.py b/src/session_analytics/__init__.py index e69de29..345cbea 100644 --- a/src/session_analytics/__init__.py +++ b/src/session_analytics/__init__.py @@ -0,0 +1,3 @@ +"""Claude Session Analytics - MCP server for queryable session log analytics.""" + +__version__ = "0.1.0" diff --git a/src/session_analytics/guide.md b/src/session_analytics/guide.md new file mode 100644 index 0000000..1fe271b --- /dev/null +++ b/src/session_analytics/guide.md @@ -0,0 +1,68 @@ +# Session Analytics Usage Guide + +This MCP server provides queryable analytics on Claude Code session logs. + +## Quick Start + +The server auto-refreshes data when queries detect stale data (>5 min old). +You can also manually trigger ingestion: + +``` +ingest_logs(days=7) # Process last 7 days of logs +``` + +## Available Tools + +### Ingestion + +| Tool | Purpose | +|------|---------| +| `ingest_logs` | Refresh data from JSONL files | +| `get_status` | Ingestion status + DB stats | + +### Queries + +| Tool | Purpose | +|------|---------| +| `query_timeline` | Events in time window | +| `query_tool_frequency` | Tool usage counts | +| `query_commands` | Bash command breakdown | +| `query_sequences` | Common tool patterns | +| `query_permission_gaps` | Commands needing settings.json | +| `query_sessions` | Session metadata | +| `query_tokens` | Token usage analysis | +| `get_insights` | Pre-computed patterns | + +## Common Patterns + +### Understanding tool usage + +``` +query_tool_frequency(days=30) +``` + +### Finding permission gaps + +``` +query_permission_gaps(threshold=5) # Commands used 5+ times that need permission +``` + +### Analyzing workflows + +``` +query_sequences(min_count=3, length=3) # Common 3-tool sequences +``` + +## Integration with /improve-workflow + +The `get_insights` tool returns pre-computed patterns specifically for +the `/improve-workflow` command: + +``` +get_insights(refresh=True) # Force fresh analysis +``` + +## Data Location + +- Database: `~/.claude/contrib/analytics/data.db` +- Logs parsed from: `~/.claude/projects/**/*.jsonl` diff --git a/src/session_analytics/ingest.py b/src/session_analytics/ingest.py new file mode 100644 index 0000000..6ab5ad8 --- /dev/null +++ b/src/session_analytics/ingest.py @@ -0,0 +1,452 @@ +"""JSONL log ingestion for Claude Code session analytics.""" + +import json +import logging +from datetime import datetime, timedelta +from pathlib import Path + +from session_analytics.storage import Event, IngestionState, Session, SQLiteStorage + +logger = logging.getLogger("session-analytics") + +# Default location for Claude Code session logs +DEFAULT_LOGS_DIR = Path.home() / ".claude" / "projects" + + +def find_log_files( + logs_dir: Path = DEFAULT_LOGS_DIR, + days: int = 7, + project_filter: str | None = None, +) -> list[Path]: + """Find JSONL log files within the specified time range. + + Args: + logs_dir: Directory containing project subdirectories + days: Only include files modified within this many days + project_filter: Optional project path to filter (encoded form) + + Returns: + List of JSONL file paths, sorted by modification time (newest first) + """ + if not logs_dir.exists(): + logger.warning(f"Logs directory does not exist: {logs_dir}") + return [] + + cutoff = datetime.now() - timedelta(days=days) + files = [] + + for project_dir in logs_dir.iterdir(): + if not project_dir.is_dir(): + continue + + # Apply project filter if specified + if project_filter and project_filter not in project_dir.name: + continue + + for jsonl_file in project_dir.glob("*.jsonl"): + try: + mtime = datetime.fromtimestamp(jsonl_file.stat().st_mtime) + if mtime >= cutoff: + files.append((jsonl_file, mtime)) + except OSError as e: + logger.warning(f"Could not stat {jsonl_file}: {e}") + + # Sort by modification time, newest first + files.sort(key=lambda x: x[1], reverse=True) + return [f for f, _ in files] + + +def parse_tool_use(tool_use: dict) -> dict: + """Extract normalized fields from a tool_use block. + + Returns dict with: tool_name, tool_id, tool_input_json, command, command_args, + file_path, skill_name + """ + result = { + "tool_name": tool_use.get("name"), + "tool_id": tool_use.get("id"), + "tool_input_json": json.dumps(tool_use.get("input", {})), + "command": None, + "command_args": None, + "file_path": None, + "skill_name": None, + } + + tool_input = tool_use.get("input", {}) + tool_name = result["tool_name"] + + # Extract Bash command info + if tool_name == "Bash": + cmd = tool_input.get("command", "") + if cmd: + parts = cmd.split(None, 1) + result["command"] = parts[0] if parts else None + result["command_args"] = parts[1] if len(parts) > 1 else None + + # Extract file path for file operations + elif tool_name in ("Read", "Edit", "Write", "Glob", "Grep"): + result["file_path"] = tool_input.get("file_path") or tool_input.get("path") + + # Extract skill name + elif tool_name == "Skill": + result["skill_name"] = tool_input.get("skill") + + # Handle MCP tools (e.g., mcp__event-bus__register_session) + elif tool_name and tool_name.startswith("mcp__"): + # Keep the full name for MCP tools + pass + + return result + + +def parse_entry(raw: dict, project_path: str) -> list[Event]: + """Parse a single JSONL entry into Event objects. + + An entry may produce multiple events (e.g., assistant with multiple tool_use blocks). + + Args: + raw: Parsed JSON object from JSONL + project_path: Encoded project path from directory name + + Returns: + List of Event objects (may be empty for skipped entries) + """ + entry_type = raw.get("type") + + # Skip certain entry types that don't contain useful analytics data + if entry_type in ("file-history-snapshot", "queue-operation", "create"): + return [] + + # Skip thinking/text blocks that are nested content + if entry_type in ("thinking", "text", "tool_use", "tool_result", "message"): + return [] + + uuid = raw.get("uuid") + session_id = raw.get("sessionId") + timestamp_str = raw.get("timestamp") + + # Skip entries without required fields + if not uuid or not session_id or not timestamp_str: + return [] + + try: + timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + # Convert to naive datetime (remove timezone for SQLite compatibility) + timestamp = timestamp.replace(tzinfo=None) + except (ValueError, AttributeError): + logger.debug(f"Could not parse timestamp: {timestamp_str}") + return [] + + # Extract common fields + cwd = raw.get("cwd") + git_branch = raw.get("gitBranch") + + # Extract token usage from assistant messages + message = raw.get("message", {}) + usage = message.get("usage", {}) + input_tokens = usage.get("input_tokens") + output_tokens = usage.get("output_tokens") + cache_read_tokens = usage.get("cache_read_input_tokens") + cache_creation_tokens = usage.get("cache_creation_input_tokens") + model = message.get("model") + + events = [] + + # Handle assistant entries with tool_use blocks + if entry_type == "assistant": + content = message.get("content", []) + tool_uses = [c for c in content if isinstance(c, dict) and c.get("type") == "tool_use"] + + if tool_uses: + # Create an event for each tool_use + for tool_use in tool_uses: + parsed = parse_tool_use(tool_use) + events.append( + Event( + id=None, + uuid=f"{uuid}:{parsed['tool_id']}", # Unique per tool_use + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="tool_use", + tool_name=parsed["tool_name"], + tool_input_json=parsed["tool_input_json"], + tool_id=parsed["tool_id"], + is_error=False, + command=parsed["command"], + command_args=parsed["command_args"], + file_path=parsed["file_path"], + skill_name=parsed["skill_name"], + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read_tokens, + cache_creation_tokens=cache_creation_tokens, + model=model, + git_branch=git_branch, + cwd=cwd, + ) + ) + else: + # Assistant message without tools + events.append( + Event( + id=None, + uuid=uuid, + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="assistant", + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read_tokens, + cache_creation_tokens=cache_creation_tokens, + model=model, + git_branch=git_branch, + cwd=cwd, + ) + ) + + # Handle user entries (may contain tool_result) + elif entry_type == "user": + content = message.get("content", "") + + # Check if content is a list with tool_result blocks + if isinstance(content, list): + tool_results = [ + c for c in content if isinstance(c, dict) and c.get("type") == "tool_result" + ] + if tool_results: + for tr in tool_results: + # Check for error + is_error = tr.get("is_error", False) + events.append( + Event( + id=None, + uuid=f"{uuid}:{tr.get('tool_use_id', 'result')}", + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="tool_result", + tool_id=tr.get("tool_use_id"), + is_error=is_error, + git_branch=git_branch, + cwd=cwd, + ) + ) + else: + # User message with other content types + events.append( + Event( + id=None, + uuid=uuid, + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="user", + git_branch=git_branch, + cwd=cwd, + ) + ) + else: + # Plain text user message + events.append( + Event( + id=None, + uuid=uuid, + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="user", + git_branch=git_branch, + cwd=cwd, + ) + ) + + # Handle summary entries + elif entry_type == "summary": + events.append( + Event( + id=None, + uuid=uuid if uuid else f"summary:{raw.get('leafUuid', 'unknown')}", + timestamp=timestamp if timestamp else datetime.now(), + session_id=session_id if session_id else "unknown", + project_path=project_path, + entry_type="summary", + ) + ) + + return events + + +def ingest_file( + file_path: Path, + storage: SQLiteStorage, + force: bool = False, +) -> dict: + """Ingest a single JSONL file. + + Uses incremental ingestion - only processes new entries if file has changed. + + Args: + file_path: Path to JSONL file + storage: Storage instance + force: Force re-ingestion even if file hasn't changed + + Returns: + Stats dict with entries_processed, events_added, skipped + """ + file_str = str(file_path) + stat = file_path.stat() + file_size = stat.st_size + file_mtime = datetime.fromtimestamp(stat.st_mtime) + + # Check if we've already processed this file + state = storage.get_ingestion_state(file_str) + if state and not force: + # Skip if file hasn't changed + if state.file_size == file_size and state.last_modified >= file_mtime: + return {"entries_processed": 0, "events_added": 0, "skipped": True} + + # Extract project path from directory name + project_path = file_path.parent.name + + # Parse and collect events + events = [] + entries_processed = 0 + errors = 0 + + with open(file_path, encoding="utf-8") as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + + try: + raw = json.loads(line) + parsed_events = parse_entry(raw, project_path) + events.extend(parsed_events) + entries_processed += 1 + except json.JSONDecodeError as e: + logger.debug(f"JSON parse error in {file_path}:{line_num}: {e}") + errors += 1 + except Exception as e: + logger.warning(f"Error processing {file_path}:{line_num}: {e}") + errors += 1 + + # Batch insert events + events_added = storage.add_events_batch(events) if events else 0 + + # Update ingestion state + storage.update_ingestion_state( + IngestionState( + file_path=file_str, + file_size=file_size, + last_modified=file_mtime, + entries_processed=entries_processed, + last_processed=datetime.now(), + ) + ) + + return { + "entries_processed": entries_processed, + "events_added": events_added, + "skipped": False, + "errors": errors, + } + + +def update_session_stats(storage: SQLiteStorage) -> int: + """Update session statistics from ingested events. + + Returns number of sessions updated. + """ + # Query distinct sessions from events + with storage._connect() as conn: + rows = conn.execute(""" + SELECT + session_id, + project_path, + MIN(timestamp) as first_seen, + MAX(timestamp) as last_seen, + COUNT(*) as entry_count, + SUM(CASE WHEN tool_name IS NOT NULL THEN 1 ELSE 0 END) as tool_use_count, + SUM(COALESCE(input_tokens, 0)) as total_input_tokens, + SUM(COALESCE(output_tokens, 0)) as total_output_tokens, + (SELECT git_branch FROM events e2 + WHERE e2.session_id = events.session_id + ORDER BY timestamp DESC LIMIT 1) as primary_branch + FROM events + GROUP BY session_id + """).fetchall() + + count = 0 + for row in rows: + storage.upsert_session( + Session( + id=row["session_id"], + project_path=row["project_path"], + first_seen=row["first_seen"], + last_seen=row["last_seen"], + entry_count=row["entry_count"], + tool_use_count=row["tool_use_count"], + total_input_tokens=row["total_input_tokens"], + total_output_tokens=row["total_output_tokens"], + primary_branch=row["primary_branch"], + ) + ) + count += 1 + + return count + + +def ingest_logs( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, + force: bool = False, +) -> dict: + """Ingest all JSONL log files. + + Args: + storage: Storage instance + days: Number of days to look back + project: Optional project filter + force: Force re-ingestion + + Returns: + Stats dict with totals + """ + files = find_log_files(days=days, project_filter=project) + + total_entries = 0 + total_events = 0 + files_processed = 0 + files_skipped = 0 + total_errors = 0 + + for file_path in files: + try: + result = ingest_file(file_path, storage, force=force) + if result["skipped"]: + files_skipped += 1 + else: + files_processed += 1 + total_entries += result["entries_processed"] + total_events += result["events_added"] + total_errors += result.get("errors", 0) + except Exception as e: + logger.error(f"Failed to ingest {file_path}: {e}") + total_errors += 1 + + # Update session statistics + sessions_updated = update_session_stats(storage) + + return { + "files_found": len(files), + "files_processed": files_processed, + "files_skipped": files_skipped, + "entries_processed": total_entries, + "events_added": total_events, + "sessions_updated": sessions_updated, + "errors": total_errors, + } diff --git a/src/session_analytics/patterns.py b/src/session_analytics/patterns.py new file mode 100644 index 0000000..bc0e5cd --- /dev/null +++ b/src/session_analytics/patterns.py @@ -0,0 +1,369 @@ +"""Pattern detection and insight generation for session analytics.""" + +import json +import logging +from collections import Counter +from datetime import datetime, timedelta +from pathlib import Path + +from session_analytics.storage import Pattern, SQLiteStorage + +logger = logging.getLogger("session-analytics") + +# Default settings.json location +DEFAULT_SETTINGS_PATH = Path.home() / ".claude" / "settings.json" + + +def compute_tool_frequency_patterns( + storage: SQLiteStorage, + days: int = 7, +) -> list[Pattern]: + """Compute tool frequency patterns from events. + + Args: + storage: Storage instance + days: Number of days to analyze + + Returns: + List of tool frequency patterns + """ + cutoff = datetime.now() - timedelta(days=days) + now = datetime.now() + + with storage._connect() as conn: + rows = conn.execute( + """ + SELECT tool_name, COUNT(*) as count, MAX(timestamp) as last_seen + FROM events + WHERE timestamp >= ? AND tool_name IS NOT NULL + GROUP BY tool_name + ORDER BY count DESC + """, + (cutoff,), + ).fetchall() + + patterns = [] + for row in rows: + patterns.append( + Pattern( + id=None, + pattern_type="tool_frequency", + pattern_key=row["tool_name"], + count=row["count"], + last_seen=row["last_seen"], + metadata={}, + computed_at=now, + ) + ) + + return patterns + + +def compute_command_patterns( + storage: SQLiteStorage, + days: int = 7, +) -> list[Pattern]: + """Compute Bash command patterns from events. + + Args: + storage: Storage instance + days: Number of days to analyze + + Returns: + List of command patterns + """ + cutoff = datetime.now() - timedelta(days=days) + now = datetime.now() + + with storage._connect() as conn: + rows = conn.execute( + """ + SELECT command, COUNT(*) as count, MAX(timestamp) as last_seen + FROM events + WHERE timestamp >= ? AND tool_name = 'Bash' AND command IS NOT NULL + GROUP BY command + ORDER BY count DESC + """, + (cutoff,), + ).fetchall() + + patterns = [] + for row in rows: + patterns.append( + Pattern( + id=None, + pattern_type="command_frequency", + pattern_key=row["command"], + count=row["count"], + last_seen=row["last_seen"], + metadata={}, + computed_at=now, + ) + ) + + return patterns + + +def compute_sequence_patterns( + storage: SQLiteStorage, + days: int = 7, + sequence_length: int = 2, + min_count: int = 3, +) -> list[Pattern]: + """Compute tool sequence patterns (n-grams) from events. + + Args: + storage: Storage instance + days: Number of days to analyze + sequence_length: Length of sequences to detect + min_count: Minimum occurrences to include + + Returns: + List of sequence patterns + """ + cutoff = datetime.now() - timedelta(days=days) + now = datetime.now() + + with storage._connect() as conn: + # Get all tool events ordered by session and timestamp + rows = conn.execute( + """ + SELECT session_id, tool_name, timestamp + FROM events + WHERE timestamp >= ? AND tool_name IS NOT NULL + ORDER BY session_id, timestamp + """, + (cutoff,), + ).fetchall() + + # Group by session and extract sequences + sequences: Counter = Counter() + current_session = None + session_tools: list[str] = [] + + for row in rows: + if row["session_id"] != current_session: + # Process previous session + if len(session_tools) >= sequence_length: + for i in range(len(session_tools) - sequence_length + 1): + seq = tuple(session_tools[i : i + sequence_length]) + sequences[seq] += 1 + + current_session = row["session_id"] + session_tools = [] + + session_tools.append(row["tool_name"]) + + # Process last session + if len(session_tools) >= sequence_length: + for i in range(len(session_tools) - sequence_length + 1): + seq = tuple(session_tools[i : i + sequence_length]) + sequences[seq] += 1 + + # Create patterns for sequences meeting min_count + patterns = [] + for seq, count in sequences.most_common(): + if count < min_count: + break + patterns.append( + Pattern( + id=None, + pattern_type="tool_sequence", + pattern_key=" → ".join(seq), + count=count, + last_seen=now, + metadata={"sequence": list(seq)}, + computed_at=now, + ) + ) + + return patterns + + +def load_allowed_commands(settings_path: Path = DEFAULT_SETTINGS_PATH) -> set[str]: + """Load allowed commands from Claude Code settings.json. + + Args: + settings_path: Path to settings.json + + Returns: + Set of allowed command prefixes + """ + if not settings_path.exists(): + return set() + + try: + with open(settings_path) as f: + settings = json.load(f) + + allowed = set() + permissions = settings.get("permissions", {}) + + # Look for allow patterns with Bash(command:*) + for pattern in permissions.get("allow", []): + if pattern.startswith("Bash(") and pattern.endswith(":*)"): + cmd = pattern[5:-3] # Extract command from "Bash(cmd:*)" + allowed.add(cmd) + + return allowed + except (json.JSONDecodeError, OSError) as e: + logger.warning(f"Could not load settings.json: {e}") + return set() + + +def compute_permission_gaps( + storage: SQLiteStorage, + days: int = 7, + threshold: int = 5, + settings_path: Path = DEFAULT_SETTINGS_PATH, +) -> list[Pattern]: + """Find commands that are frequently used but not in settings.json. + + Args: + storage: Storage instance + days: Number of days to analyze + threshold: Minimum usage count to suggest adding + settings_path: Path to settings.json + + Returns: + List of permission gap patterns + """ + cutoff = datetime.now() - timedelta(days=days) + now = datetime.now() + + allowed_commands = load_allowed_commands(settings_path) + + with storage._connect() as conn: + rows = conn.execute( + """ + SELECT command, COUNT(*) as count + FROM events + WHERE timestamp >= ? AND tool_name = 'Bash' AND command IS NOT NULL + GROUP BY command + HAVING COUNT(*) >= ? + ORDER BY count DESC + """, + (cutoff, threshold), + ).fetchall() + + patterns = [] + for row in rows: + cmd = row["command"] + if cmd not in allowed_commands: + patterns.append( + Pattern( + id=None, + pattern_type="permission_gap", + pattern_key=cmd, + count=row["count"], + last_seen=now, + metadata={"suggestion": f"Bash({cmd}:*)"}, + computed_at=now, + ) + ) + + return patterns + + +def compute_all_patterns( + storage: SQLiteStorage, + days: int = 7, +) -> dict: + """Compute all pattern types and store them. + + Args: + storage: Storage instance + days: Number of days to analyze + + Returns: + Stats about computed patterns + """ + # Clear existing patterns + storage.clear_patterns() + + # Compute tool frequency + tool_patterns = compute_tool_frequency_patterns(storage, days=days) + for p in tool_patterns: + storage.upsert_pattern(p) + + # Compute command frequency + command_patterns = compute_command_patterns(storage, days=days) + for p in command_patterns: + storage.upsert_pattern(p) + + # Compute sequences + sequence_patterns = compute_sequence_patterns(storage, days=days) + for p in sequence_patterns: + storage.upsert_pattern(p) + + # Compute permission gaps + gap_patterns = compute_permission_gaps(storage, days=days) + for p in gap_patterns: + storage.upsert_pattern(p) + + return { + "tool_frequency_patterns": len(tool_patterns), + "command_patterns": len(command_patterns), + "sequence_patterns": len(sequence_patterns), + "permission_gap_patterns": len(gap_patterns), + "total_patterns": len(tool_patterns) + + len(command_patterns) + + len(sequence_patterns) + + len(gap_patterns), + } + + +def get_insights( + storage: SQLiteStorage, + refresh: bool = False, + days: int = 7, +) -> dict: + """Get pre-computed insights for /improve-workflow. + + Args: + storage: Storage instance + refresh: Force recomputation of patterns + days: Number of days to analyze (only used if refresh=True) + + Returns: + Insights organized by type + """ + # Check if we need to refresh + patterns = storage.get_patterns() + if not patterns or refresh: + compute_all_patterns(storage, days=days) + patterns = storage.get_patterns() + + # Organize by type + insights = { + "tool_frequency": [], + "command_frequency": [], + "sequences": [], + "permission_gaps": [], + } + + for p in patterns: + if p.pattern_type == "tool_frequency": + insights["tool_frequency"].append({"tool": p.pattern_key, "count": p.count}) + elif p.pattern_type == "command_frequency": + insights["command_frequency"].append({"command": p.pattern_key, "count": p.count}) + elif p.pattern_type == "tool_sequence": + insights["sequences"].append({"sequence": p.pattern_key, "count": p.count}) + elif p.pattern_type == "permission_gap": + insights["permission_gaps"].append( + { + "command": p.pattern_key, + "count": p.count, + "suggestion": p.metadata.get("suggestion", ""), + } + ) + + # Add summary stats + insights["summary"] = { + "total_tools": len(insights["tool_frequency"]), + "total_commands": len(insights["command_frequency"]), + "total_sequences": len(insights["sequences"]), + "permission_gaps_found": len(insights["permission_gaps"]), + } + + return insights diff --git a/src/session_analytics/queries.py b/src/session_analytics/queries.py new file mode 100644 index 0000000..51e0e09 --- /dev/null +++ b/src/session_analytics/queries.py @@ -0,0 +1,431 @@ +"""Query implementations for session analytics.""" + +from datetime import datetime, timedelta + +from session_analytics.storage import SQLiteStorage + + +def ensure_fresh_data( + storage: SQLiteStorage, + max_age_minutes: int = 5, + days: int = 7, + project: str | None = None, + force: bool = False, +) -> bool: + """Check if data is stale and refresh if needed. + + Args: + storage: Storage instance + max_age_minutes: Maximum age of data before refresh + days: Number of days to look back when refreshing + project: Optional project filter for refresh + force: Force refresh regardless of age + + Returns: + True if data was refreshed, False if data was fresh + """ + if force: + from session_analytics.ingest import ingest_logs + + ingest_logs(storage, days=days, project=project) + return True + + last_ingest = storage.get_last_ingestion_time() + if last_ingest is None or (datetime.now() - last_ingest) > timedelta(minutes=max_age_minutes): + from session_analytics.ingest import ingest_logs + + ingest_logs(storage, days=days, project=project) + return True + + return False + + +def query_tool_frequency( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, +) -> dict: + """Get tool usage frequency counts. + + Args: + storage: Storage instance + days: Number of days to analyze + project: Optional project path filter + + Returns: + Dict with tool frequency breakdown + """ + cutoff = datetime.now() - timedelta(days=days) + + with storage._connect() as conn: + conditions = ["timestamp >= ?", "tool_name IS NOT NULL"] + params: list = [cutoff] + + if project: + conditions.append("project_path LIKE ?") + params.append(f"%{project}%") + + where_clause = " AND ".join(conditions) + + # Get tool frequency counts + rows = conn.execute( + f""" + SELECT tool_name, COUNT(*) as count + FROM events + WHERE {where_clause} + GROUP BY tool_name + ORDER BY count DESC + """, + params, + ).fetchall() + + tools = [{"tool": row["tool_name"], "count": row["count"]} for row in rows] + + # Get total tool calls + total = sum(t["count"] for t in tools) + + return { + "days": days, + "project": project, + "total_tool_calls": total, + "tools": tools, + } + + +def query_timeline( + storage: SQLiteStorage, + start: datetime | None = None, + end: datetime | None = None, + tool: str | None = None, + project: str | None = None, + limit: int = 100, +) -> dict: + """Get events in a time window. + + Args: + storage: Storage instance + start: Start of time window (default: 24 hours ago) + end: End of time window (default: now) + tool: Optional tool name filter + project: Optional project path filter + limit: Maximum events to return + + Returns: + Dict with timeline events + """ + if start is None: + start = datetime.now() - timedelta(hours=24) + if end is None: + end = datetime.now() + + events = storage.get_events_in_range( + start=start, + end=end, + tool_name=tool, + project_path=project, + limit=limit, + ) + + return { + "start": start.isoformat(), + "end": end.isoformat(), + "tool": tool, + "project": project, + "count": len(events), + "events": [ + { + "timestamp": e.timestamp.isoformat(), + "session_id": e.session_id, + "entry_type": e.entry_type, + "tool_name": e.tool_name, + "command": e.command, + "file_path": e.file_path, + "skill_name": e.skill_name, + "is_error": e.is_error, + } + for e in events + ], + } + + +def query_commands( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, + prefix: str | None = None, +) -> dict: + """Get Bash command breakdown. + + Args: + storage: Storage instance + days: Number of days to analyze + project: Optional project path filter + prefix: Optional command prefix filter (e.g., "git") + + Returns: + Dict with command breakdown + """ + cutoff = datetime.now() - timedelta(days=days) + + with storage._connect() as conn: + conditions = ["timestamp >= ?", "tool_name = 'Bash'", "command IS NOT NULL"] + params: list = [cutoff] + + if project: + conditions.append("project_path LIKE ?") + params.append(f"%{project}%") + + if prefix: + conditions.append("command LIKE ?") + params.append(f"{prefix}%") + + where_clause = " AND ".join(conditions) + + # Get command frequency counts + rows = conn.execute( + f""" + SELECT command, COUNT(*) as count + FROM events + WHERE {where_clause} + GROUP BY command + ORDER BY count DESC + """, + params, + ).fetchall() + + commands = [{"command": row["command"], "count": row["count"]} for row in rows] + + # Get total Bash commands + total = sum(c["count"] for c in commands) + + return { + "days": days, + "project": project, + "prefix": prefix, + "total_commands": total, + "commands": commands, + } + + +def query_sessions( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, +) -> dict: + """Get session metadata. + + Args: + storage: Storage instance + days: Number of days to analyze + project: Optional project path filter + + Returns: + Dict with session information + """ + cutoff = datetime.now() - timedelta(days=days) + + with storage._connect() as conn: + conditions = ["last_seen >= ?"] + params: list = [cutoff] + + if project: + conditions.append("project_path LIKE ?") + params.append(f"%{project}%") + + where_clause = " AND ".join(conditions) + + rows = conn.execute( + f""" + SELECT + id, project_path, first_seen, last_seen, + entry_count, tool_use_count, + total_input_tokens, total_output_tokens, + primary_branch + FROM sessions + WHERE {where_clause} + ORDER BY last_seen DESC + """, + params, + ).fetchall() + + sessions = [ + { + "id": row["id"], + "project": row["project_path"], + "first_seen": row["first_seen"], + "last_seen": row["last_seen"], + "entry_count": row["entry_count"], + "tool_use_count": row["tool_use_count"], + "input_tokens": row["total_input_tokens"], + "output_tokens": row["total_output_tokens"], + "branch": row["primary_branch"], + } + for row in rows + ] + + # Calculate totals + total_entries = sum(s["entry_count"] for s in sessions) + total_tools = sum(s["tool_use_count"] for s in sessions) + total_input = sum(s["input_tokens"] or 0 for s in sessions) + total_output = sum(s["output_tokens"] or 0 for s in sessions) + + return { + "days": days, + "project": project, + "session_count": len(sessions), + "total_entries": total_entries, + "total_tool_uses": total_tools, + "total_input_tokens": total_input, + "total_output_tokens": total_output, + "sessions": sessions, + } + + +def query_tokens( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, + by: str = "day", +) -> dict: + """Get token usage analysis. + + Args: + storage: Storage instance + days: Number of days to analyze + project: Optional project path filter + by: Grouping: 'day', 'session', or 'model' + + Returns: + Dict with token usage breakdown + """ + cutoff = datetime.now() - timedelta(days=days) + + with storage._connect() as conn: + conditions = ["timestamp >= ?"] + params: list = [cutoff] + + if project: + conditions.append("project_path LIKE ?") + params.append(f"%{project}%") + + where_clause = " AND ".join(conditions) + + if by == "day": + # Group by day + rows = conn.execute( + f""" + SELECT + DATE(timestamp) as day, + SUM(COALESCE(input_tokens, 0)) as input_tokens, + SUM(COALESCE(output_tokens, 0)) as output_tokens, + SUM(COALESCE(cache_read_tokens, 0)) as cache_read_tokens, + SUM(COALESCE(cache_creation_tokens, 0)) as cache_creation_tokens, + COUNT(*) as event_count + FROM events + WHERE {where_clause} + GROUP BY DATE(timestamp) + ORDER BY day DESC + """, + params, + ).fetchall() + + breakdown = [ + { + "day": row["day"], + "input_tokens": row["input_tokens"], + "output_tokens": row["output_tokens"], + "cache_read_tokens": row["cache_read_tokens"], + "cache_creation_tokens": row["cache_creation_tokens"], + "event_count": row["event_count"], + } + for row in rows + ] + group_key = "day" + + elif by == "session": + # Group by session + rows = conn.execute( + f""" + SELECT + session_id, + project_path, + SUM(COALESCE(input_tokens, 0)) as input_tokens, + SUM(COALESCE(output_tokens, 0)) as output_tokens, + SUM(COALESCE(cache_read_tokens, 0)) as cache_read_tokens, + SUM(COALESCE(cache_creation_tokens, 0)) as cache_creation_tokens, + COUNT(*) as event_count + FROM events + WHERE {where_clause} + GROUP BY session_id + ORDER BY input_tokens DESC + """, + params, + ).fetchall() + + breakdown = [ + { + "session_id": row["session_id"], + "project": row["project_path"], + "input_tokens": row["input_tokens"], + "output_tokens": row["output_tokens"], + "cache_read_tokens": row["cache_read_tokens"], + "cache_creation_tokens": row["cache_creation_tokens"], + "event_count": row["event_count"], + } + for row in rows + ] + group_key = "session" + + elif by == "model": + # Group by model + rows = conn.execute( + f""" + SELECT + COALESCE(model, 'unknown') as model, + SUM(COALESCE(input_tokens, 0)) as input_tokens, + SUM(COALESCE(output_tokens, 0)) as output_tokens, + SUM(COALESCE(cache_read_tokens, 0)) as cache_read_tokens, + SUM(COALESCE(cache_creation_tokens, 0)) as cache_creation_tokens, + COUNT(*) as event_count + FROM events + WHERE {where_clause} + GROUP BY model + ORDER BY input_tokens DESC + """, + params, + ).fetchall() + + breakdown = [ + { + "model": row["model"], + "input_tokens": row["input_tokens"], + "output_tokens": row["output_tokens"], + "cache_read_tokens": row["cache_read_tokens"], + "cache_creation_tokens": row["cache_creation_tokens"], + "event_count": row["event_count"], + } + for row in rows + ] + group_key = "model" + + else: + return { + "error": f"Invalid grouping: {by}. Use 'day', 'session', or 'model'.", + } + + # Calculate totals + total_input = sum(b["input_tokens"] for b in breakdown) + total_output = sum(b["output_tokens"] for b in breakdown) + total_cache_read = sum(b["cache_read_tokens"] for b in breakdown) + total_cache_creation = sum(b["cache_creation_tokens"] for b in breakdown) + + return { + "days": days, + "project": project, + "group_by": group_key, + "total_input_tokens": total_input, + "total_output_tokens": total_output, + "total_cache_read_tokens": total_cache_read, + "total_cache_creation_tokens": total_cache_creation, + "breakdown": breakdown, + } diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py new file mode 100644 index 0000000..6750e89 --- /dev/null +++ b/src/session_analytics/server.py @@ -0,0 +1,286 @@ +"""MCP Session Analytics Server. + +Provides tools for querying Claude Code session logs: +- ingest_logs: Refresh data from JSONL files +- query_timeline: Events in time window +- query_tool_frequency: Tool usage counts +- query_commands: Bash command breakdown +- query_sequences: Common tool patterns +- query_permission_gaps: Commands needing settings.json +- query_sessions: Session metadata +- query_tokens: Token usage analysis +- get_insights: Pre-computed patterns for /improve-workflow +- get_status: Ingestion status + DB stats +""" + +import logging +import os +from pathlib import Path + +from fastmcp import FastMCP + +from session_analytics.ingest import ingest_logs as do_ingest_logs +from session_analytics.patterns import compute_permission_gaps, compute_sequence_patterns +from session_analytics.patterns import get_insights as do_get_insights +from session_analytics.queries import ensure_fresh_data +from session_analytics.queries import query_commands as do_query_commands +from session_analytics.queries import query_sessions as do_query_sessions +from session_analytics.queries import query_timeline as do_query_timeline +from session_analytics.queries import query_tokens as do_query_tokens +from session_analytics.queries import query_tool_frequency as do_query_tool_frequency +from session_analytics.storage import SQLiteStorage + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger("session-analytics") +if os.environ.get("DEV_MODE"): + logger.setLevel(logging.DEBUG) + +# Initialize MCP server +mcp = FastMCP("session-analytics") + +# Initialize storage +storage = SQLiteStorage() + + +@mcp.resource("session-analytics://guide", description="Usage guide and best practices") +def usage_guide() -> str: + """Return the session analytics usage guide from external markdown file.""" + guide_path = Path(__file__).parent / "guide.md" + try: + return guide_path.read_text() + except FileNotFoundError: + return "# Session Analytics Usage Guide\n\nGuide file not found. See CLAUDE.md for usage." + + +@mcp.tool() +def get_status() -> dict: + """Get ingestion status and database stats. + + Returns: + Status info including last ingestion time, event count, and DB size + """ + stats = storage.get_db_stats() + last_ingest = storage.get_last_ingestion_time() + + return { + "status": "ok", + "version": "0.1.0", + "last_ingestion": last_ingest.isoformat() if last_ingest else None, + **stats, + } + + +@mcp.tool() +def ingest_logs(days: int = 7, project: str | None = None, force: bool = False) -> dict: + """Refresh data from JSONL session log files. + + Args: + days: Number of days to look back (default: 7) + project: Optional project path filter + force: Force re-ingestion even if data is fresh + + Returns: + Ingestion stats (files processed, entries added, etc.) + """ + result = do_ingest_logs(storage, days=days, project=project, force=force) + return { + "status": "ok", + **result, + } + + +@mcp.tool() +def query_tool_frequency(days: int = 7, project: str | None = None) -> dict: + """Get tool usage frequency counts. + + Args: + days: Number of days to analyze (default: 7) + project: Optional project path filter + + Returns: + Tool frequency breakdown + """ + ensure_fresh_data(storage, days=days, project=project) + result = do_query_tool_frequency(storage, days=days, project=project) + return {"status": "ok", **result} + + +@mcp.tool() +def query_timeline( + start: str | None = None, + end: str | None = None, + tool: str | None = None, + project: str | None = None, + limit: int = 100, +) -> dict: + """Get events in a time window. + + Args: + start: Start time (ISO format, default: 24 hours ago) + end: End time (ISO format, default: now) + tool: Optional tool name filter + project: Optional project path filter + limit: Maximum events to return (default: 100) + + Returns: + Timeline of events + """ + from datetime import datetime + + start_dt = datetime.fromisoformat(start) if start else None + end_dt = datetime.fromisoformat(end) if end else None + + ensure_fresh_data(storage) + result = do_query_timeline( + storage, start=start_dt, end=end_dt, tool=tool, project=project, limit=limit + ) + return {"status": "ok", **result} + + +@mcp.tool() +def query_commands(days: int = 7, project: str | None = None, prefix: str | None = None) -> dict: + """Get Bash command breakdown. + + Args: + days: Number of days to analyze (default: 7) + project: Optional project path filter + prefix: Optional command prefix filter (e.g., "git") + + Returns: + Command frequency breakdown + """ + ensure_fresh_data(storage, days=days, project=project) + result = do_query_commands(storage, days=days, project=project, prefix=prefix) + return {"status": "ok", **result} + + +@mcp.tool() +def query_sessions(days: int = 7, project: str | None = None) -> dict: + """Get session metadata. + + Args: + days: Number of days to analyze (default: 7) + project: Optional project path filter + + Returns: + Session information + """ + ensure_fresh_data(storage, days=days, project=project) + result = do_query_sessions(storage, days=days, project=project) + return {"status": "ok", **result} + + +@mcp.tool() +def query_tokens(days: int = 7, project: str | None = None, by: str = "day") -> dict: + """Get token usage analysis. + + Args: + days: Number of days to analyze (default: 7) + project: Optional project path filter + by: Grouping: 'day', 'session', or 'model' (default: 'day') + + Returns: + Token usage breakdown + """ + ensure_fresh_data(storage, days=days, project=project) + result = do_query_tokens(storage, days=days, project=project, by=by) + return {"status": "ok", **result} + + +@mcp.tool() +def query_sequences(days: int = 7, min_count: int = 3, length: int = 2) -> dict: + """Get common tool patterns (sequences). + + Args: + days: Number of days to analyze (default: 7) + min_count: Minimum occurrences to include (default: 3) + length: Sequence length (default: 2) + + Returns: + Common tool sequences + """ + ensure_fresh_data(storage, days=days) + patterns = compute_sequence_patterns( + storage, days=days, sequence_length=length, min_count=min_count + ) + return { + "status": "ok", + "days": days, + "min_count": min_count, + "sequence_length": length, + "sequences": [{"pattern": p.pattern_key, "count": p.count} for p in patterns], + } + + +@mcp.tool() +def query_permission_gaps(days: int = 7, threshold: int = 5) -> dict: + """Find commands that may need to be added to settings.json. + + Args: + days: Number of days to analyze (default: 7) + threshold: Minimum usage count to suggest (default: 5) + + Returns: + Commands that are frequently used but not in allowed list + """ + ensure_fresh_data(storage, days=days) + patterns = compute_permission_gaps(storage, days=days, threshold=threshold) + return { + "status": "ok", + "days": days, + "threshold": threshold, + "gaps": [ + { + "command": p.pattern_key, + "count": p.count, + "suggestion": p.metadata.get("suggestion", ""), + } + for p in patterns + ], + } + + +@mcp.tool() +def get_insights(refresh: bool = False, days: int = 7) -> dict: + """Get pre-computed patterns for /improve-workflow. + + Args: + refresh: Force recomputation of patterns (default: False) + days: Number of days to analyze if refreshing (default: 7) + + Returns: + Insights organized by type (tool_frequency, sequences, permission_gaps) + """ + ensure_fresh_data(storage, days=days) + result = do_get_insights(storage, refresh=refresh, days=days) + return {"status": "ok", **result} + + +def create_app(): + """Create the ASGI app for uvicorn.""" + # stateless_http=True allows resilience to server restarts + return mcp.http_app(stateless_http=True) + + +def main(): + """Run the MCP server.""" + import uvicorn + + port = int(os.environ.get("PORT", 8081)) + host = os.environ.get("HOST", "127.0.0.1") + + print(f"Starting Claude Session Analytics on {host}:{port}") + print( + f"Add to Claude Code: claude mcp add --transport http --scope user session-analytics http://{host}:{port}/mcp" + ) + + uvicorn.run(create_app(), host=host, port=port) + + +if __name__ == "__main__": + main() diff --git a/src/session_analytics/storage.py b/src/session_analytics/storage.py new file mode 100644 index 0000000..0fa58e3 --- /dev/null +++ b/src/session_analytics/storage.py @@ -0,0 +1,588 @@ +"""SQLite storage backend for session analytics.""" + +import json +import logging +import os +import sqlite3 +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path + +logger = logging.getLogger("session-analytics") + +# Register datetime adapters/converters (required for Python 3.12+) + + +def _adapt_datetime(dt: datetime) -> str: + """Convert datetime to ISO format string for SQLite storage.""" + return dt.isoformat() + + +def _convert_datetime(data: bytes) -> datetime: + """Convert ISO format string from SQLite to datetime.""" + return datetime.fromisoformat(data.decode()) + + +sqlite3.register_adapter(datetime, _adapt_datetime) +sqlite3.register_converter("TIMESTAMP", _convert_datetime) + + +@dataclass +class Event: + """A parsed event from a Claude Code session log.""" + + id: int | None + uuid: str + timestamp: datetime + session_id: str + project_path: str | None = None + entry_type: str | None = None # 'user', 'assistant', 'summary' + + # Tool-specific (null if not a tool call) + tool_name: str | None = None + tool_input_json: str | None = None + tool_id: str | None = None + is_error: bool = False + + # Denormalized for common filters + command: str | None = None # Bash: first word + command_args: str | None = None # Bash: remaining args + file_path: str | None = None # Read/Edit/Write target + skill_name: str | None = None # Skill invocation + + # Token tracking + input_tokens: int | None = None + output_tokens: int | None = None + cache_read_tokens: int | None = None + cache_creation_tokens: int | None = None + model: str | None = None + + # Context + git_branch: str | None = None + cwd: str | None = None + + +@dataclass +class Session: + """Metadata about a Claude Code session.""" + + id: str + project_path: str | None = None + first_seen: datetime | None = None + last_seen: datetime | None = None + entry_count: int = 0 + tool_use_count: int = 0 + total_input_tokens: int = 0 + total_output_tokens: int = 0 + primary_branch: str | None = None + slug: str | None = None + + +@dataclass +class IngestionState: + """Tracks the ingestion state of a JSONL file.""" + + file_path: str + file_size: int + last_modified: datetime + entries_processed: int + last_processed: datetime + + +@dataclass +class Pattern: + """A pre-computed pattern for fast querying.""" + + id: int | None + pattern_type: str # 'tool_frequency', 'sequence', 'permission_gap', etc. + pattern_key: str # e.g., "Bash" or "Read → Edit" + count: int = 0 + last_seen: datetime | None = None + metadata: dict = field(default_factory=dict) + computed_at: datetime | None = None + + +# Default database path +DEFAULT_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" + +# Schema version for migrations +SCHEMA_VERSION = 1 + + +class SQLiteStorage: + """SQLite-backed storage for session analytics.""" + + def __init__(self, db_path: str | Path | None = None): + """Initialize storage with optional custom DB path.""" + if db_path is None: + db_path = os.environ.get("SESSION_ANALYTICS_DB", str(DEFAULT_DB_PATH)) + + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + + self._init_db() + + @contextmanager + def _connect(self): + """Context manager for database connections.""" + conn = sqlite3.connect( + self.db_path, + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + ) + conn.row_factory = sqlite3.Row + try: + yield conn + conn.commit() + finally: + conn.close() + + def _init_db(self): + """Create tables if they don't exist.""" + with self._connect() as conn: + # Schema version tracking + conn.execute(""" + CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY + ) + """) + + # Core events table (denormalized for fast queries) + conn.execute(""" + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY, + uuid TEXT NOT NULL, + timestamp TIMESTAMP NOT NULL, + session_id TEXT NOT NULL, + project_path TEXT, + entry_type TEXT, + + -- Tool-specific + tool_name TEXT, + tool_input_json TEXT, + tool_id TEXT, + is_error INTEGER DEFAULT 0, + + -- Denormalized for common filters + command TEXT, + command_args TEXT, + file_path TEXT, + skill_name TEXT, + + -- Token tracking + input_tokens INTEGER, + output_tokens INTEGER, + cache_read_tokens INTEGER, + cache_creation_tokens INTEGER, + model TEXT, + + -- Context + git_branch TEXT, + cwd TEXT, + + UNIQUE(session_id, uuid) + ) + """) + + # Indexes for common queries + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_tool ON events(tool_name)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_project ON events(project_path)") + + # Sessions metadata + conn.execute(""" + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + project_path TEXT, + first_seen TIMESTAMP, + last_seen TIMESTAMP, + entry_count INTEGER DEFAULT 0, + tool_use_count INTEGER DEFAULT 0, + total_input_tokens INTEGER DEFAULT 0, + total_output_tokens INTEGER DEFAULT 0, + primary_branch TEXT, + slug TEXT + ) + """) + + # Ingestion tracking (incremental updates) + conn.execute(""" + CREATE TABLE IF NOT EXISTS ingestion_state ( + file_path TEXT PRIMARY KEY, + file_size INTEGER, + last_modified TIMESTAMP, + entries_processed INTEGER, + last_processed TIMESTAMP + ) + """) + + # Pre-computed patterns + conn.execute(""" + CREATE TABLE IF NOT EXISTS patterns ( + id INTEGER PRIMARY KEY, + pattern_type TEXT NOT NULL, + pattern_key TEXT NOT NULL, + count INTEGER DEFAULT 0, + last_seen TIMESTAMP, + metadata_json TEXT, + computed_at TIMESTAMP, + UNIQUE(pattern_type, pattern_key) + ) + """) + + # Set schema version + conn.execute( + "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,) + ) + + # Event operations + + def add_event(self, event: Event) -> Event: + """Add a new event and return it with assigned ID.""" + with self._connect() as conn: + cursor = conn.execute( + """ + INSERT OR IGNORE INTO events ( + uuid, timestamp, session_id, project_path, entry_type, + tool_name, tool_input_json, tool_id, is_error, + command, command_args, file_path, skill_name, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, + git_branch, cwd + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + event.uuid, + event.timestamp, + event.session_id, + event.project_path, + event.entry_type, + event.tool_name, + event.tool_input_json, + event.tool_id, + 1 if event.is_error else 0, + event.command, + event.command_args, + event.file_path, + event.skill_name, + event.input_tokens, + event.output_tokens, + event.cache_read_tokens, + event.cache_creation_tokens, + event.model, + event.git_branch, + event.cwd, + ), + ) + event.id = cursor.lastrowid + return event + + def add_events_batch(self, events: list[Event]) -> int: + """Add multiple events in a single transaction. Returns count added.""" + with self._connect() as conn: + cursor = conn.executemany( + """ + INSERT OR IGNORE INTO events ( + uuid, timestamp, session_id, project_path, entry_type, + tool_name, tool_input_json, tool_id, is_error, + command, command_args, file_path, skill_name, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, + git_branch, cwd + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + ( + e.uuid, + e.timestamp, + e.session_id, + e.project_path, + e.entry_type, + e.tool_name, + e.tool_input_json, + e.tool_id, + 1 if e.is_error else 0, + e.command, + e.command_args, + e.file_path, + e.skill_name, + e.input_tokens, + e.output_tokens, + e.cache_read_tokens, + e.cache_creation_tokens, + e.model, + e.git_branch, + e.cwd, + ) + for e in events + ], + ) + return cursor.rowcount + + def get_event_count(self) -> int: + """Get total number of events.""" + with self._connect() as conn: + row = conn.execute("SELECT COUNT(*) as count FROM events").fetchone() + return row["count"] + + def get_events_in_range( + self, + start: datetime | None = None, + end: datetime | None = None, + tool_name: str | None = None, + project_path: str | None = None, + limit: int = 100, + ) -> list[Event]: + """Get events within a time range with optional filters.""" + with self._connect() as conn: + conditions = [] + params: list = [] + + if start: + conditions.append("timestamp >= ?") + params.append(start) + if end: + conditions.append("timestamp <= ?") + params.append(end) + if tool_name: + conditions.append("tool_name = ?") + params.append(tool_name) + if project_path: + conditions.append("project_path = ?") + params.append(project_path) + + where_clause = " AND ".join(conditions) if conditions else "1=1" + params.append(limit) + + rows = conn.execute( + f""" + SELECT * FROM events + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT ? + """, + params, + ).fetchall() + + return [self._row_to_event(row) for row in rows] + + def _row_to_event(self, row: sqlite3.Row) -> Event: + """Convert a database row to an Event object.""" + return Event( + id=row["id"], + uuid=row["uuid"], + timestamp=row["timestamp"], + session_id=row["session_id"], + project_path=row["project_path"], + entry_type=row["entry_type"], + tool_name=row["tool_name"], + tool_input_json=row["tool_input_json"], + tool_id=row["tool_id"], + is_error=bool(row["is_error"]), + command=row["command"], + command_args=row["command_args"], + file_path=row["file_path"], + skill_name=row["skill_name"], + input_tokens=row["input_tokens"], + output_tokens=row["output_tokens"], + cache_read_tokens=row["cache_read_tokens"], + cache_creation_tokens=row["cache_creation_tokens"], + model=row["model"], + git_branch=row["git_branch"], + cwd=row["cwd"], + ) + + # Session operations + + def upsert_session(self, session: Session) -> None: + """Add or update a session.""" + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO sessions ( + id, project_path, first_seen, last_seen, + entry_count, tool_use_count, + total_input_tokens, total_output_tokens, + primary_branch, slug + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + session.id, + session.project_path, + session.first_seen, + session.last_seen, + session.entry_count, + session.tool_use_count, + session.total_input_tokens, + session.total_output_tokens, + session.primary_branch, + session.slug, + ), + ) + + def get_session(self, session_id: str) -> Session | None: + """Get a session by ID.""" + with self._connect() as conn: + row = conn.execute("SELECT * FROM sessions WHERE id = ?", (session_id,)).fetchone() + if row: + return self._row_to_session(row) + return None + + def get_session_count(self) -> int: + """Get total number of sessions.""" + with self._connect() as conn: + row = conn.execute("SELECT COUNT(*) as count FROM sessions").fetchone() + return row["count"] + + def _row_to_session(self, row: sqlite3.Row) -> Session: + """Convert a database row to a Session object.""" + return Session( + id=row["id"], + project_path=row["project_path"], + first_seen=row["first_seen"], + last_seen=row["last_seen"], + entry_count=row["entry_count"], + tool_use_count=row["tool_use_count"], + total_input_tokens=row["total_input_tokens"], + total_output_tokens=row["total_output_tokens"], + primary_branch=row["primary_branch"], + slug=row["slug"], + ) + + # Ingestion state operations + + def get_ingestion_state(self, file_path: str) -> IngestionState | None: + """Get ingestion state for a file.""" + with self._connect() as conn: + row = conn.execute( + "SELECT * FROM ingestion_state WHERE file_path = ?", (file_path,) + ).fetchone() + if row: + return IngestionState( + file_path=row["file_path"], + file_size=row["file_size"], + last_modified=row["last_modified"], + entries_processed=row["entries_processed"], + last_processed=row["last_processed"], + ) + return None + + def update_ingestion_state(self, state: IngestionState) -> None: + """Update ingestion state for a file.""" + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO ingestion_state ( + file_path, file_size, last_modified, entries_processed, last_processed + ) VALUES (?, ?, ?, ?, ?) + """, + ( + state.file_path, + state.file_size, + state.last_modified, + state.entries_processed, + state.last_processed, + ), + ) + + def get_last_ingestion_time(self) -> datetime | None: + """Get the most recent ingestion time across all files.""" + with self._connect() as conn: + row = conn.execute("SELECT MAX(last_processed) as last FROM ingestion_state").fetchone() + if not row or not row["last"]: + return None + # Handle both datetime objects and ISO strings (SQLite aggregates return strings) + val = row["last"] + return datetime.fromisoformat(val) if isinstance(val, str) else val + + # Pattern operations + + def upsert_pattern(self, pattern: Pattern) -> None: + """Add or update a pattern.""" + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO patterns ( + pattern_type, pattern_key, count, last_seen, metadata_json, computed_at + ) VALUES (?, ?, ?, ?, ?, ?) + """, + ( + pattern.pattern_type, + pattern.pattern_key, + pattern.count, + pattern.last_seen, + json.dumps(pattern.metadata) if pattern.metadata else None, + pattern.computed_at, + ), + ) + + def get_patterns(self, pattern_type: str | None = None) -> list[Pattern]: + """Get patterns, optionally filtered by type.""" + with self._connect() as conn: + if pattern_type: + rows = conn.execute( + "SELECT * FROM patterns WHERE pattern_type = ? ORDER BY count DESC", + (pattern_type,), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM patterns ORDER BY pattern_type, count DESC" + ).fetchall() + + return [ + Pattern( + id=row["id"], + pattern_type=row["pattern_type"], + pattern_key=row["pattern_key"], + count=row["count"], + last_seen=row["last_seen"], + metadata=json.loads(row["metadata_json"]) if row["metadata_json"] else {}, + computed_at=row["computed_at"], + ) + for row in rows + ] + + def clear_patterns(self, pattern_type: str | None = None) -> int: + """Clear patterns, optionally filtered by type. Returns count deleted.""" + with self._connect() as conn: + if pattern_type: + cursor = conn.execute( + "DELETE FROM patterns WHERE pattern_type = ?", (pattern_type,) + ) + else: + cursor = conn.execute("DELETE FROM patterns") + return cursor.rowcount + + # Utility operations + + def get_db_stats(self) -> dict: + """Get database statistics.""" + with self._connect() as conn: + event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] + session_count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0] + pattern_count = conn.execute("SELECT COUNT(*) FROM patterns").fetchone()[0] + file_count = conn.execute("SELECT COUNT(*) FROM ingestion_state").fetchone()[0] + + # Get date range + date_range = conn.execute( + "SELECT MIN(timestamp) as min_ts, MAX(timestamp) as max_ts FROM events" + ).fetchone() + + # Get DB file size + db_size = self.db_path.stat().st_size if self.db_path.exists() else 0 + + # Helper to convert datetime or string to ISO string + def to_iso(val): + if val is None: + return None + return val if isinstance(val, str) else val.isoformat() + + return { + "event_count": event_count, + "session_count": session_count, + "pattern_count": pattern_count, + "files_processed": file_count, + "earliest_event": to_iso(date_range["min_ts"]), + "latest_event": to_iso(date_range["max_ts"]), + "db_size_bytes": db_size, + "db_path": str(self.db_path), + } diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..b76b24c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for claude-session-analytics.""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..83cca08 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,25 @@ +"""Pytest configuration and fixtures.""" + +import pytest + + +@pytest.fixture +def sample_session_log_entry(): + """Sample JSONL entry from a Claude Code session log.""" + return { + "uuid": "test-uuid-12345", + "timestamp": "2025-01-01T12:00:00.000Z", + "sessionId": "session-abc123", + "type": "assistant", + "message": { + "role": "assistant", + "content": [ + { + "type": "tool_use", + "id": "tool-123", + "name": "Bash", + "input": {"command": "git status", "description": "Check git status"}, + } + ], + }, + } diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..b2503b5 --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,316 @@ +"""Tests for the JSONL ingestion module.""" + +import json +import tempfile +from pathlib import Path + +import pytest + +from session_analytics.ingest import ( + find_log_files, + ingest_file, + parse_entry, + parse_tool_use, +) +from session_analytics.storage import SQLiteStorage + + +@pytest.fixture +def storage(): + """Create a temporary storage instance for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + yield SQLiteStorage(db_path) + + +@pytest.fixture +def sample_logs_dir(): + """Create a temporary directory with sample JSONL files.""" + with tempfile.TemporaryDirectory() as tmpdir: + logs_dir = Path(tmpdir) + project_dir = logs_dir / "-test-project" + project_dir.mkdir() + + # Create a sample JSONL file + jsonl_file = project_dir / "test-session.jsonl" + entries = [ + { + "type": "user", + "uuid": "user-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "cwd": "/test/project", + "gitBranch": "main", + "message": {"role": "user", "content": "Hello"}, + }, + { + "type": "assistant", + "uuid": "assistant-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:05.000Z", + "cwd": "/test/project", + "gitBranch": "main", + "message": { + "role": "assistant", + "model": "claude-opus-4-5-20251101", + "content": [ + { + "type": "tool_use", + "id": "tool-1", + "name": "Bash", + "input": {"command": "git status"}, + } + ], + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "cache_read_input_tokens": 1000, + }, + }, + }, + { + "type": "user", + "uuid": "result-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:10.000Z", + "cwd": "/test/project", + "gitBranch": "main", + "message": { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool-1", + "content": "On branch main", + } + ], + }, + }, + ] + + with open(jsonl_file, "w") as f: + for entry in entries: + f.write(json.dumps(entry) + "\n") + + yield logs_dir + + +class TestParseToolUse: + """Tests for tool_use parsing.""" + + def test_parse_bash_command(self): + """Test extracting command from Bash tool.""" + tool_use = { + "name": "Bash", + "id": "tool-1", + "input": {"command": "git status --short"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "Bash" + assert result["command"] == "git" + assert result["command_args"] == "status --short" + + def test_parse_read_file(self): + """Test extracting file_path from Read tool.""" + tool_use = { + "name": "Read", + "id": "tool-2", + "input": {"file_path": "/path/to/file.py"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "Read" + assert result["file_path"] == "/path/to/file.py" + + def test_parse_skill(self): + """Test extracting skill_name from Skill tool.""" + tool_use = { + "name": "Skill", + "id": "tool-3", + "input": {"skill": "commit"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "Skill" + assert result["skill_name"] == "commit" + + def test_parse_mcp_tool(self): + """Test parsing MCP tool names.""" + tool_use = { + "name": "mcp__event-bus__register_session", + "id": "tool-4", + "input": {"name": "test"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "mcp__event-bus__register_session" + + +class TestParseEntry: + """Tests for entry parsing.""" + + def test_parse_user_message(self): + """Test parsing a user message.""" + entry = { + "type": "user", + "uuid": "user-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "cwd": "/test", + "gitBranch": "main", + "message": {"role": "user", "content": "Hello"}, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "user" + assert events[0].session_id == "session-1" + + def test_parse_assistant_with_tool(self): + """Test parsing an assistant message with tool_use.""" + entry = { + "type": "assistant", + "uuid": "assistant-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "message": { + "model": "claude-opus-4-5", + "content": [ + { + "type": "tool_use", + "id": "tool-1", + "name": "Bash", + "input": {"command": "ls -la"}, + } + ], + "usage": {"input_tokens": 100, "output_tokens": 50}, + }, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "tool_use" + assert events[0].tool_name == "Bash" + assert events[0].command == "ls" + assert events[0].input_tokens == 100 + + def test_parse_tool_result(self): + """Test parsing a tool_result entry.""" + entry = { + "type": "user", + "uuid": "result-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "message": { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool-1", + "content": "output", + } + ], + }, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "tool_result" + assert events[0].tool_id == "tool-1" + + def test_skip_file_history_snapshot(self): + """Test that file-history-snapshot entries are skipped.""" + entry = { + "type": "file-history-snapshot", + "uuid": "snapshot-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + } + events = parse_entry(entry, "test-project") + assert len(events) == 0 + + def test_skip_malformed_entry(self): + """Test that entries without required fields are skipped.""" + entry = {"type": "user"} # Missing uuid, sessionId, timestamp + events = parse_entry(entry, "test-project") + assert len(events) == 0 + + +class TestIngestFile: + """Tests for file ingestion.""" + + def test_ingest_file(self, storage, sample_logs_dir): + """Test ingesting a JSONL file.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + result = ingest_file(jsonl_file, storage) + assert result["entries_processed"] == 3 + assert result["events_added"] == 3 + assert result["skipped"] is False + + def test_incremental_ingestion(self, storage, sample_logs_dir): + """Test that unchanged files are skipped on re-ingestion.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + # First ingestion + result1 = ingest_file(jsonl_file, storage) + assert result1["skipped"] is False + + # Second ingestion should skip + result2 = ingest_file(jsonl_file, storage) + assert result2["skipped"] is True + + def test_force_reingestion(self, storage, sample_logs_dir): + """Test force re-ingestion.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + # First ingestion + ingest_file(jsonl_file, storage) + + # Force re-ingestion should process again + result = ingest_file(jsonl_file, storage, force=True) + assert result["skipped"] is False + + +class TestFindLogFiles: + """Tests for log file discovery.""" + + def test_find_log_files(self, sample_logs_dir): + """Test finding JSONL files in logs directory.""" + files = find_log_files(logs_dir=sample_logs_dir, days=7) + assert len(files) == 1 + assert files[0].suffix == ".jsonl" + + def test_filter_by_project(self, sample_logs_dir): + """Test filtering by project name.""" + # Create another project + other_project = sample_logs_dir / "-other-project" + other_project.mkdir() + (other_project / "other.jsonl").write_text('{"type":"user"}\n') + + # Should find both + all_files = find_log_files(logs_dir=sample_logs_dir, days=7) + assert len(all_files) == 2 + + # Should only find matching project + filtered = find_log_files(logs_dir=sample_logs_dir, days=7, project_filter="test") + assert len(filtered) == 1 + assert "test" in str(filtered[0]) + + +class TestIngestLogs: + """Tests for full ingestion flow.""" + + def test_ingest_logs(self, storage, sample_logs_dir): + """Test full ingestion flow.""" + # Use find_log_files with explicit logs_dir + from session_analytics.ingest import ingest_file as do_ingest_file + from session_analytics.ingest import update_session_stats + + files = find_log_files(logs_dir=sample_logs_dir, days=7) + assert len(files) == 1 + + # Ingest the file + result = do_ingest_file(files[0], storage) + assert result["events_added"] == 3 + + # Update session stats + sessions = update_session_stats(storage) + assert sessions >= 1 diff --git a/tests/test_patterns.py b/tests/test_patterns.py new file mode 100644 index 0000000..0163271 --- /dev/null +++ b/tests/test_patterns.py @@ -0,0 +1,309 @@ +"""Tests for the pattern detection module.""" + +import tempfile +from datetime import datetime, timedelta +from pathlib import Path + +import pytest + +from session_analytics.patterns import ( + compute_all_patterns, + compute_command_patterns, + compute_permission_gaps, + compute_sequence_patterns, + compute_tool_frequency_patterns, + get_insights, + load_allowed_commands, +) +from session_analytics.storage import Event, SQLiteStorage + + +@pytest.fixture +def storage(): + """Create a temporary storage instance for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + yield SQLiteStorage(db_path) + + +@pytest.fixture +def populated_storage(storage): + """Create a storage instance with sample data for pattern detection.""" + now = datetime.now() + + # Add events that will create patterns + events = [ + # Session 1: Read -> Edit -> Bash sequence + Event( + id=None, + uuid="e1", + timestamp=now - timedelta(hours=1), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Read", + ), + Event( + id=None, + uuid="e2", + timestamp=now - timedelta(hours=1, minutes=-1), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Edit", + ), + Event( + id=None, + uuid="e3", + timestamp=now - timedelta(hours=1, minutes=-2), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Bash", + command="git", + ), + # Session 2: Read -> Edit sequence (same as s1) + Event( + id=None, + uuid="e4", + timestamp=now - timedelta(hours=2), + session_id="s2", + project_path="-test", + entry_type="tool_use", + tool_name="Read", + ), + Event( + id=None, + uuid="e5", + timestamp=now - timedelta(hours=2, minutes=-1), + session_id="s2", + project_path="-test", + entry_type="tool_use", + tool_name="Edit", + ), + # Session 3: Read -> Edit sequence (third occurrence) + Event( + id=None, + uuid="e6", + timestamp=now - timedelta(hours=3), + session_id="s3", + project_path="-test", + entry_type="tool_use", + tool_name="Read", + ), + Event( + id=None, + uuid="e7", + timestamp=now - timedelta(hours=3, minutes=-1), + session_id="s3", + project_path="-test", + entry_type="tool_use", + tool_name="Edit", + ), + # More Bash commands for permission gap testing + Event( + id=None, + uuid="e8", + timestamp=now - timedelta(hours=4), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Bash", + command="make", + ), + Event( + id=None, + uuid="e9", + timestamp=now - timedelta(hours=4, minutes=-1), + session_id="s2", + project_path="-test", + entry_type="tool_use", + tool_name="Bash", + command="make", + ), + Event( + id=None, + uuid="e10", + timestamp=now - timedelta(hours=4, minutes=-2), + session_id="s3", + project_path="-test", + entry_type="tool_use", + tool_name="Bash", + command="make", + ), + Event( + id=None, + uuid="e11", + timestamp=now - timedelta(hours=4, minutes=-3), + session_id="s1", + project_path="-test", + entry_type="tool_use", + tool_name="Bash", + command="make", + ), + Event( + id=None, + uuid="e12", + timestamp=now - timedelta(hours=4, minutes=-4), + session_id="s2", + project_path="-test", + entry_type="tool_use", + tool_name="Bash", + command="make", + ), + ] + + storage.add_events_batch(events) + return storage + + +class TestToolFrequencyPatterns: + """Tests for tool frequency pattern detection.""" + + def test_compute_tool_frequency(self, populated_storage): + """Test computing tool frequency patterns.""" + patterns = compute_tool_frequency_patterns(populated_storage, days=7) + + # Should have patterns for Read, Edit, Bash + pattern_keys = {p.pattern_key for p in patterns} + assert "Read" in pattern_keys + assert "Edit" in pattern_keys + assert "Bash" in pattern_keys + + def test_frequency_counts(self, populated_storage): + """Test that frequency counts are accurate.""" + patterns = compute_tool_frequency_patterns(populated_storage, days=7) + pattern_dict = {p.pattern_key: p.count for p in patterns} + + assert pattern_dict["Read"] == 3 + assert pattern_dict["Edit"] == 3 + assert pattern_dict["Bash"] == 6 # 1 git + 5 make + + +class TestCommandPatterns: + """Tests for command pattern detection.""" + + def test_compute_command_patterns(self, populated_storage): + """Test computing command patterns.""" + patterns = compute_command_patterns(populated_storage, days=7) + + pattern_dict = {p.pattern_key: p.count for p in patterns} + assert pattern_dict.get("git", 0) == 1 + assert pattern_dict.get("make", 0) == 5 + + +class TestSequencePatterns: + """Tests for sequence pattern detection.""" + + def test_compute_sequences(self, populated_storage): + """Test computing sequence patterns.""" + patterns = compute_sequence_patterns( + populated_storage, days=7, sequence_length=2, min_count=2 + ) + + # Should find Read -> Edit pattern (occurs 3 times) + pattern_keys = {p.pattern_key for p in patterns} + assert "Read → Edit" in pattern_keys + + def test_sequence_counts(self, populated_storage): + """Test that sequence counts are accurate.""" + patterns = compute_sequence_patterns( + populated_storage, days=7, sequence_length=2, min_count=1 + ) + + pattern_dict = {p.pattern_key: p.count for p in patterns} + assert pattern_dict["Read → Edit"] == 3 + + def test_min_count_filter(self, populated_storage): + """Test that min_count filter works.""" + # With min_count=5, should have no sequences + patterns = compute_sequence_patterns( + populated_storage, days=7, sequence_length=2, min_count=5 + ) + assert len(patterns) == 0 + + +class TestPermissionGaps: + """Tests for permission gap detection.""" + + def test_load_allowed_commands_missing_file(self): + """Test loading allowed commands from non-existent file.""" + with tempfile.TemporaryDirectory() as tmpdir: + missing_path = Path(tmpdir) / "nonexistent.json" + allowed = load_allowed_commands(missing_path) + assert allowed == set() + + def test_load_allowed_commands(self): + """Test loading allowed commands from settings.json.""" + with tempfile.TemporaryDirectory() as tmpdir: + settings_path = Path(tmpdir) / "settings.json" + settings_path.write_text( + '{"permissions": {"allow": ["Bash(git:*)", "Bash(make:*)"]}}' + ) + allowed = load_allowed_commands(settings_path) + assert "git" in allowed + assert "make" in allowed + + def test_compute_permission_gaps(self, populated_storage): + """Test computing permission gaps.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create empty settings.json + settings_path = Path(tmpdir) / "settings.json" + settings_path.write_text('{"permissions": {"allow": []}}') + + patterns = compute_permission_gaps( + populated_storage, days=7, threshold=3, settings_path=settings_path + ) + + # Should find make (5 uses) but maybe not git (1 use) depending on threshold + pattern_keys = {p.pattern_key for p in patterns} + assert "make" in pattern_keys + + def test_permission_gaps_respects_allowed(self, populated_storage): + """Test that allowed commands are not reported as gaps.""" + with tempfile.TemporaryDirectory() as tmpdir: + settings_path = Path(tmpdir) / "settings.json" + settings_path.write_text('{"permissions": {"allow": ["Bash(make:*)"]}}') + + patterns = compute_permission_gaps( + populated_storage, days=7, threshold=1, settings_path=settings_path + ) + + # make is allowed, so should only find git + pattern_keys = {p.pattern_key for p in patterns} + assert "make" not in pattern_keys + assert "git" in pattern_keys + + +class TestComputeAllPatterns: + """Tests for computing all patterns.""" + + def test_compute_all_patterns(self, populated_storage): + """Test computing all pattern types.""" + stats = compute_all_patterns(populated_storage, days=7) + + assert stats["tool_frequency_patterns"] > 0 + assert stats["command_patterns"] > 0 + assert stats["total_patterns"] > 0 + + +class TestGetInsights: + """Tests for the get_insights function.""" + + def test_get_insights(self, populated_storage): + """Test getting insights.""" + insights = get_insights(populated_storage, refresh=True, days=7) + + assert "tool_frequency" in insights + assert "command_frequency" in insights + assert "sequences" in insights + assert "permission_gaps" in insights + assert "summary" in insights + + def test_insights_summary(self, populated_storage): + """Test that insights include summary stats.""" + insights = get_insights(populated_storage, refresh=True, days=7) + + assert "total_tools" in insights["summary"] + assert "total_commands" in insights["summary"] + assert "total_sequences" in insights["summary"] diff --git a/tests/test_queries.py b/tests/test_queries.py new file mode 100644 index 0000000..c5a68db --- /dev/null +++ b/tests/test_queries.py @@ -0,0 +1,302 @@ +"""Tests for the query implementations.""" + +import tempfile +from datetime import datetime, timedelta +from pathlib import Path + +import pytest + +from session_analytics.queries import ( + ensure_fresh_data, + query_commands, + query_sessions, + query_timeline, + query_tokens, + query_tool_frequency, +) +from session_analytics.storage import Event, Session, SQLiteStorage + + +@pytest.fixture +def storage(): + """Create a temporary storage instance for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + yield SQLiteStorage(db_path) + + +@pytest.fixture +def populated_storage(storage): + """Create a storage instance with sample data.""" + now = datetime.now() + + # Add some events + events = [ + Event( + id=None, + uuid="event-1", + timestamp=now - timedelta(hours=1), + session_id="session-1", + project_path="-test-project", + entry_type="tool_use", + tool_name="Bash", + command="git", + command_args="status", + input_tokens=100, + output_tokens=50, + model="claude-opus-4-5", + ), + Event( + id=None, + uuid="event-2", + timestamp=now - timedelta(hours=2), + session_id="session-1", + project_path="-test-project", + entry_type="tool_use", + tool_name="Read", + file_path="/path/to/file.py", + input_tokens=80, + output_tokens=30, + model="claude-opus-4-5", + ), + Event( + id=None, + uuid="event-3", + timestamp=now - timedelta(hours=3), + session_id="session-1", + project_path="-test-project", + entry_type="tool_use", + tool_name="Bash", + command="git", + command_args="diff", + input_tokens=120, + output_tokens=60, + model="claude-opus-4-5", + ), + Event( + id=None, + uuid="event-4", + timestamp=now - timedelta(hours=4), + session_id="session-2", + project_path="-other-project", + entry_type="tool_use", + tool_name="Edit", + file_path="/path/to/other.py", + input_tokens=200, + output_tokens=100, + model="claude-sonnet-4-20250514", + ), + Event( + id=None, + uuid="event-5", + timestamp=now - timedelta(days=10), + session_id="session-3", + project_path="-old-project", + entry_type="tool_use", + tool_name="Bash", + command="make", + input_tokens=50, + output_tokens=25, + model="claude-opus-4-5", + ), + ] + storage.add_events_batch(events) + + # Add sessions + storage.upsert_session( + Session( + id="session-1", + project_path="-test-project", + first_seen=now - timedelta(hours=3), + last_seen=now - timedelta(hours=1), + entry_count=3, + tool_use_count=3, + total_input_tokens=300, + total_output_tokens=140, + primary_branch="main", + ) + ) + storage.upsert_session( + Session( + id="session-2", + project_path="-other-project", + first_seen=now - timedelta(hours=4), + last_seen=now - timedelta(hours=4), + entry_count=1, + tool_use_count=1, + total_input_tokens=200, + total_output_tokens=100, + primary_branch="feature", + ) + ) + + return storage + + +class TestQueryToolFrequency: + """Tests for tool frequency queries.""" + + def test_basic_frequency(self, populated_storage): + """Test basic tool frequency query.""" + result = query_tool_frequency(populated_storage, days=7) + assert result["total_tool_calls"] == 4 # 5 events, but 1 is 10 days old + assert len(result["tools"]) > 0 + + # Check that Bash is most frequent + tools = {t["tool"]: t["count"] for t in result["tools"]} + assert tools.get("Bash", 0) == 2 + assert tools.get("Read", 0) == 1 + assert tools.get("Edit", 0) == 1 + + def test_frequency_with_project_filter(self, populated_storage): + """Test tool frequency with project filter.""" + result = query_tool_frequency(populated_storage, days=7, project="test") + assert result["project"] == "test" + # Should only include test-project events + assert result["total_tool_calls"] == 3 + + def test_frequency_days_filter(self, populated_storage): + """Test that days filter works.""" + result = query_tool_frequency(populated_storage, days=30) + assert result["total_tool_calls"] == 5 # All events including old one + + +class TestQueryTimeline: + """Tests for timeline queries.""" + + def test_basic_timeline(self, populated_storage): + """Test basic timeline query.""" + result = query_timeline(populated_storage, limit=10) + assert "events" in result + assert len(result["events"]) <= 10 + + def test_timeline_with_tool_filter(self, populated_storage): + """Test timeline with tool filter.""" + result = query_timeline(populated_storage, tool="Bash", limit=10) + for event in result["events"]: + assert event["tool_name"] == "Bash" + + def test_timeline_with_time_range(self, populated_storage): + """Test timeline with time range.""" + now = datetime.now() + start = now - timedelta(hours=2) + end = now + + result = query_timeline(populated_storage, start=start, end=end, limit=10) + # Should only include events within range + for event in result["events"]: + ts = datetime.fromisoformat(event["timestamp"]) + assert ts >= start + assert ts <= end + + +class TestQueryCommands: + """Tests for command queries.""" + + def test_basic_commands(self, populated_storage): + """Test basic command query.""" + result = query_commands(populated_storage, days=7) + assert result["total_commands"] >= 2 # At least 2 git commands + + # Check that git is present + commands = {c["command"]: c["count"] for c in result["commands"]} + assert "git" in commands + assert commands["git"] == 2 + + def test_commands_with_prefix(self, populated_storage): + """Test command query with prefix filter.""" + result = query_commands(populated_storage, days=7, prefix="gi") + # Should only include git commands + for cmd in result["commands"]: + assert cmd["command"].startswith("gi") + + def test_commands_with_project_filter(self, populated_storage): + """Test command query with project filter.""" + result = query_commands(populated_storage, days=7, project="test") + assert result["project"] == "test" + + +class TestQuerySessions: + """Tests for session queries.""" + + def test_basic_sessions(self, populated_storage): + """Test basic session query.""" + result = query_sessions(populated_storage, days=7) + assert result["session_count"] == 2 # 2 sessions within 7 days + assert len(result["sessions"]) == 2 + + def test_sessions_with_project_filter(self, populated_storage): + """Test session query with project filter.""" + result = query_sessions(populated_storage, days=7, project="test") + # Should only include test-project session + assert result["session_count"] == 1 + assert result["sessions"][0]["project"] == "-test-project" + + def test_session_totals(self, populated_storage): + """Test session totals calculation.""" + result = query_sessions(populated_storage, days=7) + assert result["total_entries"] == 4 # 3 + 1 + assert result["total_tool_uses"] == 4 # 3 + 1 + assert result["total_input_tokens"] == 500 # 300 + 200 + assert result["total_output_tokens"] == 240 # 140 + 100 + + +class TestQueryTokens: + """Tests for token queries.""" + + def test_tokens_by_day(self, populated_storage): + """Test token query grouped by day.""" + result = query_tokens(populated_storage, days=7, by="day") + assert result["group_by"] == "day" + assert "breakdown" in result + assert result["total_input_tokens"] >= 0 + assert result["total_output_tokens"] >= 0 + + def test_tokens_by_session(self, populated_storage): + """Test token query grouped by session.""" + result = query_tokens(populated_storage, days=7, by="session") + assert result["group_by"] == "session" + # Should have entries for each session + assert len(result["breakdown"]) >= 1 + + def test_tokens_by_model(self, populated_storage): + """Test token query grouped by model.""" + result = query_tokens(populated_storage, days=7, by="model") + assert result["group_by"] == "model" + + # Should have entries for each model + models = {b["model"] for b in result["breakdown"]} + assert "claude-opus-4-5" in models + + def test_tokens_invalid_grouping(self, populated_storage): + """Test token query with invalid grouping.""" + result = query_tokens(populated_storage, days=7, by="invalid") + assert "error" in result + + +class TestEnsureFreshData: + """Tests for data freshness checking.""" + + def test_fresh_data_not_refreshed(self, populated_storage): + """Test that fresh data is not refreshed.""" + # First, update ingestion state to make data appear fresh + from session_analytics.storage import IngestionState + + populated_storage.update_ingestion_state( + IngestionState( + file_path="/test/file.jsonl", + file_size=1000, + last_modified=datetime.now(), + entries_processed=10, + last_processed=datetime.now(), + ) + ) + + # Data should be fresh + refreshed = ensure_fresh_data(populated_storage, max_age_minutes=5) + assert not refreshed + + def test_force_refresh(self, populated_storage): + """Test that force=True always refreshes.""" + refreshed = ensure_fresh_data(populated_storage, force=True) + assert refreshed diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..8cfe684 --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,111 @@ +"""Tests for the MCP server.""" + +from session_analytics.server import ( + get_insights, + get_status, + ingest_logs, + query_commands, + query_permission_gaps, + query_sequences, + query_sessions, + query_timeline, + query_tokens, + query_tool_frequency, +) + + +def test_get_status(): + """Test that get_status returns expected fields.""" + # FastMCP wraps functions - access the underlying fn + result = get_status.fn() + assert result["status"] == "ok" + assert "version" in result + assert "db_path" in result + assert "event_count" in result + assert "session_count" in result + + +def test_ingest_logs(): + """Test that ingest_logs runs and returns stats.""" + result = ingest_logs.fn(days=1) + assert result["status"] == "ok" + assert "files_found" in result + assert "events_added" in result + + +def test_query_tool_frequency(): + """Test that query_tool_frequency returns tool counts.""" + result = query_tool_frequency.fn(days=7) + assert result["status"] == "ok" + assert "days" in result + assert "total_tool_calls" in result + assert "tools" in result + assert isinstance(result["tools"], list) + + +def test_query_timeline(): + """Test that query_timeline returns events.""" + result = query_timeline.fn(limit=10) + assert result["status"] == "ok" + assert "start" in result + assert "end" in result + assert "events" in result + assert isinstance(result["events"], list) + + +def test_query_commands(): + """Test that query_commands returns command counts.""" + result = query_commands.fn(days=7) + assert result["status"] == "ok" + assert "days" in result + assert "total_commands" in result + assert "commands" in result + assert isinstance(result["commands"], list) + + +def test_query_sessions(): + """Test that query_sessions returns session info.""" + result = query_sessions.fn(days=7) + assert result["status"] == "ok" + assert "days" in result + assert "session_count" in result + assert "sessions" in result + assert isinstance(result["sessions"], list) + + +def test_query_tokens(): + """Test that query_tokens returns token breakdown.""" + result = query_tokens.fn(days=7, by="day") + assert result["status"] == "ok" + assert "days" in result + assert "group_by" in result + assert "breakdown" in result + assert isinstance(result["breakdown"], list) + + +def test_query_sequences(): + """Test that query_sequences returns sequence patterns.""" + result = query_sequences.fn(days=7, min_count=1, length=2) + assert result["status"] == "ok" + assert "days" in result + assert "sequences" in result + assert isinstance(result["sequences"], list) + + +def test_query_permission_gaps(): + """Test that query_permission_gaps returns gap analysis.""" + result = query_permission_gaps.fn(days=7, threshold=1) + assert result["status"] == "ok" + assert "days" in result + assert "gaps" in result + assert isinstance(result["gaps"], list) + + +def test_get_insights(): + """Test that get_insights returns organized patterns.""" + result = get_insights.fn(refresh=True, days=7) + assert result["status"] == "ok" + assert "tool_frequency" in result + assert "sequences" in result + assert "permission_gaps" in result + assert "summary" in result diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..9c8519f --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,267 @@ +"""Tests for the SQLite storage layer.""" + +import tempfile +from datetime import datetime +from pathlib import Path + +import pytest + +from session_analytics.storage import ( + Event, + IngestionState, + Pattern, + Session, + SQLiteStorage, +) + + +@pytest.fixture +def storage(): + """Create a temporary storage instance for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + yield SQLiteStorage(db_path) + + +@pytest.fixture +def sample_event(): + """Create a sample event for testing.""" + return Event( + id=None, + uuid="test-uuid-12345", + timestamp=datetime(2025, 1, 1, 12, 0, 0), + session_id="session-abc123", + project_path="/encoded/project/path", + entry_type="assistant", + tool_name="Bash", + tool_input_json='{"command": "git status"}', + tool_id="tool-123", + is_error=False, + command="git", + command_args="status", + ) + + +class TestEventOperations: + """Tests for event CRUD operations.""" + + def test_add_event(self, storage, sample_event): + """Test adding a single event.""" + result = storage.add_event(sample_event) + assert result.id is not None + assert result.uuid == sample_event.uuid + + def test_add_event_dedup(self, storage, sample_event): + """Test that duplicate events are ignored.""" + storage.add_event(sample_event) + storage.add_event(sample_event) # Same uuid + session_id + assert storage.get_event_count() == 1 + + def test_add_events_batch(self, storage): + """Test adding multiple events in batch.""" + events = [ + Event( + id=None, + uuid=f"uuid-{i}", + timestamp=datetime(2025, 1, 1, 12, i, 0), + session_id="session-1", + ) + for i in range(5) + ] + count = storage.add_events_batch(events) + assert count == 5 + assert storage.get_event_count() == 5 + + def test_get_events_in_range(self, storage): + """Test filtering events by time range.""" + # Add events across different times + for i in range(5): + storage.add_event( + Event( + id=None, + uuid=f"uuid-{i}", + timestamp=datetime(2025, 1, i + 1, 12, 0, 0), + session_id="session-1", + ) + ) + + # Query a subset (start/end are inclusive, events are at 12:00) + events = storage.get_events_in_range( + start=datetime(2025, 1, 2, 0, 0, 0), + end=datetime(2025, 1, 4, 23, 59, 59), + ) + assert len(events) == 3 + + def test_get_events_by_tool(self, storage): + """Test filtering events by tool name.""" + storage.add_event( + Event( + id=None, + uuid="uuid-1", + timestamp=datetime.now(), + session_id="s1", + tool_name="Bash", + ) + ) + storage.add_event( + Event( + id=None, + uuid="uuid-2", + timestamp=datetime.now(), + session_id="s1", + tool_name="Read", + ) + ) + + bash_events = storage.get_events_in_range(tool_name="Bash") + assert len(bash_events) == 1 + assert bash_events[0].tool_name == "Bash" + + +class TestSessionOperations: + """Tests for session CRUD operations.""" + + def test_upsert_session(self, storage): + """Test adding and updating a session.""" + session = Session( + id="session-1", + project_path="/test/project", + first_seen=datetime(2025, 1, 1), + last_seen=datetime(2025, 1, 1), + entry_count=10, + ) + storage.upsert_session(session) + + retrieved = storage.get_session("session-1") + assert retrieved is not None + assert retrieved.entry_count == 10 + + # Update + session.entry_count = 20 + storage.upsert_session(session) + + retrieved = storage.get_session("session-1") + assert retrieved.entry_count == 20 + + def test_get_session_count(self, storage): + """Test counting sessions.""" + for i in range(3): + storage.upsert_session(Session(id=f"session-{i}")) + assert storage.get_session_count() == 3 + + +class TestIngestionState: + """Tests for ingestion state tracking.""" + + def test_update_and_get_ingestion_state(self, storage): + """Test tracking file ingestion state.""" + state = IngestionState( + file_path="/path/to/file.jsonl", + file_size=1024, + last_modified=datetime(2025, 1, 1), + entries_processed=100, + last_processed=datetime(2025, 1, 1, 12, 0), + ) + storage.update_ingestion_state(state) + + retrieved = storage.get_ingestion_state("/path/to/file.jsonl") + assert retrieved is not None + assert retrieved.file_size == 1024 + assert retrieved.entries_processed == 100 + + def test_get_last_ingestion_time(self, storage): + """Test getting most recent ingestion time.""" + storage.update_ingestion_state( + IngestionState( + file_path="/file1.jsonl", + file_size=100, + last_modified=datetime(2025, 1, 1), + entries_processed=10, + last_processed=datetime(2025, 1, 1, 10, 0), + ) + ) + storage.update_ingestion_state( + IngestionState( + file_path="/file2.jsonl", + file_size=200, + last_modified=datetime(2025, 1, 2), + entries_processed=20, + last_processed=datetime(2025, 1, 2, 10, 0), # More recent + ) + ) + + last_time = storage.get_last_ingestion_time() + assert last_time == datetime(2025, 1, 2, 10, 0) + + +class TestPatternOperations: + """Tests for pattern CRUD operations.""" + + def test_upsert_pattern(self, storage): + """Test adding and updating patterns.""" + pattern = Pattern( + id=None, + pattern_type="tool_frequency", + pattern_key="Bash", + count=100, + last_seen=datetime(2025, 1, 1), + metadata={"avg_duration": 1.5}, + ) + storage.upsert_pattern(pattern) + + patterns = storage.get_patterns("tool_frequency") + assert len(patterns) == 1 + assert patterns[0].count == 100 + assert patterns[0].metadata["avg_duration"] == 1.5 + + def test_get_patterns_by_type(self, storage): + """Test filtering patterns by type.""" + storage.upsert_pattern( + Pattern(id=None, pattern_type="tool_frequency", pattern_key="Bash", count=50) + ) + storage.upsert_pattern( + Pattern(id=None, pattern_type="sequence", pattern_key="Read→Edit", count=30) + ) + + tool_patterns = storage.get_patterns("tool_frequency") + assert len(tool_patterns) == 1 + + all_patterns = storage.get_patterns() + assert len(all_patterns) == 2 + + def test_clear_patterns(self, storage): + """Test clearing patterns.""" + storage.upsert_pattern( + Pattern(id=None, pattern_type="tool_frequency", pattern_key="Bash", count=50) + ) + storage.upsert_pattern( + Pattern(id=None, pattern_type="sequence", pattern_key="Read→Edit", count=30) + ) + + # Clear just one type + deleted = storage.clear_patterns("tool_frequency") + assert deleted == 1 + assert len(storage.get_patterns()) == 1 + + # Clear all + storage.upsert_pattern( + Pattern(id=None, pattern_type="tool_frequency", pattern_key="Read", count=40) + ) + deleted = storage.clear_patterns() + assert deleted == 2 + + +class TestDbStats: + """Tests for database statistics.""" + + def test_get_db_stats(self, storage, sample_event): + """Test getting database statistics.""" + storage.add_event(sample_event) + storage.upsert_session(Session(id="session-1")) + storage.upsert_pattern(Pattern(id=None, pattern_type="test", pattern_key="key", count=1)) + + stats = storage.get_db_stats() + assert stats["event_count"] == 1 + assert stats["session_count"] == 1 + assert stats["pattern_count"] == 1 + assert stats["db_path"] is not None