Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/SCHEMA.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ This document describes the SQLite database schema for agent-session-analytics.
| `bus_events` | Cross-session events from event-bus | ~2K |
| `events_fts` | FTS5 virtual table for user message search | N/A |
| `raw_entries` | Unparsed JSONL entries for future re-parsing | 100K+ |
| `raw_bus_events` | Unparsed event-bus entries for future re-parsing | ~2K |
| `project_aliases` | Alias mappings for renamed projects | ~10 |

---
Expand Down Expand Up @@ -212,6 +213,20 @@ CREATE TABLE raw_entries (

**Design note**: The UNIQUE constraint on `entry_json` ensures exact deduplication. While this means large JSON values are compared, SQLite handles this efficiently and it avoids hash collision edge cases.

### raw_bus_events

Unparsed event-bus entries for future re-parsing. Mirrors the `raw_entries` pattern — stores the full JSON from the event-bus database so events can be re-parsed if the schema or ingestion logic changes.

```sql
CREATE TABLE raw_bus_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id INTEGER NOT NULL UNIQUE, -- Original ID from event-bus
timestamp TEXT NOT NULL,
entry_json TEXT NOT NULL, -- Full original event JSON
ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
)
```

### project_aliases

Maps alias names to target patterns for flexible project filtering. When filtering by an alias, queries automatically expand to match both the alias and all its targets.
Expand Down Expand Up @@ -266,6 +281,8 @@ Performance-critical indexes on the `events` table:
| `bus_events` | `idx_bus_events_repo` | `repo` |
| `raw_entries` | `idx_raw_entries_session` | `session_id` |
| `raw_entries` | `idx_raw_entries_timestamp` | `timestamp` |
| `raw_bus_events` | `idx_raw_bus_events_event_id` | `event_id` |
| `raw_bus_events` | `idx_raw_bus_events_timestamp` | `timestamp` |
| `project_aliases` | `idx_project_aliases_alias` | `alias COLLATE NOCASE` |

---
Expand Down Expand Up @@ -307,6 +324,7 @@ Sync triggers maintain index consistency:
| 12 | fix_warmup_not_errors | Fix warmup events incorrectly marked as errors (Issue #75) |
| 13 | add_raw_entries_table | Raw JSONL storage for future re-parsing (Issue #93) |
| 14 | add_project_aliases | Project alias table for renamed project matching (Issue #71) |
| 15 | add_raw_bus_events_table | Raw event-bus JSON storage for future re-parsing (Issue #106) |

---

Expand Down
23 changes: 22 additions & 1 deletion src/agent_session_analytics/bus_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

Reads events from ~/.claude/contrib/agent-event-bus/data.db and stores them
in agent-session-analytics for queryable cross-session insights.
Raw event JSON is also stored in raw_bus_events for future re-parsing.
"""

import json
import logging
import sqlite3
from datetime import datetime, timedelta
Expand All @@ -28,6 +30,7 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:

Performs incremental ingestion by tracking the last ingested event ID.
Events are read from the event-bus database in read-only mode.
Raw event JSON is stored alongside parsed data for future re-parsing.

Args:
storage: Session analytics storage instance
Expand Down Expand Up @@ -93,7 +96,7 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:
"last_event_id": last_id,
}

# Batch insert into analytics database
# Batch insert parsed events into analytics database
events_data = [
(
row["id"],
Expand All @@ -107,6 +110,16 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:
for row in rows
]

# Store raw event JSON for future re-parsing
raw_data = [
(
row["id"],
row["timestamp"],
json.dumps(dict(row)),
)
for row in rows
]

with storage._connect() as db_conn:
db_conn.executemany(
"""
Expand All @@ -116,6 +129,14 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:
""",
events_data,
)
db_conn.executemany(
"""
INSERT OR IGNORE INTO raw_bus_events
(event_id, timestamp, entry_json)
VALUES (?, ?, ?)
""",
raw_data,
)

newest_id = rows[-1]["id"]
oldest_ts = rows[0]["timestamp"]
Expand Down
34 changes: 33 additions & 1 deletion src/agent_session_analytics/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,33 @@ def _format_aliases(data: dict) -> list[str]:
return lines


@_register_formatter(lambda d: "event_types" in d and "events" in d and "event_count" in d)
def _format_bus_events(data: dict) -> list[str]:
lines = [
f"Event-bus events ({data['event_count']} events, last {data.get('days', '?')} days)",
"",
]
type_counts = data.get("event_types", {})
if type_counts:
lines.append("Event types:")
for etype, count in type_counts.items():
lines.append(f" {etype}: {count}")
lines.append("")
for event in data.get("events", []):
ts = event.get("timestamp", "?")
etype = event.get("event_type", "?")
repo = event.get("repo", "")
repo_str = f" [{repo}]" if repo else ""
payload = event.get("payload") or ""
# Truncate long payloads for display
if len(payload) > 200:
payload = payload[:200] + "..."
lines.append(f" {ts} {etype}{repo_str}")
lines.append(f" {payload}")
lines.append("")
return lines


def format_output(data: dict, json_output: bool = False) -> str:
"""Format output as JSON or human-readable."""
if json_output:
Expand Down Expand Up @@ -1416,6 +1443,9 @@ def cmd_benchmark(args):
from agent_session_analytics.queries import (
query_agent_activity as queries_query_agent_activity,
)
from agent_session_analytics.queries import (
query_bus_events as queries_query_bus_events,
)
from agent_session_analytics.queries import (
query_error_details as queries_query_error_details,
)
Expand Down Expand Up @@ -1447,7 +1477,7 @@ def cmd_benchmark(args):
# Define all MCP tools with their default parameters
# These call the underlying query functions directly (not the MCP wrappers)
# Skip mutating tools (ingest_*) and tools requiring specific IDs
# Note: Removed tools not in MCP (get_command_frequency, get_languages, get_bus_events,
# Note: Removed tools not in MCP (get_command_frequency, get_languages,
# analyze_pre_compaction_patterns) - CLI still has them for backward compat
tool_functions = {
"get_status": lambda: storage.get_db_stats(),
Expand Down Expand Up @@ -1494,6 +1524,8 @@ def cmd_benchmark(args):
"get_session_efficiency": lambda: queries_get_session_efficiency(storage, days=7),
# Issue #71: Project aliases
"list_project_aliases": lambda: storage.get_project_aliases(),
# Issue #106: Bus events integration
"get_bus_events": lambda: queries_query_bus_events(storage, days=7, limit=10),
}

# Skipped tools (require specific data or modify DB):
Expand Down
22 changes: 22 additions & 0 deletions src/agent_session_analytics/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ agent-session-analytics-cli alias remove genai-rs
- Aliases expand to OR clauses: `WHERE project_path LIKE '%genai-rs%' OR project_path LIKE '%rust-genai%'`
- Multiple targets can be added per alias (e.g., for projects renamed multiple times)

### Event Bus Integration

Cross-session knowledge events from the agent-event-bus (gotchas, patterns, improvement suggestions). Events are ingested from the co-located event-bus SQLite database and stored in both parsed form (`bus_events` table) and raw JSON form (`raw_bus_events` table) for future re-parsing.

| Tool | Purpose |
|------|---------|
| `get_bus_events(days?, event_type?, repo?, session_id?, limit?)` | Query cross-session knowledge events |
| `ingest_bus_events(days?)` | Force refresh from event-bus database |

**Key event types:**
- `gotcha_discovered` — Non-obvious bugs or pitfalls
- `pattern_found` — Reusable solutions and techniques
- `improvement_suggested` — Workflow or tooling gap proposals

**Example usage:**
```
get_bus_events(event_type="gotcha_discovered", days=30)
get_bus_events(repo="agent-event-bus", limit=10)
```

**Automatic ingestion:** Bus events are ingested on server startup and every 5 minutes by the background loop. Use `ingest_bus_events()` to force an immediate refresh.

### Core Queries

| Tool | Purpose |
Expand Down
8 changes: 4 additions & 4 deletions src/agent_session_analytics/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,7 @@ def query_bus_events(
event_type: str | None = None,
session_id: str | None = None,
repo: str | None = None,
limit: int = 100,
limit: int = 50,
) -> dict:
"""Query event-bus events with optional filters.

Expand All @@ -1933,7 +1933,7 @@ def query_bus_events(
event_type: Filter by event type (e.g., 'gotcha_discovered')
session_id: Filter by session ID
repo: Filter by repo name
limit: Maximum events to return (default: 100)
limit: Maximum events to return (default: 50)

Returns:
Dict with events list and type breakdown
Expand Down Expand Up @@ -1989,12 +1989,12 @@ def query_bus_events(
for row in rows
]

# Get type breakdown
# Get type breakdown (uses same filters but no LIMIT)
type_rows = storage.execute_query(
f"""
SELECT event_type, COUNT(*) as count
FROM bus_events
WHERE {" AND ".join(where_parts[:-1]) if len(where_parts) > 1 else where_parts[0]}
WHERE {where_clause}
GROUP BY event_type
ORDER BY count DESC
""",
Expand Down
55 changes: 54 additions & 1 deletion src/agent_session_analytics/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from fastmcp import FastMCP

from agent_session_analytics import ingest, patterns, queries
from agent_session_analytics.bus_ingest import ingest_bus_events as _ingest_bus_events
from agent_session_analytics.storage import SQLiteStorage

# Configure logging
Expand Down Expand Up @@ -48,6 +49,12 @@ async def server_lifespan(server) -> AsyncIterator[dict]:
logger.info("Startup ingestion complete")
except Exception:
logger.exception("Startup ingestion failed, server starting anyway")
try:
logger.info("Running startup bus event ingestion...")
await asyncio.to_thread(_ingest_bus_events, storage)
logger.info("Startup bus event ingestion complete")
except Exception:
logger.exception("Startup bus event ingestion failed, server starting anyway")
task = asyncio.create_task(_periodic_ingest())
yield {}
task.cancel()
Expand All @@ -56,14 +63,19 @@ async def server_lifespan(server) -> AsyncIterator[dict]:


async def _periodic_ingest():
"""Background loop: ingest local JSONL files every 5 minutes."""
"""Background loop: ingest local JSONL files and bus events every 5 minutes."""
while True:
await asyncio.sleep(INGEST_INTERVAL_SECONDS)
try:
await asyncio.to_thread(ingest.ingest_logs, storage, days=1)
logger.info("Background ingestion complete")
except Exception:
logger.exception("Background ingestion failed")
try:
await asyncio.to_thread(_ingest_bus_events, storage)
logger.info("Background bus event ingestion complete")
except Exception:
logger.exception("Background bus event ingestion failed")


# Initialize MCP server
Expand Down Expand Up @@ -252,6 +264,47 @@ def list_project_aliases(alias: str | None = None) -> dict:
return {"status": "ok", "aliases": aliases}


# --- Event Bus Integration ---


@mcp.tool()
def get_bus_events(
days: int = 7,
event_type: str | None = None,
repo: str | None = None,
session_id: str | None = None,
limit: int = 50,
) -> dict:
"""Get events from the event bus (gotchas, patterns, improvements, etc.).

Args:
days: Days to analyze (default: 7)
event_type: Filter by type (e.g., 'gotcha_discovered', 'pattern_found')
repo: Filter by repo name
session_id: Filter by session ID
limit: Max events (default: 50)
"""
result = queries.query_bus_events(
storage,
days=days,
event_type=event_type,
repo=repo,
session_id=session_id,
limit=limit,
)
return {"status": "ok", **result}


@mcp.tool()
def ingest_bus_events(days: int = 7) -> dict:
"""Refresh data from event-bus database.

Args:
days: Days to look back (default: 7)
"""
return _ingest_bus_events(storage, days=days)


@mcp.tool()
def get_tool_frequency(days: int = 7, project: str | None = None, expand: bool = True) -> dict:
"""Get tool usage frequency counts.
Expand Down
50 changes: 49 additions & 1 deletion src/agent_session_analytics/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class BusEvent:
OLD_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db"

# Schema version for migrations
SCHEMA_VERSION = 14
SCHEMA_VERSION = 15

# Migration functions: dict of version -> (migration_name, migration_func)
# Each migration upgrades FROM version-1 TO version
Expand Down Expand Up @@ -643,6 +643,34 @@ def migrate_v14(conn):
logger.info("Created project_aliases table for flexible project name matching")


@migration(15, "add_raw_bus_events_table")
def migrate_v15(conn):
"""Add raw_bus_events table for storing unparsed event-bus entries.

Mirrors the raw_entries pattern: stores the full JSON from the event-bus
database so events can be re-parsed if the schema or ingestion logic changes.
Separate from bus_events to keep parsed and raw data independent.
"""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS raw_bus_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id INTEGER NOT NULL UNIQUE,
timestamp TEXT NOT NULL,
entry_json TEXT NOT NULL,
ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_raw_bus_events_event_id ON raw_bus_events(event_id)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_raw_bus_events_timestamp ON raw_bus_events(timestamp)"
)
logger.info("Created raw_bus_events table for storing unparsed event-bus entries")


class SQLiteStorage:
"""SQLite-backed storage for session analytics."""

Expand Down Expand Up @@ -973,6 +1001,26 @@ def _init_db(self):
"ON project_aliases(alias COLLATE NOCASE)"
)

# Raw bus events table for storing unparsed event-bus entries (v15)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS raw_bus_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id INTEGER NOT NULL UNIQUE,
timestamp TEXT NOT NULL,
entry_json TEXT NOT NULL,
ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_raw_bus_events_event_id ON raw_bus_events(event_id)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_raw_bus_events_timestamp "
"ON raw_bus_events(timestamp)"
)

# Run migrations AFTER all tables are created
# Only existing databases need migrations - fresh databases have full schema
current_version = self._get_schema_version(conn)
Expand Down
Loading