From a600dc127b65251a4cbc7d12780561c2b5c3d346 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Wed, 31 Dec 2025 04:14:03 +0000 Subject: [PATCH 1/3] Add Phase 1: Project setup and FastMCP server skeleton MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pyproject.toml with FastMCP, uvicorn, and dev dependencies - Makefile with check, fmt, lint, test, install, uninstall targets - LaunchAgent plist and install/uninstall scripts for auto-start - dev.sh script for development mode with auto-reload - Basic FastMCP server with placeholder tools: - get_status: Returns server status - ingest_logs: Placeholder for log ingestion - query_tool_frequency: Placeholder for frequency queries - Usage guide as MCP resource at session-analytics://guide - Tests for the placeholder tools - README with installation and usage instructions Server runs on port 8081 (to not conflict with event-bus on 8080). đŸ¤– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .claude/settings.local.json | 18 +++ Makefile | 73 ++++++++++ README.md | 59 ++++++++ pyproject.toml | 54 ++++++++ ....evansenter.claude-session-analytics.plist | 41 ++++++ scripts/dev.sh | 37 +++++ scripts/install-launchagent.sh | 55 ++++++++ scripts/uninstall-launchagent.sh | 24 ++++ src/session_analytics/__init__.py | 3 + src/session_analytics/guide.md | 68 ++++++++++ src/session_analytics/server.py | 126 ++++++++++++++++++ tests/__init__.py | 1 + tests/conftest.py | 25 ++++ tests/test_server.py | 27 ++++ 14 files changed, 611 insertions(+) create mode 100644 .claude/settings.local.json create mode 100644 Makefile create mode 100644 README.md create mode 100644 pyproject.toml create mode 100644 scripts/com.evansenter.claude-session-analytics.plist create mode 100755 scripts/dev.sh create mode 100755 scripts/install-launchagent.sh create mode 100755 scripts/uninstall-launchagent.sh create mode 100644 src/session_analytics/guide.md create mode 100644 src/session_analytics/server.py create mode 100644 tests/conftest.py create mode 100644 tests/test_server.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..65a0653 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,18 @@ +{ + "permissions": { + "allow": [ + "Bash(chmod:*)", + "Bash(python3 -m venv:*)", + "Bash(.venv/bin/pip install:*)", + "Bash(brew list:*)", + "Bash(/opt/homebrew/bin/python3.12:*)", + "Bash(.venv/bin/ruff format:*)", + "Bash(.venv/bin/ruff check .)", + "Bash(.venv/bin/pytest tests/ -v)", + "Bash(./scripts/install-launchagent.sh:*)", + "Bash(claude mcp add:*)", + "Bash(curl:*)", + "Bash(cat:*)" + ] + } +} diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a191f7c --- /dev/null +++ b/Makefile @@ -0,0 +1,73 @@ +.PHONY: check fmt lint test clean install uninstall dev venv + +# Run all quality gates (format check, lint, tests) +check: fmt lint test + +# Check/fix formatting with ruff +fmt: + ruff format --check . + +# Run linter with ruff +lint: + ruff check . + +# Run tests +test: + pytest tests/ -v + +# Clean build artifacts +clean: + rm -rf build/ dist/ *.egg-info .pytest_cache .ruff_cache + find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true + +# Create virtual environment (requires Python 3.10+) +venv: + @if [ ! -d .venv ]; then \ + echo "Creating virtual environment..."; \ + PYTHON=$$(command -v python3.12 || command -v python3.11 || command -v python3.10 || echo "python3"); \ + $$PYTHON -m venv .venv && .venv/bin/pip install --upgrade pip; \ + fi + +# Install with dev dependencies (for development) +dev: venv + .venv/bin/pip install -e ".[dev]" + +# Full installation: venv + deps + LaunchAgent + CLI + MCP +install: venv + @echo "Installing dependencies..." + .venv/bin/pip install -e . + @echo "" + @echo "Installing LaunchAgent..." + ./scripts/install-launchagent.sh + @echo "" + @echo "Adding to Claude Code..." + @CLAUDE_CMD=$$(command -v claude || echo "$$HOME/.local/bin/claude"); \ + if [ -x "$$CLAUDE_CMD" ]; then \ + $$CLAUDE_CMD mcp add --transport http --scope user session-analytics http://localhost:8081/mcp 2>/dev/null && \ + echo "Added session-analytics to Claude Code" || \ + echo "session-analytics already configured in Claude Code"; \ + else \ + echo "Note: claude not found. Run manually:"; \ + echo " claude mcp add --transport http --scope user session-analytics http://localhost:8081/mcp"; \ + fi + @echo "" + @echo "Installation complete!" + @echo "" + @echo "Make sure ~/.local/bin is in your PATH:" + @echo ' export PATH="$$HOME/.local/bin:$$PATH"' + +# Uninstall: LaunchAgent + CLI + MCP config +uninstall: + @echo "Uninstalling..." + ./scripts/uninstall-launchagent.sh + @echo "" + @echo "Removing from Claude Code..." + @CLAUDE_CMD=$$(command -v claude || echo "$$HOME/.local/bin/claude"); \ + if [ -x "$$CLAUDE_CMD" ]; then \ + $$CLAUDE_CMD mcp remove --scope user session-analytics 2>/dev/null && \ + echo "Removed session-analytics from Claude Code" || \ + echo "session-analytics not found in Claude Code"; \ + fi + @echo "" + @echo "Uninstall complete!" + @echo "Note: venv and source code remain in place." diff --git a/README.md b/README.md new file mode 100644 index 0000000..b724c57 --- /dev/null +++ b/README.md @@ -0,0 +1,59 @@ +# Claude Session Analytics + +MCP server for queryable analytics on Claude Code session logs. + +## Overview + +Replaces `parse-session-logs.sh` with a persistent, queryable analytics layer. Parses JSONL session logs from `~/.claude/projects/` and provides: + +- **User-centric timeline**: Events across conversations, organized by timestamp +- **Rich querying**: Tool frequency, command breakdown, sequences, permission gaps +- **Persistent storage**: SQLite at `~/.claude/contrib/analytics/data.db` +- **Auto-refresh**: Queries automatically refresh stale data (>5 min old) +- **CLI access**: Full CLI for shell scripts and hooks + +## Installation + +```bash +make install +``` + +This will: +1. Create a virtual environment +2. Install dependencies +3. Set up a LaunchAgent for auto-start +4. Add the MCP server to Claude Code + +## Development + +```bash +make dev # Install dev dependencies +./scripts/dev.sh # Run in dev mode with auto-reload +``` + +## Commands + +```bash +make check # Run fmt, lint, test +make install # Install LaunchAgent + CLI +make uninstall # Remove LaunchAgent + CLI +``` + +## MCP Tools + +| Tool | Purpose | +|------|---------| +| `ingest_logs` | Refresh data from JSONL files | +| `query_timeline` | Events in time window | +| `query_tool_frequency` | Tool usage counts | +| `query_commands` | Bash command breakdown | +| `query_sequences` | Common tool patterns | +| `query_permission_gaps` | Commands needing settings.json | +| `query_sessions` | Session metadata | +| `query_tokens` | Token usage analysis | +| `get_insights` | Pre-computed patterns for /improve-workflow | +| `get_status` | Ingestion status + DB stats | + +## License + +MIT diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3194b72 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,54 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "claude-session-analytics" +version = "0.1.0" +description = "MCP server for queryable analytics on Claude Code session logs" +readme = "README.md" +requires-python = ">=3.10" +license = "MIT" +authors = [ + { name = "Evan Senter" } +] +keywords = ["mcp", "claude", "analytics", "session-logs"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [ + "fastmcp>=0.1.0", + "uvicorn>=0.30.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "ruff>=0.8.0", +] + +[project.scripts] +session-analytics = "session_analytics.server:main" + +[tool.hatch.build.targets.wheel] +packages = ["src/session_analytics"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +asyncio_mode = "auto" + +[tool.ruff] +target-version = "py310" +line-length = 100 +src = ["src", "tests"] + +[tool.ruff.lint] +select = ["E", "F", "I", "N", "W", "UP"] +ignore = ["E501"] # Line length handled by formatter diff --git a/scripts/com.evansenter.claude-session-analytics.plist b/scripts/com.evansenter.claude-session-analytics.plist new file mode 100644 index 0000000..d8421b0 --- /dev/null +++ b/scripts/com.evansenter.claude-session-analytics.plist @@ -0,0 +1,41 @@ + + + + + Label + com.evansenter.claude-session-analytics + + ProgramArguments + + __VENV_PYTHON__ + -m + session_analytics.server + + + WorkingDirectory + __PROJECT_DIR__ + + EnvironmentVariables + + PATH + /opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin + PYTHONPATH + __PROJECT_DIR__/src + + + RunAtLoad + + + KeepAlive + + + StandardOutPath + __HOME__/.claude/session-analytics.log + + StandardErrorPath + __HOME__/.claude/session-analytics.err + + ProcessType + Background + + diff --git a/scripts/dev.sh b/scripts/dev.sh new file mode 100755 index 0000000..c86e1f3 --- /dev/null +++ b/scripts/dev.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# Run session analytics in development mode (foreground, auto-reload, verbose logging) + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" +LABEL="com.evansenter.claude-session-analytics" +PLIST="$HOME/Library/LaunchAgents/$LABEL.plist" + +cd "$PROJECT_DIR" +source .venv/bin/activate + +# Stop LaunchAgent if running (to free port 8081) +LAUNCHAGENT_WAS_RUNNING=false +if launchctl list 2>/dev/null | grep -q "$LABEL"; then + echo "Stopping LaunchAgent for dev mode..." + launchctl unload "$PLIST" 2>/dev/null + LAUNCHAGENT_WAS_RUNNING=true + osascript -e 'display notification "Stopped for dev mode" with title "Session Analytics"' 2>/dev/null +fi + +# Restart LaunchAgent on exit +cleanup() { + if [[ "$LAUNCHAGENT_WAS_RUNNING" == "true" && -f "$PLIST" ]]; then + echo "" + echo "Restarting LaunchAgent..." + launchctl load "$PLIST" + osascript -e 'display notification "LaunchAgent restarted" with title "Session Analytics"' 2>/dev/null + fi +} +trap cleanup EXIT + +echo "Starting session analytics in dev mode (Ctrl+C to stop)..." +echo "Add to Claude Code: claude mcp add --transport http --scope user session-analytics http://127.0.0.1:8081/mcp" +echo "" + +# DEV_MODE enables verbose logging +DEV_MODE=1 uvicorn session_analytics.server:create_app --host 127.0.0.1 --port 8081 --reload --factory diff --git a/scripts/install-launchagent.sh b/scripts/install-launchagent.sh new file mode 100755 index 0000000..40c9398 --- /dev/null +++ b/scripts/install-launchagent.sh @@ -0,0 +1,55 @@ +#!/bin/bash +# Install the session analytics server as a macOS LaunchAgent (auto-starts on login) + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" +VENV_PYTHON="$PROJECT_DIR/.venv/bin/python" +PLIST_TEMPLATE="$SCRIPT_DIR/com.evansenter.claude-session-analytics.plist" +PLIST_DEST="$HOME/Library/LaunchAgents/com.evansenter.claude-session-analytics.plist" +LABEL="com.evansenter.claude-session-analytics" + +# Check venv exists +if [[ ! -f "$VENV_PYTHON" ]]; then + echo "Error: Virtual environment not found at $PROJECT_DIR/.venv" + echo "Run: python3 -m venv .venv && source .venv/bin/activate && pip install -e ." + exit 1 +fi + +# Create LaunchAgents directory if needed +mkdir -p "$HOME/Library/LaunchAgents" +mkdir -p "$HOME/.claude" + +# Stop existing service if running +if launchctl list | grep -q "$LABEL"; then + echo "Stopping existing service..." + launchctl unload "$PLIST_DEST" 2>/dev/null || true +fi + +# Generate plist with correct paths +echo "Installing LaunchAgent..." +sed -e "s|__VENV_PYTHON__|$VENV_PYTHON|g" \ + -e "s|__PROJECT_DIR__|$PROJECT_DIR|g" \ + -e "s|__HOME__|$HOME|g" \ + "$PLIST_TEMPLATE" > "$PLIST_DEST" + +# Load the service +echo "Starting service..." +launchctl load "$PLIST_DEST" + +# Verify it's running +sleep 1 +if launchctl list | grep -q "$LABEL"; then + echo "" + echo "Session analytics installed and running!" + echo " Logs: ~/.claude/session-analytics.log" + echo " Errors: ~/.claude/session-analytics.err" + echo "" + echo "To uninstall: $SCRIPT_DIR/uninstall-launchagent.sh" + osascript -e 'display notification "LaunchAgent installed and running" with title "Session Analytics"' 2>/dev/null +else + echo "Error: Service failed to start. Check ~/.claude/session-analytics.err" + osascript -e 'display notification "Failed to start - check logs" with title "Session Analytics" sound name "Basso"' 2>/dev/null + exit 1 +fi diff --git a/scripts/uninstall-launchagent.sh b/scripts/uninstall-launchagent.sh new file mode 100755 index 0000000..9e556ed --- /dev/null +++ b/scripts/uninstall-launchagent.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Uninstall the session analytics LaunchAgent + +set -e + +PLIST_DEST="$HOME/Library/LaunchAgents/com.evansenter.claude-session-analytics.plist" +LABEL="com.evansenter.claude-session-analytics" + +if [[ ! -f "$PLIST_DEST" ]]; then + echo "LaunchAgent not installed." + exit 0 +fi + +echo "Stopping service..." +launchctl unload "$PLIST_DEST" 2>/dev/null || true + +echo "Removing plist..." +rm -f "$PLIST_DEST" + +echo "Session analytics LaunchAgent uninstalled." + +echo "" +echo "Note: Logs remain at ~/.claude/session-analytics.log" +osascript -e 'display notification "LaunchAgent uninstalled" with title "Session Analytics"' 2>/dev/null diff --git a/src/session_analytics/__init__.py b/src/session_analytics/__init__.py index e69de29..345cbea 100644 --- a/src/session_analytics/__init__.py +++ b/src/session_analytics/__init__.py @@ -0,0 +1,3 @@ +"""Claude Session Analytics - MCP server for queryable session log analytics.""" + +__version__ = "0.1.0" diff --git a/src/session_analytics/guide.md b/src/session_analytics/guide.md new file mode 100644 index 0000000..1fe271b --- /dev/null +++ b/src/session_analytics/guide.md @@ -0,0 +1,68 @@ +# Session Analytics Usage Guide + +This MCP server provides queryable analytics on Claude Code session logs. + +## Quick Start + +The server auto-refreshes data when queries detect stale data (>5 min old). +You can also manually trigger ingestion: + +``` +ingest_logs(days=7) # Process last 7 days of logs +``` + +## Available Tools + +### Ingestion + +| Tool | Purpose | +|------|---------| +| `ingest_logs` | Refresh data from JSONL files | +| `get_status` | Ingestion status + DB stats | + +### Queries + +| Tool | Purpose | +|------|---------| +| `query_timeline` | Events in time window | +| `query_tool_frequency` | Tool usage counts | +| `query_commands` | Bash command breakdown | +| `query_sequences` | Common tool patterns | +| `query_permission_gaps` | Commands needing settings.json | +| `query_sessions` | Session metadata | +| `query_tokens` | Token usage analysis | +| `get_insights` | Pre-computed patterns | + +## Common Patterns + +### Understanding tool usage + +``` +query_tool_frequency(days=30) +``` + +### Finding permission gaps + +``` +query_permission_gaps(threshold=5) # Commands used 5+ times that need permission +``` + +### Analyzing workflows + +``` +query_sequences(min_count=3, length=3) # Common 3-tool sequences +``` + +## Integration with /improve-workflow + +The `get_insights` tool returns pre-computed patterns specifically for +the `/improve-workflow` command: + +``` +get_insights(refresh=True) # Force fresh analysis +``` + +## Data Location + +- Database: `~/.claude/contrib/analytics/data.db` +- Logs parsed from: `~/.claude/projects/**/*.jsonl` diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py new file mode 100644 index 0000000..5ce5ea2 --- /dev/null +++ b/src/session_analytics/server.py @@ -0,0 +1,126 @@ +"""MCP Session Analytics Server. + +Provides tools for querying Claude Code session logs: +- ingest_logs: Refresh data from JSONL files +- query_timeline: Events in time window +- query_tool_frequency: Tool usage counts +- query_commands: Bash command breakdown +- query_sequences: Common tool patterns +- query_permission_gaps: Commands needing settings.json +- query_sessions: Session metadata +- query_tokens: Token usage analysis +- get_insights: Pre-computed patterns for /improve-workflow +- get_status: Ingestion status + DB stats +""" + +import logging +import os +from pathlib import Path + +from fastmcp import FastMCP + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger("session-analytics") +if os.environ.get("DEV_MODE"): + logger.setLevel(logging.DEBUG) + +# Initialize MCP server +mcp = FastMCP("session-analytics") + + +@mcp.resource("session-analytics://guide", description="Usage guide and best practices") +def usage_guide() -> str: + """Return the session analytics usage guide from external markdown file.""" + guide_path = Path(__file__).parent / "guide.md" + try: + return guide_path.read_text() + except FileNotFoundError: + return "# Session Analytics Usage Guide\n\nGuide file not found. See CLAUDE.md for usage." + + +@mcp.tool() +def get_status() -> dict: + """Get ingestion status and database stats. + + Returns: + Status info including last ingestion time, event count, and DB size + """ + # Placeholder - will be implemented in Phase 2 + return { + "status": "ok", + "version": "0.1.0", + "message": "Session analytics server is running. Storage layer not yet implemented.", + "db_path": str(Path.home() / ".claude" / "contrib" / "analytics" / "data.db"), + } + + +@mcp.tool() +def ingest_logs(days: int = 7, project: str | None = None, force: bool = False) -> dict: + """Refresh data from JSONL session log files. + + Args: + days: Number of days to look back (default: 7) + project: Optional project path filter + force: Force re-ingestion even if data is fresh + + Returns: + Ingestion stats (files processed, entries added, etc.) + """ + # Placeholder - will be implemented in Phase 3 + return { + "status": "not_implemented", + "message": "Ingestion will be implemented in Phase 3", + "days": days, + "project": project, + "force": force, + } + + +@mcp.tool() +def query_tool_frequency(days: int = 7, project: str | None = None) -> dict: + """Get tool usage frequency counts. + + Args: + days: Number of days to analyze (default: 7) + project: Optional project path filter + + Returns: + Tool frequency breakdown + """ + # Placeholder - will be implemented in Phase 4 + return { + "status": "not_implemented", + "message": "Query will be implemented in Phase 4", + "days": days, + "project": project, + } + + +def create_app(): + """Create the ASGI app for uvicorn.""" + # stateless_http=True allows resilience to server restarts + return mcp.http_app(stateless_http=True) + + +def main(): + """Run the MCP server.""" + import uvicorn + + port = int(os.environ.get("PORT", 8081)) + host = os.environ.get("HOST", "127.0.0.1") + + print(f"Starting Claude Session Analytics on {host}:{port}") + print( + f"Add to Claude Code: claude mcp add --transport http --scope user session-analytics http://{host}:{port}/mcp" + ) + + uvicorn.run(create_app(), host=host, port=port) + + +if __name__ == "__main__": + main() diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..b76b24c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for claude-session-analytics.""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..83cca08 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,25 @@ +"""Pytest configuration and fixtures.""" + +import pytest + + +@pytest.fixture +def sample_session_log_entry(): + """Sample JSONL entry from a Claude Code session log.""" + return { + "uuid": "test-uuid-12345", + "timestamp": "2025-01-01T12:00:00.000Z", + "sessionId": "session-abc123", + "type": "assistant", + "message": { + "role": "assistant", + "content": [ + { + "type": "tool_use", + "id": "tool-123", + "name": "Bash", + "input": {"command": "git status", "description": "Check git status"}, + } + ], + }, + } diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..cc43083 --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,27 @@ +"""Tests for the MCP server.""" + +from session_analytics.server import get_status, ingest_logs, query_tool_frequency + + +def test_get_status(): + """Test that get_status returns expected fields.""" + # FastMCP wraps functions - access the underlying fn + result = get_status.fn() + assert result["status"] == "ok" + assert "version" in result + assert "db_path" in result + + +def test_ingest_logs_placeholder(): + """Test that ingest_logs returns placeholder response.""" + result = ingest_logs.fn(days=7) + assert result["status"] == "not_implemented" + assert result["days"] == 7 + + +def test_query_tool_frequency_placeholder(): + """Test that query_tool_frequency returns placeholder response.""" + result = query_tool_frequency.fn(days=14, project="/some/path") + assert result["status"] == "not_implemented" + assert result["days"] == 14 + assert result["project"] == "/some/path" From feb498e3decdb58ab910a251109703aebd0289e9 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Wed, 31 Dec 2025 04:30:26 +0000 Subject: [PATCH 2/3] Add Phase 2: SQLite storage layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - storage.py with SQLiteStorage class: - Events table with denormalized fields for fast queries - Sessions table for session metadata - Ingestion state tracking for incremental updates - Patterns table for pre-computed insights - Indexes on timestamp, session_id, tool_name, project_path - Data classes: Event, Session, IngestionState, Pattern - CRUD operations for all entities with batch insert support - get_db_stats() for monitoring database health - Updated server.py to use storage for get_status() - Comprehensive test suite (16 tests) đŸ¤– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/session_analytics/server.py | 13 +- src/session_analytics/storage.py | 588 +++++++++++++++++++++++++++++++ tests/test_server.py | 2 + tests/test_storage.py | 267 ++++++++++++++ 4 files changed, 867 insertions(+), 3 deletions(-) create mode 100644 src/session_analytics/storage.py create mode 100644 tests/test_storage.py diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py index 5ce5ea2..987dfeb 100644 --- a/src/session_analytics/server.py +++ b/src/session_analytics/server.py @@ -19,6 +19,8 @@ from fastmcp import FastMCP +from session_analytics.storage import SQLiteStorage + # Configure logging logging.basicConfig( level=logging.INFO, @@ -32,6 +34,9 @@ # Initialize MCP server mcp = FastMCP("session-analytics") +# Initialize storage +storage = SQLiteStorage() + @mcp.resource("session-analytics://guide", description="Usage guide and best practices") def usage_guide() -> str: @@ -50,12 +55,14 @@ def get_status() -> dict: Returns: Status info including last ingestion time, event count, and DB size """ - # Placeholder - will be implemented in Phase 2 + stats = storage.get_db_stats() + last_ingest = storage.get_last_ingestion_time() + return { "status": "ok", "version": "0.1.0", - "message": "Session analytics server is running. Storage layer not yet implemented.", - "db_path": str(Path.home() / ".claude" / "contrib" / "analytics" / "data.db"), + "last_ingestion": last_ingest.isoformat() if last_ingest else None, + **stats, } diff --git a/src/session_analytics/storage.py b/src/session_analytics/storage.py new file mode 100644 index 0000000..0fa58e3 --- /dev/null +++ b/src/session_analytics/storage.py @@ -0,0 +1,588 @@ +"""SQLite storage backend for session analytics.""" + +import json +import logging +import os +import sqlite3 +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path + +logger = logging.getLogger("session-analytics") + +# Register datetime adapters/converters (required for Python 3.12+) + + +def _adapt_datetime(dt: datetime) -> str: + """Convert datetime to ISO format string for SQLite storage.""" + return dt.isoformat() + + +def _convert_datetime(data: bytes) -> datetime: + """Convert ISO format string from SQLite to datetime.""" + return datetime.fromisoformat(data.decode()) + + +sqlite3.register_adapter(datetime, _adapt_datetime) +sqlite3.register_converter("TIMESTAMP", _convert_datetime) + + +@dataclass +class Event: + """A parsed event from a Claude Code session log.""" + + id: int | None + uuid: str + timestamp: datetime + session_id: str + project_path: str | None = None + entry_type: str | None = None # 'user', 'assistant', 'summary' + + # Tool-specific (null if not a tool call) + tool_name: str | None = None + tool_input_json: str | None = None + tool_id: str | None = None + is_error: bool = False + + # Denormalized for common filters + command: str | None = None # Bash: first word + command_args: str | None = None # Bash: remaining args + file_path: str | None = None # Read/Edit/Write target + skill_name: str | None = None # Skill invocation + + # Token tracking + input_tokens: int | None = None + output_tokens: int | None = None + cache_read_tokens: int | None = None + cache_creation_tokens: int | None = None + model: str | None = None + + # Context + git_branch: str | None = None + cwd: str | None = None + + +@dataclass +class Session: + """Metadata about a Claude Code session.""" + + id: str + project_path: str | None = None + first_seen: datetime | None = None + last_seen: datetime | None = None + entry_count: int = 0 + tool_use_count: int = 0 + total_input_tokens: int = 0 + total_output_tokens: int = 0 + primary_branch: str | None = None + slug: str | None = None + + +@dataclass +class IngestionState: + """Tracks the ingestion state of a JSONL file.""" + + file_path: str + file_size: int + last_modified: datetime + entries_processed: int + last_processed: datetime + + +@dataclass +class Pattern: + """A pre-computed pattern for fast querying.""" + + id: int | None + pattern_type: str # 'tool_frequency', 'sequence', 'permission_gap', etc. + pattern_key: str # e.g., "Bash" or "Read → Edit" + count: int = 0 + last_seen: datetime | None = None + metadata: dict = field(default_factory=dict) + computed_at: datetime | None = None + + +# Default database path +DEFAULT_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" + +# Schema version for migrations +SCHEMA_VERSION = 1 + + +class SQLiteStorage: + """SQLite-backed storage for session analytics.""" + + def __init__(self, db_path: str | Path | None = None): + """Initialize storage with optional custom DB path.""" + if db_path is None: + db_path = os.environ.get("SESSION_ANALYTICS_DB", str(DEFAULT_DB_PATH)) + + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + + self._init_db() + + @contextmanager + def _connect(self): + """Context manager for database connections.""" + conn = sqlite3.connect( + self.db_path, + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + ) + conn.row_factory = sqlite3.Row + try: + yield conn + conn.commit() + finally: + conn.close() + + def _init_db(self): + """Create tables if they don't exist.""" + with self._connect() as conn: + # Schema version tracking + conn.execute(""" + CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY + ) + """) + + # Core events table (denormalized for fast queries) + conn.execute(""" + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY, + uuid TEXT NOT NULL, + timestamp TIMESTAMP NOT NULL, + session_id TEXT NOT NULL, + project_path TEXT, + entry_type TEXT, + + -- Tool-specific + tool_name TEXT, + tool_input_json TEXT, + tool_id TEXT, + is_error INTEGER DEFAULT 0, + + -- Denormalized for common filters + command TEXT, + command_args TEXT, + file_path TEXT, + skill_name TEXT, + + -- Token tracking + input_tokens INTEGER, + output_tokens INTEGER, + cache_read_tokens INTEGER, + cache_creation_tokens INTEGER, + model TEXT, + + -- Context + git_branch TEXT, + cwd TEXT, + + UNIQUE(session_id, uuid) + ) + """) + + # Indexes for common queries + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_tool ON events(tool_name)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_events_project ON events(project_path)") + + # Sessions metadata + conn.execute(""" + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + project_path TEXT, + first_seen TIMESTAMP, + last_seen TIMESTAMP, + entry_count INTEGER DEFAULT 0, + tool_use_count INTEGER DEFAULT 0, + total_input_tokens INTEGER DEFAULT 0, + total_output_tokens INTEGER DEFAULT 0, + primary_branch TEXT, + slug TEXT + ) + """) + + # Ingestion tracking (incremental updates) + conn.execute(""" + CREATE TABLE IF NOT EXISTS ingestion_state ( + file_path TEXT PRIMARY KEY, + file_size INTEGER, + last_modified TIMESTAMP, + entries_processed INTEGER, + last_processed TIMESTAMP + ) + """) + + # Pre-computed patterns + conn.execute(""" + CREATE TABLE IF NOT EXISTS patterns ( + id INTEGER PRIMARY KEY, + pattern_type TEXT NOT NULL, + pattern_key TEXT NOT NULL, + count INTEGER DEFAULT 0, + last_seen TIMESTAMP, + metadata_json TEXT, + computed_at TIMESTAMP, + UNIQUE(pattern_type, pattern_key) + ) + """) + + # Set schema version + conn.execute( + "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,) + ) + + # Event operations + + def add_event(self, event: Event) -> Event: + """Add a new event and return it with assigned ID.""" + with self._connect() as conn: + cursor = conn.execute( + """ + INSERT OR IGNORE INTO events ( + uuid, timestamp, session_id, project_path, entry_type, + tool_name, tool_input_json, tool_id, is_error, + command, command_args, file_path, skill_name, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, + git_branch, cwd + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + event.uuid, + event.timestamp, + event.session_id, + event.project_path, + event.entry_type, + event.tool_name, + event.tool_input_json, + event.tool_id, + 1 if event.is_error else 0, + event.command, + event.command_args, + event.file_path, + event.skill_name, + event.input_tokens, + event.output_tokens, + event.cache_read_tokens, + event.cache_creation_tokens, + event.model, + event.git_branch, + event.cwd, + ), + ) + event.id = cursor.lastrowid + return event + + def add_events_batch(self, events: list[Event]) -> int: + """Add multiple events in a single transaction. Returns count added.""" + with self._connect() as conn: + cursor = conn.executemany( + """ + INSERT OR IGNORE INTO events ( + uuid, timestamp, session_id, project_path, entry_type, + tool_name, tool_input_json, tool_id, is_error, + command, command_args, file_path, skill_name, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, + git_branch, cwd + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + ( + e.uuid, + e.timestamp, + e.session_id, + e.project_path, + e.entry_type, + e.tool_name, + e.tool_input_json, + e.tool_id, + 1 if e.is_error else 0, + e.command, + e.command_args, + e.file_path, + e.skill_name, + e.input_tokens, + e.output_tokens, + e.cache_read_tokens, + e.cache_creation_tokens, + e.model, + e.git_branch, + e.cwd, + ) + for e in events + ], + ) + return cursor.rowcount + + def get_event_count(self) -> int: + """Get total number of events.""" + with self._connect() as conn: + row = conn.execute("SELECT COUNT(*) as count FROM events").fetchone() + return row["count"] + + def get_events_in_range( + self, + start: datetime | None = None, + end: datetime | None = None, + tool_name: str | None = None, + project_path: str | None = None, + limit: int = 100, + ) -> list[Event]: + """Get events within a time range with optional filters.""" + with self._connect() as conn: + conditions = [] + params: list = [] + + if start: + conditions.append("timestamp >= ?") + params.append(start) + if end: + conditions.append("timestamp <= ?") + params.append(end) + if tool_name: + conditions.append("tool_name = ?") + params.append(tool_name) + if project_path: + conditions.append("project_path = ?") + params.append(project_path) + + where_clause = " AND ".join(conditions) if conditions else "1=1" + params.append(limit) + + rows = conn.execute( + f""" + SELECT * FROM events + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT ? + """, + params, + ).fetchall() + + return [self._row_to_event(row) for row in rows] + + def _row_to_event(self, row: sqlite3.Row) -> Event: + """Convert a database row to an Event object.""" + return Event( + id=row["id"], + uuid=row["uuid"], + timestamp=row["timestamp"], + session_id=row["session_id"], + project_path=row["project_path"], + entry_type=row["entry_type"], + tool_name=row["tool_name"], + tool_input_json=row["tool_input_json"], + tool_id=row["tool_id"], + is_error=bool(row["is_error"]), + command=row["command"], + command_args=row["command_args"], + file_path=row["file_path"], + skill_name=row["skill_name"], + input_tokens=row["input_tokens"], + output_tokens=row["output_tokens"], + cache_read_tokens=row["cache_read_tokens"], + cache_creation_tokens=row["cache_creation_tokens"], + model=row["model"], + git_branch=row["git_branch"], + cwd=row["cwd"], + ) + + # Session operations + + def upsert_session(self, session: Session) -> None: + """Add or update a session.""" + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO sessions ( + id, project_path, first_seen, last_seen, + entry_count, tool_use_count, + total_input_tokens, total_output_tokens, + primary_branch, slug + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + session.id, + session.project_path, + session.first_seen, + session.last_seen, + session.entry_count, + session.tool_use_count, + session.total_input_tokens, + session.total_output_tokens, + session.primary_branch, + session.slug, + ), + ) + + def get_session(self, session_id: str) -> Session | None: + """Get a session by ID.""" + with self._connect() as conn: + row = conn.execute("SELECT * FROM sessions WHERE id = ?", (session_id,)).fetchone() + if row: + return self._row_to_session(row) + return None + + def get_session_count(self) -> int: + """Get total number of sessions.""" + with self._connect() as conn: + row = conn.execute("SELECT COUNT(*) as count FROM sessions").fetchone() + return row["count"] + + def _row_to_session(self, row: sqlite3.Row) -> Session: + """Convert a database row to a Session object.""" + return Session( + id=row["id"], + project_path=row["project_path"], + first_seen=row["first_seen"], + last_seen=row["last_seen"], + entry_count=row["entry_count"], + tool_use_count=row["tool_use_count"], + total_input_tokens=row["total_input_tokens"], + total_output_tokens=row["total_output_tokens"], + primary_branch=row["primary_branch"], + slug=row["slug"], + ) + + # Ingestion state operations + + def get_ingestion_state(self, file_path: str) -> IngestionState | None: + """Get ingestion state for a file.""" + with self._connect() as conn: + row = conn.execute( + "SELECT * FROM ingestion_state WHERE file_path = ?", (file_path,) + ).fetchone() + if row: + return IngestionState( + file_path=row["file_path"], + file_size=row["file_size"], + last_modified=row["last_modified"], + entries_processed=row["entries_processed"], + last_processed=row["last_processed"], + ) + return None + + def update_ingestion_state(self, state: IngestionState) -> None: + """Update ingestion state for a file.""" + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO ingestion_state ( + file_path, file_size, last_modified, entries_processed, last_processed + ) VALUES (?, ?, ?, ?, ?) + """, + ( + state.file_path, + state.file_size, + state.last_modified, + state.entries_processed, + state.last_processed, + ), + ) + + def get_last_ingestion_time(self) -> datetime | None: + """Get the most recent ingestion time across all files.""" + with self._connect() as conn: + row = conn.execute("SELECT MAX(last_processed) as last FROM ingestion_state").fetchone() + if not row or not row["last"]: + return None + # Handle both datetime objects and ISO strings (SQLite aggregates return strings) + val = row["last"] + return datetime.fromisoformat(val) if isinstance(val, str) else val + + # Pattern operations + + def upsert_pattern(self, pattern: Pattern) -> None: + """Add or update a pattern.""" + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO patterns ( + pattern_type, pattern_key, count, last_seen, metadata_json, computed_at + ) VALUES (?, ?, ?, ?, ?, ?) + """, + ( + pattern.pattern_type, + pattern.pattern_key, + pattern.count, + pattern.last_seen, + json.dumps(pattern.metadata) if pattern.metadata else None, + pattern.computed_at, + ), + ) + + def get_patterns(self, pattern_type: str | None = None) -> list[Pattern]: + """Get patterns, optionally filtered by type.""" + with self._connect() as conn: + if pattern_type: + rows = conn.execute( + "SELECT * FROM patterns WHERE pattern_type = ? ORDER BY count DESC", + (pattern_type,), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM patterns ORDER BY pattern_type, count DESC" + ).fetchall() + + return [ + Pattern( + id=row["id"], + pattern_type=row["pattern_type"], + pattern_key=row["pattern_key"], + count=row["count"], + last_seen=row["last_seen"], + metadata=json.loads(row["metadata_json"]) if row["metadata_json"] else {}, + computed_at=row["computed_at"], + ) + for row in rows + ] + + def clear_patterns(self, pattern_type: str | None = None) -> int: + """Clear patterns, optionally filtered by type. Returns count deleted.""" + with self._connect() as conn: + if pattern_type: + cursor = conn.execute( + "DELETE FROM patterns WHERE pattern_type = ?", (pattern_type,) + ) + else: + cursor = conn.execute("DELETE FROM patterns") + return cursor.rowcount + + # Utility operations + + def get_db_stats(self) -> dict: + """Get database statistics.""" + with self._connect() as conn: + event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] + session_count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0] + pattern_count = conn.execute("SELECT COUNT(*) FROM patterns").fetchone()[0] + file_count = conn.execute("SELECT COUNT(*) FROM ingestion_state").fetchone()[0] + + # Get date range + date_range = conn.execute( + "SELECT MIN(timestamp) as min_ts, MAX(timestamp) as max_ts FROM events" + ).fetchone() + + # Get DB file size + db_size = self.db_path.stat().st_size if self.db_path.exists() else 0 + + # Helper to convert datetime or string to ISO string + def to_iso(val): + if val is None: + return None + return val if isinstance(val, str) else val.isoformat() + + return { + "event_count": event_count, + "session_count": session_count, + "pattern_count": pattern_count, + "files_processed": file_count, + "earliest_event": to_iso(date_range["min_ts"]), + "latest_event": to_iso(date_range["max_ts"]), + "db_size_bytes": db_size, + "db_path": str(self.db_path), + } diff --git a/tests/test_server.py b/tests/test_server.py index cc43083..d05e030 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -10,6 +10,8 @@ def test_get_status(): assert result["status"] == "ok" assert "version" in result assert "db_path" in result + assert "event_count" in result + assert "session_count" in result def test_ingest_logs_placeholder(): diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..9c8519f --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,267 @@ +"""Tests for the SQLite storage layer.""" + +import tempfile +from datetime import datetime +from pathlib import Path + +import pytest + +from session_analytics.storage import ( + Event, + IngestionState, + Pattern, + Session, + SQLiteStorage, +) + + +@pytest.fixture +def storage(): + """Create a temporary storage instance for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + yield SQLiteStorage(db_path) + + +@pytest.fixture +def sample_event(): + """Create a sample event for testing.""" + return Event( + id=None, + uuid="test-uuid-12345", + timestamp=datetime(2025, 1, 1, 12, 0, 0), + session_id="session-abc123", + project_path="/encoded/project/path", + entry_type="assistant", + tool_name="Bash", + tool_input_json='{"command": "git status"}', + tool_id="tool-123", + is_error=False, + command="git", + command_args="status", + ) + + +class TestEventOperations: + """Tests for event CRUD operations.""" + + def test_add_event(self, storage, sample_event): + """Test adding a single event.""" + result = storage.add_event(sample_event) + assert result.id is not None + assert result.uuid == sample_event.uuid + + def test_add_event_dedup(self, storage, sample_event): + """Test that duplicate events are ignored.""" + storage.add_event(sample_event) + storage.add_event(sample_event) # Same uuid + session_id + assert storage.get_event_count() == 1 + + def test_add_events_batch(self, storage): + """Test adding multiple events in batch.""" + events = [ + Event( + id=None, + uuid=f"uuid-{i}", + timestamp=datetime(2025, 1, 1, 12, i, 0), + session_id="session-1", + ) + for i in range(5) + ] + count = storage.add_events_batch(events) + assert count == 5 + assert storage.get_event_count() == 5 + + def test_get_events_in_range(self, storage): + """Test filtering events by time range.""" + # Add events across different times + for i in range(5): + storage.add_event( + Event( + id=None, + uuid=f"uuid-{i}", + timestamp=datetime(2025, 1, i + 1, 12, 0, 0), + session_id="session-1", + ) + ) + + # Query a subset (start/end are inclusive, events are at 12:00) + events = storage.get_events_in_range( + start=datetime(2025, 1, 2, 0, 0, 0), + end=datetime(2025, 1, 4, 23, 59, 59), + ) + assert len(events) == 3 + + def test_get_events_by_tool(self, storage): + """Test filtering events by tool name.""" + storage.add_event( + Event( + id=None, + uuid="uuid-1", + timestamp=datetime.now(), + session_id="s1", + tool_name="Bash", + ) + ) + storage.add_event( + Event( + id=None, + uuid="uuid-2", + timestamp=datetime.now(), + session_id="s1", + tool_name="Read", + ) + ) + + bash_events = storage.get_events_in_range(tool_name="Bash") + assert len(bash_events) == 1 + assert bash_events[0].tool_name == "Bash" + + +class TestSessionOperations: + """Tests for session CRUD operations.""" + + def test_upsert_session(self, storage): + """Test adding and updating a session.""" + session = Session( + id="session-1", + project_path="/test/project", + first_seen=datetime(2025, 1, 1), + last_seen=datetime(2025, 1, 1), + entry_count=10, + ) + storage.upsert_session(session) + + retrieved = storage.get_session("session-1") + assert retrieved is not None + assert retrieved.entry_count == 10 + + # Update + session.entry_count = 20 + storage.upsert_session(session) + + retrieved = storage.get_session("session-1") + assert retrieved.entry_count == 20 + + def test_get_session_count(self, storage): + """Test counting sessions.""" + for i in range(3): + storage.upsert_session(Session(id=f"session-{i}")) + assert storage.get_session_count() == 3 + + +class TestIngestionState: + """Tests for ingestion state tracking.""" + + def test_update_and_get_ingestion_state(self, storage): + """Test tracking file ingestion state.""" + state = IngestionState( + file_path="/path/to/file.jsonl", + file_size=1024, + last_modified=datetime(2025, 1, 1), + entries_processed=100, + last_processed=datetime(2025, 1, 1, 12, 0), + ) + storage.update_ingestion_state(state) + + retrieved = storage.get_ingestion_state("/path/to/file.jsonl") + assert retrieved is not None + assert retrieved.file_size == 1024 + assert retrieved.entries_processed == 100 + + def test_get_last_ingestion_time(self, storage): + """Test getting most recent ingestion time.""" + storage.update_ingestion_state( + IngestionState( + file_path="/file1.jsonl", + file_size=100, + last_modified=datetime(2025, 1, 1), + entries_processed=10, + last_processed=datetime(2025, 1, 1, 10, 0), + ) + ) + storage.update_ingestion_state( + IngestionState( + file_path="/file2.jsonl", + file_size=200, + last_modified=datetime(2025, 1, 2), + entries_processed=20, + last_processed=datetime(2025, 1, 2, 10, 0), # More recent + ) + ) + + last_time = storage.get_last_ingestion_time() + assert last_time == datetime(2025, 1, 2, 10, 0) + + +class TestPatternOperations: + """Tests for pattern CRUD operations.""" + + def test_upsert_pattern(self, storage): + """Test adding and updating patterns.""" + pattern = Pattern( + id=None, + pattern_type="tool_frequency", + pattern_key="Bash", + count=100, + last_seen=datetime(2025, 1, 1), + metadata={"avg_duration": 1.5}, + ) + storage.upsert_pattern(pattern) + + patterns = storage.get_patterns("tool_frequency") + assert len(patterns) == 1 + assert patterns[0].count == 100 + assert patterns[0].metadata["avg_duration"] == 1.5 + + def test_get_patterns_by_type(self, storage): + """Test filtering patterns by type.""" + storage.upsert_pattern( + Pattern(id=None, pattern_type="tool_frequency", pattern_key="Bash", count=50) + ) + storage.upsert_pattern( + Pattern(id=None, pattern_type="sequence", pattern_key="Read→Edit", count=30) + ) + + tool_patterns = storage.get_patterns("tool_frequency") + assert len(tool_patterns) == 1 + + all_patterns = storage.get_patterns() + assert len(all_patterns) == 2 + + def test_clear_patterns(self, storage): + """Test clearing patterns.""" + storage.upsert_pattern( + Pattern(id=None, pattern_type="tool_frequency", pattern_key="Bash", count=50) + ) + storage.upsert_pattern( + Pattern(id=None, pattern_type="sequence", pattern_key="Read→Edit", count=30) + ) + + # Clear just one type + deleted = storage.clear_patterns("tool_frequency") + assert deleted == 1 + assert len(storage.get_patterns()) == 1 + + # Clear all + storage.upsert_pattern( + Pattern(id=None, pattern_type="tool_frequency", pattern_key="Read", count=40) + ) + deleted = storage.clear_patterns() + assert deleted == 2 + + +class TestDbStats: + """Tests for database statistics.""" + + def test_get_db_stats(self, storage, sample_event): + """Test getting database statistics.""" + storage.add_event(sample_event) + storage.upsert_session(Session(id="session-1")) + storage.upsert_pattern(Pattern(id=None, pattern_type="test", pattern_key="key", count=1)) + + stats = storage.get_db_stats() + assert stats["event_count"] == 1 + assert stats["session_count"] == 1 + assert stats["pattern_count"] == 1 + assert stats["db_path"] is not None From 39e935eb45a9dab21234084ebb7e0101afde5c6c Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Wed, 31 Dec 2025 04:37:00 +0000 Subject: [PATCH 3/3] Add Phase 3: JSONL ingestion module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements log file discovery and parsing: - find_log_files(): Discovers JSONL files within date range - parse_tool_use(): Extracts tool info (command, file_path, skill_name) - parse_entry(): Parses entries into Event objects - ingest_file(): Incremental ingestion with mtime/size tracking - ingest_logs(): Full ingestion orchestration - update_session_stats(): Aggregates session statistics Integrates with server.py to provide real data for ingest_logs tool. Closes #3 đŸ¤– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/session_analytics/ingest.py | 452 ++++++++++++++++++++++++++++++++ src/session_analytics/server.py | 10 +- tests/test_ingest.py | 316 ++++++++++++++++++++++ tests/test_server.py | 11 +- 4 files changed, 778 insertions(+), 11 deletions(-) create mode 100644 src/session_analytics/ingest.py create mode 100644 tests/test_ingest.py diff --git a/src/session_analytics/ingest.py b/src/session_analytics/ingest.py new file mode 100644 index 0000000..6ab5ad8 --- /dev/null +++ b/src/session_analytics/ingest.py @@ -0,0 +1,452 @@ +"""JSONL log ingestion for Claude Code session analytics.""" + +import json +import logging +from datetime import datetime, timedelta +from pathlib import Path + +from session_analytics.storage import Event, IngestionState, Session, SQLiteStorage + +logger = logging.getLogger("session-analytics") + +# Default location for Claude Code session logs +DEFAULT_LOGS_DIR = Path.home() / ".claude" / "projects" + + +def find_log_files( + logs_dir: Path = DEFAULT_LOGS_DIR, + days: int = 7, + project_filter: str | None = None, +) -> list[Path]: + """Find JSONL log files within the specified time range. + + Args: + logs_dir: Directory containing project subdirectories + days: Only include files modified within this many days + project_filter: Optional project path to filter (encoded form) + + Returns: + List of JSONL file paths, sorted by modification time (newest first) + """ + if not logs_dir.exists(): + logger.warning(f"Logs directory does not exist: {logs_dir}") + return [] + + cutoff = datetime.now() - timedelta(days=days) + files = [] + + for project_dir in logs_dir.iterdir(): + if not project_dir.is_dir(): + continue + + # Apply project filter if specified + if project_filter and project_filter not in project_dir.name: + continue + + for jsonl_file in project_dir.glob("*.jsonl"): + try: + mtime = datetime.fromtimestamp(jsonl_file.stat().st_mtime) + if mtime >= cutoff: + files.append((jsonl_file, mtime)) + except OSError as e: + logger.warning(f"Could not stat {jsonl_file}: {e}") + + # Sort by modification time, newest first + files.sort(key=lambda x: x[1], reverse=True) + return [f for f, _ in files] + + +def parse_tool_use(tool_use: dict) -> dict: + """Extract normalized fields from a tool_use block. + + Returns dict with: tool_name, tool_id, tool_input_json, command, command_args, + file_path, skill_name + """ + result = { + "tool_name": tool_use.get("name"), + "tool_id": tool_use.get("id"), + "tool_input_json": json.dumps(tool_use.get("input", {})), + "command": None, + "command_args": None, + "file_path": None, + "skill_name": None, + } + + tool_input = tool_use.get("input", {}) + tool_name = result["tool_name"] + + # Extract Bash command info + if tool_name == "Bash": + cmd = tool_input.get("command", "") + if cmd: + parts = cmd.split(None, 1) + result["command"] = parts[0] if parts else None + result["command_args"] = parts[1] if len(parts) > 1 else None + + # Extract file path for file operations + elif tool_name in ("Read", "Edit", "Write", "Glob", "Grep"): + result["file_path"] = tool_input.get("file_path") or tool_input.get("path") + + # Extract skill name + elif tool_name == "Skill": + result["skill_name"] = tool_input.get("skill") + + # Handle MCP tools (e.g., mcp__event-bus__register_session) + elif tool_name and tool_name.startswith("mcp__"): + # Keep the full name for MCP tools + pass + + return result + + +def parse_entry(raw: dict, project_path: str) -> list[Event]: + """Parse a single JSONL entry into Event objects. + + An entry may produce multiple events (e.g., assistant with multiple tool_use blocks). + + Args: + raw: Parsed JSON object from JSONL + project_path: Encoded project path from directory name + + Returns: + List of Event objects (may be empty for skipped entries) + """ + entry_type = raw.get("type") + + # Skip certain entry types that don't contain useful analytics data + if entry_type in ("file-history-snapshot", "queue-operation", "create"): + return [] + + # Skip thinking/text blocks that are nested content + if entry_type in ("thinking", "text", "tool_use", "tool_result", "message"): + return [] + + uuid = raw.get("uuid") + session_id = raw.get("sessionId") + timestamp_str = raw.get("timestamp") + + # Skip entries without required fields + if not uuid or not session_id or not timestamp_str: + return [] + + try: + timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + # Convert to naive datetime (remove timezone for SQLite compatibility) + timestamp = timestamp.replace(tzinfo=None) + except (ValueError, AttributeError): + logger.debug(f"Could not parse timestamp: {timestamp_str}") + return [] + + # Extract common fields + cwd = raw.get("cwd") + git_branch = raw.get("gitBranch") + + # Extract token usage from assistant messages + message = raw.get("message", {}) + usage = message.get("usage", {}) + input_tokens = usage.get("input_tokens") + output_tokens = usage.get("output_tokens") + cache_read_tokens = usage.get("cache_read_input_tokens") + cache_creation_tokens = usage.get("cache_creation_input_tokens") + model = message.get("model") + + events = [] + + # Handle assistant entries with tool_use blocks + if entry_type == "assistant": + content = message.get("content", []) + tool_uses = [c for c in content if isinstance(c, dict) and c.get("type") == "tool_use"] + + if tool_uses: + # Create an event for each tool_use + for tool_use in tool_uses: + parsed = parse_tool_use(tool_use) + events.append( + Event( + id=None, + uuid=f"{uuid}:{parsed['tool_id']}", # Unique per tool_use + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="tool_use", + tool_name=parsed["tool_name"], + tool_input_json=parsed["tool_input_json"], + tool_id=parsed["tool_id"], + is_error=False, + command=parsed["command"], + command_args=parsed["command_args"], + file_path=parsed["file_path"], + skill_name=parsed["skill_name"], + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read_tokens, + cache_creation_tokens=cache_creation_tokens, + model=model, + git_branch=git_branch, + cwd=cwd, + ) + ) + else: + # Assistant message without tools + events.append( + Event( + id=None, + uuid=uuid, + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="assistant", + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read_tokens, + cache_creation_tokens=cache_creation_tokens, + model=model, + git_branch=git_branch, + cwd=cwd, + ) + ) + + # Handle user entries (may contain tool_result) + elif entry_type == "user": + content = message.get("content", "") + + # Check if content is a list with tool_result blocks + if isinstance(content, list): + tool_results = [ + c for c in content if isinstance(c, dict) and c.get("type") == "tool_result" + ] + if tool_results: + for tr in tool_results: + # Check for error + is_error = tr.get("is_error", False) + events.append( + Event( + id=None, + uuid=f"{uuid}:{tr.get('tool_use_id', 'result')}", + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="tool_result", + tool_id=tr.get("tool_use_id"), + is_error=is_error, + git_branch=git_branch, + cwd=cwd, + ) + ) + else: + # User message with other content types + events.append( + Event( + id=None, + uuid=uuid, + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="user", + git_branch=git_branch, + cwd=cwd, + ) + ) + else: + # Plain text user message + events.append( + Event( + id=None, + uuid=uuid, + timestamp=timestamp, + session_id=session_id, + project_path=project_path, + entry_type="user", + git_branch=git_branch, + cwd=cwd, + ) + ) + + # Handle summary entries + elif entry_type == "summary": + events.append( + Event( + id=None, + uuid=uuid if uuid else f"summary:{raw.get('leafUuid', 'unknown')}", + timestamp=timestamp if timestamp else datetime.now(), + session_id=session_id if session_id else "unknown", + project_path=project_path, + entry_type="summary", + ) + ) + + return events + + +def ingest_file( + file_path: Path, + storage: SQLiteStorage, + force: bool = False, +) -> dict: + """Ingest a single JSONL file. + + Uses incremental ingestion - only processes new entries if file has changed. + + Args: + file_path: Path to JSONL file + storage: Storage instance + force: Force re-ingestion even if file hasn't changed + + Returns: + Stats dict with entries_processed, events_added, skipped + """ + file_str = str(file_path) + stat = file_path.stat() + file_size = stat.st_size + file_mtime = datetime.fromtimestamp(stat.st_mtime) + + # Check if we've already processed this file + state = storage.get_ingestion_state(file_str) + if state and not force: + # Skip if file hasn't changed + if state.file_size == file_size and state.last_modified >= file_mtime: + return {"entries_processed": 0, "events_added": 0, "skipped": True} + + # Extract project path from directory name + project_path = file_path.parent.name + + # Parse and collect events + events = [] + entries_processed = 0 + errors = 0 + + with open(file_path, encoding="utf-8") as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + + try: + raw = json.loads(line) + parsed_events = parse_entry(raw, project_path) + events.extend(parsed_events) + entries_processed += 1 + except json.JSONDecodeError as e: + logger.debug(f"JSON parse error in {file_path}:{line_num}: {e}") + errors += 1 + except Exception as e: + logger.warning(f"Error processing {file_path}:{line_num}: {e}") + errors += 1 + + # Batch insert events + events_added = storage.add_events_batch(events) if events else 0 + + # Update ingestion state + storage.update_ingestion_state( + IngestionState( + file_path=file_str, + file_size=file_size, + last_modified=file_mtime, + entries_processed=entries_processed, + last_processed=datetime.now(), + ) + ) + + return { + "entries_processed": entries_processed, + "events_added": events_added, + "skipped": False, + "errors": errors, + } + + +def update_session_stats(storage: SQLiteStorage) -> int: + """Update session statistics from ingested events. + + Returns number of sessions updated. + """ + # Query distinct sessions from events + with storage._connect() as conn: + rows = conn.execute(""" + SELECT + session_id, + project_path, + MIN(timestamp) as first_seen, + MAX(timestamp) as last_seen, + COUNT(*) as entry_count, + SUM(CASE WHEN tool_name IS NOT NULL THEN 1 ELSE 0 END) as tool_use_count, + SUM(COALESCE(input_tokens, 0)) as total_input_tokens, + SUM(COALESCE(output_tokens, 0)) as total_output_tokens, + (SELECT git_branch FROM events e2 + WHERE e2.session_id = events.session_id + ORDER BY timestamp DESC LIMIT 1) as primary_branch + FROM events + GROUP BY session_id + """).fetchall() + + count = 0 + for row in rows: + storage.upsert_session( + Session( + id=row["session_id"], + project_path=row["project_path"], + first_seen=row["first_seen"], + last_seen=row["last_seen"], + entry_count=row["entry_count"], + tool_use_count=row["tool_use_count"], + total_input_tokens=row["total_input_tokens"], + total_output_tokens=row["total_output_tokens"], + primary_branch=row["primary_branch"], + ) + ) + count += 1 + + return count + + +def ingest_logs( + storage: SQLiteStorage, + days: int = 7, + project: str | None = None, + force: bool = False, +) -> dict: + """Ingest all JSONL log files. + + Args: + storage: Storage instance + days: Number of days to look back + project: Optional project filter + force: Force re-ingestion + + Returns: + Stats dict with totals + """ + files = find_log_files(days=days, project_filter=project) + + total_entries = 0 + total_events = 0 + files_processed = 0 + files_skipped = 0 + total_errors = 0 + + for file_path in files: + try: + result = ingest_file(file_path, storage, force=force) + if result["skipped"]: + files_skipped += 1 + else: + files_processed += 1 + total_entries += result["entries_processed"] + total_events += result["events_added"] + total_errors += result.get("errors", 0) + except Exception as e: + logger.error(f"Failed to ingest {file_path}: {e}") + total_errors += 1 + + # Update session statistics + sessions_updated = update_session_stats(storage) + + return { + "files_found": len(files), + "files_processed": files_processed, + "files_skipped": files_skipped, + "entries_processed": total_entries, + "events_added": total_events, + "sessions_updated": sessions_updated, + "errors": total_errors, + } diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py index 987dfeb..5abd6d9 100644 --- a/src/session_analytics/server.py +++ b/src/session_analytics/server.py @@ -19,6 +19,7 @@ from fastmcp import FastMCP +from session_analytics.ingest import ingest_logs as do_ingest_logs from session_analytics.storage import SQLiteStorage # Configure logging @@ -78,13 +79,10 @@ def ingest_logs(days: int = 7, project: str | None = None, force: bool = False) Returns: Ingestion stats (files processed, entries added, etc.) """ - # Placeholder - will be implemented in Phase 3 + result = do_ingest_logs(storage, days=days, project=project, force=force) return { - "status": "not_implemented", - "message": "Ingestion will be implemented in Phase 3", - "days": days, - "project": project, - "force": force, + "status": "ok", + **result, } diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..b2503b5 --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,316 @@ +"""Tests for the JSONL ingestion module.""" + +import json +import tempfile +from pathlib import Path + +import pytest + +from session_analytics.ingest import ( + find_log_files, + ingest_file, + parse_entry, + parse_tool_use, +) +from session_analytics.storage import SQLiteStorage + + +@pytest.fixture +def storage(): + """Create a temporary storage instance for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + yield SQLiteStorage(db_path) + + +@pytest.fixture +def sample_logs_dir(): + """Create a temporary directory with sample JSONL files.""" + with tempfile.TemporaryDirectory() as tmpdir: + logs_dir = Path(tmpdir) + project_dir = logs_dir / "-test-project" + project_dir.mkdir() + + # Create a sample JSONL file + jsonl_file = project_dir / "test-session.jsonl" + entries = [ + { + "type": "user", + "uuid": "user-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "cwd": "/test/project", + "gitBranch": "main", + "message": {"role": "user", "content": "Hello"}, + }, + { + "type": "assistant", + "uuid": "assistant-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:05.000Z", + "cwd": "/test/project", + "gitBranch": "main", + "message": { + "role": "assistant", + "model": "claude-opus-4-5-20251101", + "content": [ + { + "type": "tool_use", + "id": "tool-1", + "name": "Bash", + "input": {"command": "git status"}, + } + ], + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "cache_read_input_tokens": 1000, + }, + }, + }, + { + "type": "user", + "uuid": "result-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:10.000Z", + "cwd": "/test/project", + "gitBranch": "main", + "message": { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool-1", + "content": "On branch main", + } + ], + }, + }, + ] + + with open(jsonl_file, "w") as f: + for entry in entries: + f.write(json.dumps(entry) + "\n") + + yield logs_dir + + +class TestParseToolUse: + """Tests for tool_use parsing.""" + + def test_parse_bash_command(self): + """Test extracting command from Bash tool.""" + tool_use = { + "name": "Bash", + "id": "tool-1", + "input": {"command": "git status --short"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "Bash" + assert result["command"] == "git" + assert result["command_args"] == "status --short" + + def test_parse_read_file(self): + """Test extracting file_path from Read tool.""" + tool_use = { + "name": "Read", + "id": "tool-2", + "input": {"file_path": "/path/to/file.py"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "Read" + assert result["file_path"] == "/path/to/file.py" + + def test_parse_skill(self): + """Test extracting skill_name from Skill tool.""" + tool_use = { + "name": "Skill", + "id": "tool-3", + "input": {"skill": "commit"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "Skill" + assert result["skill_name"] == "commit" + + def test_parse_mcp_tool(self): + """Test parsing MCP tool names.""" + tool_use = { + "name": "mcp__event-bus__register_session", + "id": "tool-4", + "input": {"name": "test"}, + } + result = parse_tool_use(tool_use) + assert result["tool_name"] == "mcp__event-bus__register_session" + + +class TestParseEntry: + """Tests for entry parsing.""" + + def test_parse_user_message(self): + """Test parsing a user message.""" + entry = { + "type": "user", + "uuid": "user-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "cwd": "/test", + "gitBranch": "main", + "message": {"role": "user", "content": "Hello"}, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "user" + assert events[0].session_id == "session-1" + + def test_parse_assistant_with_tool(self): + """Test parsing an assistant message with tool_use.""" + entry = { + "type": "assistant", + "uuid": "assistant-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "message": { + "model": "claude-opus-4-5", + "content": [ + { + "type": "tool_use", + "id": "tool-1", + "name": "Bash", + "input": {"command": "ls -la"}, + } + ], + "usage": {"input_tokens": 100, "output_tokens": 50}, + }, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "tool_use" + assert events[0].tool_name == "Bash" + assert events[0].command == "ls" + assert events[0].input_tokens == 100 + + def test_parse_tool_result(self): + """Test parsing a tool_result entry.""" + entry = { + "type": "user", + "uuid": "result-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + "message": { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool-1", + "content": "output", + } + ], + }, + } + events = parse_entry(entry, "test-project") + assert len(events) == 1 + assert events[0].entry_type == "tool_result" + assert events[0].tool_id == "tool-1" + + def test_skip_file_history_snapshot(self): + """Test that file-history-snapshot entries are skipped.""" + entry = { + "type": "file-history-snapshot", + "uuid": "snapshot-1", + "sessionId": "session-1", + "timestamp": "2025-01-01T12:00:00.000Z", + } + events = parse_entry(entry, "test-project") + assert len(events) == 0 + + def test_skip_malformed_entry(self): + """Test that entries without required fields are skipped.""" + entry = {"type": "user"} # Missing uuid, sessionId, timestamp + events = parse_entry(entry, "test-project") + assert len(events) == 0 + + +class TestIngestFile: + """Tests for file ingestion.""" + + def test_ingest_file(self, storage, sample_logs_dir): + """Test ingesting a JSONL file.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + result = ingest_file(jsonl_file, storage) + assert result["entries_processed"] == 3 + assert result["events_added"] == 3 + assert result["skipped"] is False + + def test_incremental_ingestion(self, storage, sample_logs_dir): + """Test that unchanged files are skipped on re-ingestion.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + # First ingestion + result1 = ingest_file(jsonl_file, storage) + assert result1["skipped"] is False + + # Second ingestion should skip + result2 = ingest_file(jsonl_file, storage) + assert result2["skipped"] is True + + def test_force_reingestion(self, storage, sample_logs_dir): + """Test force re-ingestion.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + # First ingestion + ingest_file(jsonl_file, storage) + + # Force re-ingestion should process again + result = ingest_file(jsonl_file, storage, force=True) + assert result["skipped"] is False + + +class TestFindLogFiles: + """Tests for log file discovery.""" + + def test_find_log_files(self, sample_logs_dir): + """Test finding JSONL files in logs directory.""" + files = find_log_files(logs_dir=sample_logs_dir, days=7) + assert len(files) == 1 + assert files[0].suffix == ".jsonl" + + def test_filter_by_project(self, sample_logs_dir): + """Test filtering by project name.""" + # Create another project + other_project = sample_logs_dir / "-other-project" + other_project.mkdir() + (other_project / "other.jsonl").write_text('{"type":"user"}\n') + + # Should find both + all_files = find_log_files(logs_dir=sample_logs_dir, days=7) + assert len(all_files) == 2 + + # Should only find matching project + filtered = find_log_files(logs_dir=sample_logs_dir, days=7, project_filter="test") + assert len(filtered) == 1 + assert "test" in str(filtered[0]) + + +class TestIngestLogs: + """Tests for full ingestion flow.""" + + def test_ingest_logs(self, storage, sample_logs_dir): + """Test full ingestion flow.""" + # Use find_log_files with explicit logs_dir + from session_analytics.ingest import ingest_file as do_ingest_file + from session_analytics.ingest import update_session_stats + + files = find_log_files(logs_dir=sample_logs_dir, days=7) + assert len(files) == 1 + + # Ingest the file + result = do_ingest_file(files[0], storage) + assert result["events_added"] == 3 + + # Update session stats + sessions = update_session_stats(storage) + assert sessions >= 1 diff --git a/tests/test_server.py b/tests/test_server.py index d05e030..d85b404 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -14,11 +14,12 @@ def test_get_status(): assert "session_count" in result -def test_ingest_logs_placeholder(): - """Test that ingest_logs returns placeholder response.""" - result = ingest_logs.fn(days=7) - assert result["status"] == "not_implemented" - assert result["days"] == 7 +def test_ingest_logs(): + """Test that ingest_logs runs and returns stats.""" + result = ingest_logs.fn(days=1) + assert result["status"] == "ok" + assert "files_found" in result + assert "events_added" in result def test_query_tool_frequency_placeholder():