diff --git a/README.md b/README.md index 6854e4e..654158e 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Parses your Claude Code session logs (`~/.claude/projects/**/*.jsonl`) and provi - **Permission gaps** - Commands that should be added to settings.json - **Token usage** - Usage breakdown by day, session, or model - **Session timeline** - Events across conversations with filtering +- **Cross-session insights** - Gotchas, patterns, and learnings from [event-bus](https://github.com/evansenter/claude-event-bus) Data is stored persistently in SQLite and auto-refreshes when stale (>5 min old). @@ -76,6 +77,9 @@ session-analytics-cli git-ingest # Import git commit history session-analytics-cli git-correlate # Link commits to sessions session-analytics-cli session-commits # Show commits per session +# Event-Bus Integration +session-analytics-cli bus-events # Query cross-session events (gotchas, patterns) + # Pattern Inspection session-analytics-cli sample-sequences # Sample instances of a pattern with context ``` @@ -87,7 +91,7 @@ All commands support: ## MCP Tools -28 tools available when running as an MCP server: +30 tools available when running as an MCP server: | Category | Tools | |----------|-------| @@ -100,6 +104,7 @@ All commands support: | **Messages** | `get_session_messages`, `search_messages` | | **Relationships** | `detect_parallel_sessions`, `find_related_sessions` | | **Git** | `ingest_git_history`, `correlate_git_with_sessions`, `get_session_commits` | +| **Event-Bus** | `ingest_bus_events`, `get_bus_events` | For detailed usage, read the MCP resource `session-analytics://guide` or see [guide.md](src/session_analytics/guide.md). @@ -123,6 +128,7 @@ make check - **Database**: `~/.claude/contrib/analytics/data.db` - **Logs parsed from**: `~/.claude/projects/**/*.jsonl` +- **Event-bus source**: `~/.claude/contrib/event-bus/data.db` (if [claude-event-bus](https://github.com/evansenter/claude-event-bus) is installed) ## How It Works diff --git a/docs/event-bus-integration.md b/docs/event-bus-integration.md new file mode 100644 index 0000000..d05058e --- /dev/null +++ b/docs/event-bus-integration.md @@ -0,0 +1,216 @@ +# Event-Bus Integration + +Session Analytics can ingest events from [claude-event-bus](https://github.com/evansenter/claude-event-bus) for queryable cross-session insights. + +## Overview + +The event-bus enables Claude Code sessions to communicate asynchronously. By ingesting these events, you can: + +- Track gotchas and patterns discovered across sessions +- See help requests and responses between sessions +- Correlate task progress across parallel work +- Surface cross-session learnings in `/improve-workflow` + +## Prerequisites + +Install [claude-event-bus](https://github.com/evansenter/claude-event-bus). The integration is optional—session-analytics works without it. + +## Data Flow + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Event-Bus Database │ +│ ~/.claude/contrib/event-bus/data.db │ +│ ┌─────────────────────────────────────────────────────────────┐│ +│ │ events table: id, event_type, channel, session_id, payload ││ +│ └─────────────────────────────────────────────────────────────┘│ +└───────────────────────────┬─────────────────────────────────────┘ + │ Read-only connection + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ ingest_bus_events() │ +│ - Incremental: tracks last ingested event_id │ +│ - Extracts repo from channel (e.g., "repo:dotfiles" → "dotfiles") +│ - Stores raw payload unchanged │ +└───────────────────────────┬─────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Session Analytics Database │ +│ ~/.claude/contrib/analytics/data.db │ +│ ┌─────────────────────────────────────────────────────────────┐│ +│ │ bus_events table: event_id, timestamp, event_type, channel,││ +│ │ session_id, repo, payload ││ +│ └─────────────────────────────────────────────────────────────┘│ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Event Types + +Common event types from event-bus: + +| Event Type | Description | +|------------|-------------| +| `gotcha_discovered` | Non-obvious issues found during work | +| `pattern_found` | Useful patterns identified | +| `help_needed` | Request for assistance from other sessions | +| `help_response` | Response to a help request | +| `task_started` | Work begun on an issue/PR | +| `task_completed` | Work finished (merged, closed) | +| `ci_completed` | CI run finished (pass/fail) | +| `wip_checkpoint` | Work-in-progress snapshot | + +## API Reference + +### CLI + +```bash +# Ingest and query events +session-analytics-cli bus-events --days 7 + +# Filter by event type +session-analytics-cli bus-events --event-type gotcha_discovered + +# Filter by repository +session-analytics-cli bus-events --repo dotfiles + +# Limit results +session-analytics-cli bus-events --limit 20 + +# JSON output +session-analytics-cli bus-events --json +``` + +### MCP Tools + +#### `ingest_bus_events(days?)` + +Ingest events from event-bus database. Performs incremental ingestion by tracking the last ingested event ID. + +**Parameters:** +- `days` (int, default: 7): Days to look back on first run + +**Returns:** +```json +{ + "status": "ok", + "events_ingested": 42, + "last_event_id": 1438, + "oldest_event": "2025-01-01T10:00:00", + "newest_event": "2025-01-08T01:30:00" +} +``` + +If event-bus is not installed: +```json +{ + "status": "skipped", + "reason": "event-bus database not found", + "path": "~/.claude/contrib/event-bus/data.db" +} +``` + +#### `get_bus_events(days?, event_type?, session_id?, repo?, limit?)` + +Query ingested event-bus events. + +**Parameters:** +- `days` (int, default: 7): Time range to query +- `event_type` (str, optional): Filter by event type +- `session_id` (str, optional): Filter by source session +- `repo` (str, optional): Filter by repository name +- `limit` (int, default: 100): Maximum events to return + +**Returns:** +```json +{ + "events": [ + { + "event_id": 1438, + "timestamp": "2025-01-08T01:30:47", + "event_type": "task_completed", + "channel": "repo:dotfiles", + "session_id": "6cd931c1-929b-4c9c-beb6-507e43e7feec", + "repo": "dotfiles", + "payload": "Merged PR #182 - Add named sessions to /parallel-work" + } + ], + "type_breakdown": { + "task_completed": 15, + "gotcha_discovered": 8, + "ci_completed": 42 + }, + "total_events": 65 +} +``` + +## Integration with Insights + +When event-bus data is available, `get_insights()` includes cross-session activity: + +```json +{ + "cross_session_activity": { + "gotcha_discovered": 8, + "pattern_found": 3, + "help_needed": 2, + "task_completed": 15 + }, + "summary": { + "has_bus_events": true + } +} +``` + +This enables `/improve-workflow` to surface cross-session learnings. + +## Design Principles + +This integration follows the project's design philosophy: + +1. **Raw signals over interpretation**: Payloads are stored unchanged. The `repo` field is extracted for filtering, but no semantic analysis is performed. The consuming LLM interprets meaning. + +2. **Guaranteed drill-down**: `get_bus_events()` returns raw events with full payloads. You can filter by `event_type`, `session_id`, or `repo` to focus on specific signals. + +3. **API conformance**: Follows existing patterns: + - `ingest_bus_events(days)` matches `ingest_logs`, `ingest_git_history` + - `get_bus_events(...)` matches `get_session_events`, `get_agent_activity` + +## Schema + +The `bus_events` table (migration v6): + +```sql +CREATE TABLE bus_events ( + id INTEGER PRIMARY KEY, + event_id INTEGER UNIQUE NOT NULL, -- Original ID from event-bus + timestamp TIMESTAMP NOT NULL, + event_type TEXT NOT NULL, + channel TEXT, + session_id TEXT, + repo TEXT, -- Extracted from channel + payload TEXT, -- Raw payload unchanged + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +-- Indexes for common queries +CREATE INDEX idx_bus_events_timestamp ON bus_events(timestamp); +CREATE INDEX idx_bus_events_type ON bus_events(event_type); +CREATE INDEX idx_bus_events_session ON bus_events(session_id); +CREATE INDEX idx_bus_events_repo ON bus_events(repo); +``` + +## Troubleshooting + +### "event-bus database not found" + +Install [claude-event-bus](https://github.com/evansenter/claude-event-bus) and ensure at least one session has registered. + +### Events not appearing + +1. Check ingestion status: `session-analytics-cli bus-events --json | jq .total_events` +2. Force re-ingest: Events are ingested incrementally; if the event-bus database was recreated, you may need to clear the analytics `bus_events` table. + +### Stale data + +Data auto-refreshes when queries detect stale data (>5 min). You can also manually trigger ingestion via the MCP tool `ingest_bus_events()`. diff --git a/src/session_analytics/bus_ingest.py b/src/session_analytics/bus_ingest.py new file mode 100644 index 0000000..e06d008 --- /dev/null +++ b/src/session_analytics/bus_ingest.py @@ -0,0 +1,140 @@ +"""Event-bus ingestion for cross-session insights. + +Reads events from ~/.claude/contrib/event-bus/data.db and stores them +in session-analytics for queryable cross-session insights. +""" + +import logging +import sqlite3 +from datetime import datetime, timedelta +from pathlib import Path + +from session_analytics.storage import SQLiteStorage + +logger = logging.getLogger("session-analytics") + +EVENT_BUS_DB = Path.home() / ".claude" / "contrib" / "event-bus" / "data.db" + + +def _extract_repo(channel: str | None) -> str | None: + """Extract repo name from channel (e.g., 'repo:dotfiles' -> 'dotfiles').""" + if channel and channel.startswith("repo:"): + return channel[5:] + return None + + +def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: + """Ingest events from event-bus database. + + Performs incremental ingestion by tracking the last ingested event ID. + Events are read from the event-bus database in read-only mode. + + Args: + storage: Session analytics storage instance + days: Number of days to look back for initial ingestion + + Returns: + Dict with ingestion stats including events_ingested count + """ + if not EVENT_BUS_DB.exists(): + return { + "status": "skipped", + "reason": "event-bus database not found", + "path": str(EVENT_BUS_DB), + } + + # Get last ingested event_id for incremental updates + last_event = storage.execute_query("SELECT MAX(event_id) as last_id FROM bus_events") + last_id = last_event[0]["last_id"] if last_event and last_event[0]["last_id"] else 0 + + # Calculate cutoff for first-run ingestion + cutoff = datetime.now() - timedelta(days=days) + + # Read from event-bus DB (read-only mode) + try: + conn = sqlite3.connect(f"file:{EVENT_BUS_DB}?mode=ro", uri=True) + conn.row_factory = sqlite3.Row + except sqlite3.OperationalError as e: + return { + "status": "error", + "reason": f"Failed to connect to event-bus database: {e}", + "path": str(EVENT_BUS_DB), + } + + try: + # Query events newer than last ingested ID, or from cutoff on first run + if last_id > 0: + # Incremental: get events after last ID + rows = conn.execute( + """ + SELECT id, event_type, channel, session_id, timestamp, payload + FROM events + WHERE id > ? + ORDER BY id + """, + (last_id,), + ).fetchall() + else: + # First run: get events from cutoff + rows = conn.execute( + """ + SELECT id, event_type, channel, session_id, timestamp, payload + FROM events + WHERE timestamp >= ? + ORDER BY id + """, + (cutoff.isoformat(),), + ).fetchall() + + if not rows: + return { + "status": "ok", + "events_ingested": 0, + "last_event_id": last_id, + } + + # Batch insert into analytics database + events_data = [ + ( + row["id"], + row["timestamp"], + row["event_type"], + row["channel"], + row["session_id"], + _extract_repo(row["channel"]), + row["payload"], + ) + for row in rows + ] + + with storage._connect() as db_conn: + db_conn.executemany( + """ + INSERT OR IGNORE INTO bus_events + (event_id, timestamp, event_type, channel, session_id, repo, payload) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + events_data, + ) + + newest_id = rows[-1]["id"] + oldest_ts = rows[0]["timestamp"] + newest_ts = rows[-1]["timestamp"] + + logger.info( + "Ingested %d event-bus events (IDs %d-%d)", + len(rows), + rows[0]["id"], + newest_id, + ) + + return { + "status": "ok", + "events_ingested": len(rows), + "last_event_id": newest_id, + "oldest_event": oldest_ts, + "newest_event": newest_ts, + } + + finally: + conn.close() diff --git a/src/session_analytics/cli.py b/src/session_analytics/cli.py index 55ac4e1..c729f5e 100644 --- a/src/session_analytics/cli.py +++ b/src/session_analytics/cli.py @@ -25,6 +25,7 @@ get_handoff_context, get_user_journey, query_agent_activity, + query_bus_events, query_commands, query_file_activity, query_languages, @@ -360,7 +361,7 @@ def _format_ingest(data: dict) -> list[str]: ] -@_register_formatter(lambda d: "event_count" in d) +@_register_formatter(lambda d: "event_count" in d and "db_path" in d) def _format_status(data: dict) -> list[str]: lines = [ "Analytics database status and ingestion info", @@ -678,6 +679,26 @@ def cmd_agents(args): print(format_output(result, args.json)) +def cmd_bus_events(args): + """Show event-bus events for cross-session insights. + + RFC #54: Shows events from event-bus (gotchas, patterns, help, etc.). + """ + from session_analytics.bus_ingest import ingest_bus_events + + storage = SQLiteStorage() + # Ingest latest events before querying + ingest_bus_events(storage, days=args.days) + result = query_bus_events( + storage, + days=args.days, + event_type=args.event_type, + repo=args.repo, + limit=args.limit, + ) + print(format_output(result, args.json)) + + def cmd_insights(args): """Show insights for /improve-workflow.""" storage = SQLiteStorage() @@ -1135,6 +1156,16 @@ def main(): sub.add_argument("--project", help="Project path filter") sub.set_defaults(func=cmd_agents) + # bus-events (RFC #54) + sub = subparsers.add_parser( + "bus-events", help="Show event-bus events (gotchas, patterns, etc.)" + ) + sub.add_argument("--days", type=int, default=7, help="Days to analyze (default: 7)") + sub.add_argument("--event-type", help="Filter by event type (e.g., 'gotcha_discovered')") + sub.add_argument("--repo", help="Filter by repo name") + sub.add_argument("--limit", type=int, default=100, help="Max events to return (default: 100)") + sub.set_defaults(func=cmd_bus_events) + args = parser.parse_args() args.func(args) diff --git a/src/session_analytics/guide.md b/src/session_analytics/guide.md index b33eab0..00bb3db 100644 --- a/src/session_analytics/guide.md +++ b/src/session_analytics/guide.md @@ -103,6 +103,21 @@ Each session includes `classification_factors` explaining WHY it was categorized |------|---------| | `get_agent_activity(days?, project?)` | Task subagent activity vs main session (RFC #41) | +### Event-Bus Integration + +| Tool | Purpose | +|------|---------| +| `ingest_bus_events(days?)` | Import events from event-bus for cross-session insights | +| `get_bus_events(days?, event_type?, session_id?, repo?, limit?)` | Query event-bus events (gotchas, patterns, help) | + +Cross-session events include: +- `gotcha_discovered` - Non-obvious issues found during work +- `pattern_found` - Useful patterns identified +- `help_needed` / `help_response` - Cross-session coordination +- `task_completed` / `task_started` - Work progress tracking + +These appear in `get_insights()` under `cross_session_activity` when available. + ## Quick Start ### 1. Check status diff --git a/src/session_analytics/patterns.py b/src/session_analytics/patterns.py index 935a989..dbc97fd 100644 --- a/src/session_analytics/patterns.py +++ b/src/session_analytics/patterns.py @@ -913,6 +913,29 @@ def get_insights( logger.warning("Failed to classify sessions in get_insights: %s", e, exc_info=True) insights["summary"]["has_classification"] = False + # Event-bus cross-session activity (RFC #54) + try: + bus_rows = storage.execute_query( + """ + SELECT event_type, COUNT(*) as count + FROM bus_events + WHERE timestamp >= datetime('now', ? || ' days') + GROUP BY event_type + ORDER BY count DESC + """, + (-days,), + ) + if bus_rows: + insights["cross_session_activity"] = { + row["event_type"]: row["count"] for row in bus_rows + } + insights["summary"]["has_bus_events"] = True + else: + insights["summary"]["has_bus_events"] = False + except Exception as e: + logger.warning("Failed to query bus events in get_insights: %s", e, exc_info=True) + insights["summary"]["has_bus_events"] = False + return insights diff --git a/src/session_analytics/queries.py b/src/session_analytics/queries.py index d413c51..7e9ca49 100644 --- a/src/session_analytics/queries.py +++ b/src/session_analytics/queries.py @@ -1802,3 +1802,100 @@ def query_agent_activity( ), }, } + + +def query_bus_events( + storage: SQLiteStorage, + days: int = 7, + event_type: str | None = None, + session_id: str | None = None, + repo: str | None = None, + limit: int = 100, +) -> dict: + """Query event-bus events with optional filters. + + Returns raw events from the event-bus for cross-session insights. + Events include gotcha_discovered, pattern_found, help_needed, etc. + + Args: + storage: Storage instance + days: Number of days to analyze (default: 7) + event_type: Filter by event type (e.g., 'gotcha_discovered') + session_id: Filter by session ID + repo: Filter by repo name + limit: Maximum events to return (default: 100) + + Returns: + Dict with events list and type breakdown + """ + cutoff = get_cutoff(days=days) + + # Build where clause + where_parts = ["timestamp >= ?"] + params: list = [cutoff] + + if event_type: + where_parts.append("event_type = ?") + params.append(event_type) + if session_id: + where_parts.append("session_id = ?") + params.append(session_id) + if repo: + where_parts.append("repo = ?") + params.append(repo) + + where_clause = " AND ".join(where_parts) + params.append(limit) + + # Get events + rows = storage.execute_query( + f""" + SELECT + event_id, + timestamp, + event_type, + channel, + session_id, + repo, + payload + FROM bus_events + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT ? + """, + tuple(params), + ) + + events = [ + { + "event_id": row["event_id"], + "timestamp": _format_timestamp(row["timestamp"]), + "event_type": row["event_type"], + "channel": row["channel"], + "session_id": row["session_id"], + "repo": row["repo"], + "payload": row["payload"], + } + for row in rows + ] + + # Get type breakdown + type_rows = storage.execute_query( + f""" + SELECT event_type, COUNT(*) as count + FROM bus_events + WHERE {" AND ".join(where_parts[:-1]) if len(where_parts) > 1 else where_parts[0]} + GROUP BY event_type + ORDER BY count DESC + """, + tuple(params[:-1]), # Exclude limit param + ) + + type_counts = {row["event_type"]: row["count"] for row in type_rows} + + return { + "days": days, + "event_count": len(events), + "event_types": type_counts, + "events": events, + } diff --git a/src/session_analytics/server.py b/src/session_analytics/server.py index cb5ebf1..9f1fd3a 100644 --- a/src/session_analytics/server.py +++ b/src/session_analytics/server.py @@ -717,6 +717,59 @@ def get_agent_activity(days: int = 7, project: str | None = None) -> dict: return {"status": "ok", **result} +@mcp.tool() +def ingest_bus_events(days: int = 7) -> dict: + """Ingest events from event-bus for cross-session insights. + + Reads from ~/.claude/contrib/event-bus/data.db and stores + events for correlation with session activity. + + Args: + days: Number of days to ingest on first run (default: 7) + + Returns: + Ingestion statistics including events_ingested count + """ + from session_analytics.bus_ingest import ingest_bus_events as do_ingest + + result = do_ingest(storage, days=days) + return {"status": "ok", **result} + + +@mcp.tool() +def get_bus_events( + days: int = 7, + event_type: str | None = None, + session_id: str | None = None, + repo: str | None = None, + limit: int = 100, +) -> dict: + """Get event-bus events with optional filters. + + Returns raw events from the event-bus for cross-session insights. + Events include gotcha_discovered, pattern_found, help_needed, etc. + + Args: + days: Number of days to analyze (default: 7) + event_type: Filter by event type (e.g., 'gotcha_discovered') + session_id: Filter by session ID + repo: Filter by repo name + limit: Maximum events to return (default: 100) + + Returns: + Event-bus events with breakdown by type + """ + result = queries.query_bus_events( + storage, + days=days, + event_type=event_type, + session_id=session_id, + repo=repo, + limit=limit, + ) + return {"status": "ok", **result} + + def create_app(): """Create the ASGI app for uvicorn.""" # stateless_http=True allows resilience to server restarts diff --git a/src/session_analytics/storage.py b/src/session_analytics/storage.py index 247dc67..1c8d1af 100644 --- a/src/session_analytics/storage.py +++ b/src/session_analytics/storage.py @@ -144,11 +144,29 @@ def __post_init__(self): raise ValueError(f"SHA must be hexadecimal, got '{self.sha}'") +@dataclass +class BusEvent: + """An event from the event-bus for cross-session insights. + + Stores events like gotcha_discovered, pattern_found, help_needed, etc. + for correlation with session activity. + """ + + id: int | None + event_id: int # Original ID from event-bus DB + timestamp: datetime + event_type: str + channel: str | None = None + session_id: str | None = None + repo: str | None = None # Extracted from channel (e.g., "repo:dotfiles" -> "dotfiles") + payload: str | None = None # Raw payload text + + # Default database path DEFAULT_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" # Schema version for migrations -SCHEMA_VERSION = 5 +SCHEMA_VERSION = 6 # Migration functions: dict of version -> (migration_name, migration_func) # Each migration upgrades FROM version-1 TO version @@ -335,6 +353,32 @@ def migrate_v5(conn): conn.execute("CREATE INDEX IF NOT EXISTS idx_events_agent_id ON events(agent_id)") +@migration(6, "add_event_bus_integration") +def migrate_v6(conn): + """Add table for event-bus integration (RFC #54). + + Stores events from ~/.claude/contrib/event-bus/data.db for cross-session + insights like gotcha_discovered, pattern_found, help_needed, etc. + """ + conn.execute(""" + CREATE TABLE IF NOT EXISTS bus_events ( + id INTEGER PRIMARY KEY, + event_id INTEGER UNIQUE NOT NULL, + timestamp TIMESTAMP NOT NULL, + event_type TEXT NOT NULL, + channel TEXT, + session_id TEXT, + repo TEXT, + payload TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_bus_events_timestamp ON bus_events(timestamp)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_bus_events_type ON bus_events(event_type)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_bus_events_session ON bus_events(session_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_bus_events_repo ON bus_events(repo)") + + class SQLiteStorage: """SQLite-backed storage for session analytics.""" @@ -621,6 +665,29 @@ def _init_db(self): ON events(id) WHERE user_message_text IS NOT NULL """) + # Event-bus integration for cross-session insights (RFC #54) + conn.execute(""" + CREATE TABLE IF NOT EXISTS bus_events ( + id INTEGER PRIMARY KEY, + event_id INTEGER UNIQUE NOT NULL, + timestamp TIMESTAMP NOT NULL, + event_type TEXT NOT NULL, + channel TEXT, + session_id TEXT, + repo TEXT, + payload TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_bus_events_timestamp ON bus_events(timestamp)" + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_bus_events_type ON bus_events(event_type)") + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_bus_events_session ON bus_events(session_id)" + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_bus_events_repo ON bus_events(repo)") + # Run migrations AFTER all tables are created # Only existing databases need migrations - fresh databases have full schema current_version = self._get_schema_version(conn)