diff --git a/docs/SCHEMA.md b/docs/SCHEMA.md index 04f4968..3c3818f 100644 --- a/docs/SCHEMA.md +++ b/docs/SCHEMA.md @@ -29,6 +29,7 @@ This document describes the SQLite database schema for agent-session-analytics. | `bus_events` | Cross-session events from event-bus | ~2K | | `events_fts` | FTS5 virtual table for user message search | N/A | | `raw_entries` | Unparsed JSONL entries for future re-parsing | 100K+ | +| `raw_bus_events` | Unparsed event-bus entries for future re-parsing | ~2K | | `project_aliases` | Alias mappings for renamed projects | ~10 | --- @@ -212,6 +213,20 @@ CREATE TABLE raw_entries ( **Design note**: The UNIQUE constraint on `entry_json` ensures exact deduplication. While this means large JSON values are compared, SQLite handles this efficiently and it avoids hash collision edge cases. +### raw_bus_events + +Unparsed event-bus entries for future re-parsing. Mirrors the `raw_entries` pattern — stores the full JSON from the event-bus database so events can be re-parsed if the schema or ingestion logic changes. + +```sql +CREATE TABLE raw_bus_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_id INTEGER NOT NULL UNIQUE, -- Original ID from event-bus + timestamp TEXT NOT NULL, + entry_json TEXT NOT NULL, -- Full original event JSON + ingested_at TEXT NOT NULL DEFAULT (datetime('now')) +) +``` + ### project_aliases Maps alias names to target patterns for flexible project filtering. When filtering by an alias, queries automatically expand to match both the alias and all its targets. @@ -266,6 +281,8 @@ Performance-critical indexes on the `events` table: | `bus_events` | `idx_bus_events_repo` | `repo` | | `raw_entries` | `idx_raw_entries_session` | `session_id` | | `raw_entries` | `idx_raw_entries_timestamp` | `timestamp` | +| `raw_bus_events` | `idx_raw_bus_events_event_id` | `event_id` | +| `raw_bus_events` | `idx_raw_bus_events_timestamp` | `timestamp` | | `project_aliases` | `idx_project_aliases_alias` | `alias COLLATE NOCASE` | --- @@ -307,6 +324,7 @@ Sync triggers maintain index consistency: | 12 | fix_warmup_not_errors | Fix warmup events incorrectly marked as errors (Issue #75) | | 13 | add_raw_entries_table | Raw JSONL storage for future re-parsing (Issue #93) | | 14 | add_project_aliases | Project alias table for renamed project matching (Issue #71) | +| 15 | add_raw_bus_events_table | Raw event-bus JSON storage for future re-parsing (Issue #106) | --- diff --git a/src/agent_session_analytics/bus_ingest.py b/src/agent_session_analytics/bus_ingest.py index f6106c6..bfa8ebb 100644 --- a/src/agent_session_analytics/bus_ingest.py +++ b/src/agent_session_analytics/bus_ingest.py @@ -2,8 +2,10 @@ Reads events from ~/.claude/contrib/agent-event-bus/data.db and stores them in agent-session-analytics for queryable cross-session insights. +Raw event JSON is also stored in raw_bus_events for future re-parsing. """ +import json import logging import sqlite3 from datetime import datetime, timedelta @@ -28,6 +30,7 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: Performs incremental ingestion by tracking the last ingested event ID. Events are read from the event-bus database in read-only mode. + Raw event JSON is stored alongside parsed data for future re-parsing. Args: storage: Session analytics storage instance @@ -93,7 +96,7 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: "last_event_id": last_id, } - # Batch insert into analytics database + # Batch insert parsed events into analytics database events_data = [ ( row["id"], @@ -107,6 +110,16 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: for row in rows ] + # Store raw event JSON for future re-parsing + raw_data = [ + ( + row["id"], + row["timestamp"], + json.dumps(dict(row)), + ) + for row in rows + ] + with storage._connect() as db_conn: db_conn.executemany( """ @@ -116,6 +129,14 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: """, events_data, ) + db_conn.executemany( + """ + INSERT OR IGNORE INTO raw_bus_events + (event_id, timestamp, entry_json) + VALUES (?, ?, ?) + """, + raw_data, + ) newest_id = rows[-1]["id"] oldest_ts = rows[0]["timestamp"] diff --git a/src/agent_session_analytics/cli.py b/src/agent_session_analytics/cli.py index 5059283..6728e24 100644 --- a/src/agent_session_analytics/cli.py +++ b/src/agent_session_analytics/cli.py @@ -824,6 +824,33 @@ def _format_aliases(data: dict) -> list[str]: return lines +@_register_formatter(lambda d: "event_types" in d and "events" in d and "event_count" in d) +def _format_bus_events(data: dict) -> list[str]: + lines = [ + f"Event-bus events ({data['event_count']} events, last {data.get('days', '?')} days)", + "", + ] + type_counts = data.get("event_types", {}) + if type_counts: + lines.append("Event types:") + for etype, count in type_counts.items(): + lines.append(f" {etype}: {count}") + lines.append("") + for event in data.get("events", []): + ts = event.get("timestamp", "?") + etype = event.get("event_type", "?") + repo = event.get("repo", "") + repo_str = f" [{repo}]" if repo else "" + payload = event.get("payload") or "" + # Truncate long payloads for display + if len(payload) > 200: + payload = payload[:200] + "..." + lines.append(f" {ts} {etype}{repo_str}") + lines.append(f" {payload}") + lines.append("") + return lines + + def format_output(data: dict, json_output: bool = False) -> str: """Format output as JSON or human-readable.""" if json_output: @@ -1416,6 +1443,9 @@ def cmd_benchmark(args): from agent_session_analytics.queries import ( query_agent_activity as queries_query_agent_activity, ) + from agent_session_analytics.queries import ( + query_bus_events as queries_query_bus_events, + ) from agent_session_analytics.queries import ( query_error_details as queries_query_error_details, ) @@ -1447,7 +1477,7 @@ def cmd_benchmark(args): # Define all MCP tools with their default parameters # These call the underlying query functions directly (not the MCP wrappers) # Skip mutating tools (ingest_*) and tools requiring specific IDs - # Note: Removed tools not in MCP (get_command_frequency, get_languages, get_bus_events, + # Note: Removed tools not in MCP (get_command_frequency, get_languages, # analyze_pre_compaction_patterns) - CLI still has them for backward compat tool_functions = { "get_status": lambda: storage.get_db_stats(), @@ -1494,6 +1524,8 @@ def cmd_benchmark(args): "get_session_efficiency": lambda: queries_get_session_efficiency(storage, days=7), # Issue #71: Project aliases "list_project_aliases": lambda: storage.get_project_aliases(), + # Issue #106: Bus events integration + "get_bus_events": lambda: queries_query_bus_events(storage, days=7, limit=10), } # Skipped tools (require specific data or modify DB): diff --git a/src/agent_session_analytics/guide.md b/src/agent_session_analytics/guide.md index 71d7cae..f70ed11 100644 --- a/src/agent_session_analytics/guide.md +++ b/src/agent_session_analytics/guide.md @@ -81,6 +81,28 @@ agent-session-analytics-cli alias remove genai-rs - Aliases expand to OR clauses: `WHERE project_path LIKE '%genai-rs%' OR project_path LIKE '%rust-genai%'` - Multiple targets can be added per alias (e.g., for projects renamed multiple times) +### Event Bus Integration + +Cross-session knowledge events from the agent-event-bus (gotchas, patterns, improvement suggestions). Events are ingested from the co-located event-bus SQLite database and stored in both parsed form (`bus_events` table) and raw JSON form (`raw_bus_events` table) for future re-parsing. + +| Tool | Purpose | +|------|---------| +| `get_bus_events(days?, event_type?, repo?, session_id?, limit?)` | Query cross-session knowledge events | +| `ingest_bus_events(days?)` | Force refresh from event-bus database | + +**Key event types:** +- `gotcha_discovered` — Non-obvious bugs or pitfalls +- `pattern_found` — Reusable solutions and techniques +- `improvement_suggested` — Workflow or tooling gap proposals + +**Example usage:** +``` +get_bus_events(event_type="gotcha_discovered", days=30) +get_bus_events(repo="agent-event-bus", limit=10) +``` + +**Automatic ingestion:** Bus events are ingested on server startup and every 5 minutes by the background loop. Use `ingest_bus_events()` to force an immediate refresh. + ### Core Queries | Tool | Purpose | diff --git a/src/agent_session_analytics/queries.py b/src/agent_session_analytics/queries.py index 7ec4113..0bf5e03 100644 --- a/src/agent_session_analytics/queries.py +++ b/src/agent_session_analytics/queries.py @@ -1920,7 +1920,7 @@ def query_bus_events( event_type: str | None = None, session_id: str | None = None, repo: str | None = None, - limit: int = 100, + limit: int = 50, ) -> dict: """Query event-bus events with optional filters. @@ -1933,7 +1933,7 @@ def query_bus_events( event_type: Filter by event type (e.g., 'gotcha_discovered') session_id: Filter by session ID repo: Filter by repo name - limit: Maximum events to return (default: 100) + limit: Maximum events to return (default: 50) Returns: Dict with events list and type breakdown @@ -1989,12 +1989,12 @@ def query_bus_events( for row in rows ] - # Get type breakdown + # Get type breakdown (uses same filters but no LIMIT) type_rows = storage.execute_query( f""" SELECT event_type, COUNT(*) as count FROM bus_events - WHERE {" AND ".join(where_parts[:-1]) if len(where_parts) > 1 else where_parts[0]} + WHERE {where_clause} GROUP BY event_type ORDER BY count DESC """, diff --git a/src/agent_session_analytics/server.py b/src/agent_session_analytics/server.py index 94be836..9ca5c6c 100644 --- a/src/agent_session_analytics/server.py +++ b/src/agent_session_analytics/server.py @@ -21,6 +21,7 @@ from fastmcp import FastMCP from agent_session_analytics import ingest, patterns, queries +from agent_session_analytics.bus_ingest import ingest_bus_events as _ingest_bus_events from agent_session_analytics.storage import SQLiteStorage # Configure logging @@ -48,6 +49,12 @@ async def server_lifespan(server) -> AsyncIterator[dict]: logger.info("Startup ingestion complete") except Exception: logger.exception("Startup ingestion failed, server starting anyway") + try: + logger.info("Running startup bus event ingestion...") + await asyncio.to_thread(_ingest_bus_events, storage) + logger.info("Startup bus event ingestion complete") + except Exception: + logger.exception("Startup bus event ingestion failed, server starting anyway") task = asyncio.create_task(_periodic_ingest()) yield {} task.cancel() @@ -56,7 +63,7 @@ async def server_lifespan(server) -> AsyncIterator[dict]: async def _periodic_ingest(): - """Background loop: ingest local JSONL files every 5 minutes.""" + """Background loop: ingest local JSONL files and bus events every 5 minutes.""" while True: await asyncio.sleep(INGEST_INTERVAL_SECONDS) try: @@ -64,6 +71,11 @@ async def _periodic_ingest(): logger.info("Background ingestion complete") except Exception: logger.exception("Background ingestion failed") + try: + await asyncio.to_thread(_ingest_bus_events, storage) + logger.info("Background bus event ingestion complete") + except Exception: + logger.exception("Background bus event ingestion failed") # Initialize MCP server @@ -252,6 +264,47 @@ def list_project_aliases(alias: str | None = None) -> dict: return {"status": "ok", "aliases": aliases} +# --- Event Bus Integration --- + + +@mcp.tool() +def get_bus_events( + days: int = 7, + event_type: str | None = None, + repo: str | None = None, + session_id: str | None = None, + limit: int = 50, +) -> dict: + """Get events from the event bus (gotchas, patterns, improvements, etc.). + + Args: + days: Days to analyze (default: 7) + event_type: Filter by type (e.g., 'gotcha_discovered', 'pattern_found') + repo: Filter by repo name + session_id: Filter by session ID + limit: Max events (default: 50) + """ + result = queries.query_bus_events( + storage, + days=days, + event_type=event_type, + repo=repo, + session_id=session_id, + limit=limit, + ) + return {"status": "ok", **result} + + +@mcp.tool() +def ingest_bus_events(days: int = 7) -> dict: + """Refresh data from event-bus database. + + Args: + days: Days to look back (default: 7) + """ + return _ingest_bus_events(storage, days=days) + + @mcp.tool() def get_tool_frequency(days: int = 7, project: str | None = None, expand: bool = True) -> dict: """Get tool usage frequency counts. diff --git a/src/agent_session_analytics/storage.py b/src/agent_session_analytics/storage.py index 224c4b1..f512e7d 100644 --- a/src/agent_session_analytics/storage.py +++ b/src/agent_session_analytics/storage.py @@ -177,7 +177,7 @@ class BusEvent: OLD_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" # Schema version for migrations -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 # Migration functions: dict of version -> (migration_name, migration_func) # Each migration upgrades FROM version-1 TO version @@ -643,6 +643,34 @@ def migrate_v14(conn): logger.info("Created project_aliases table for flexible project name matching") +@migration(15, "add_raw_bus_events_table") +def migrate_v15(conn): + """Add raw_bus_events table for storing unparsed event-bus entries. + + Mirrors the raw_entries pattern: stores the full JSON from the event-bus + database so events can be re-parsed if the schema or ingestion logic changes. + Separate from bus_events to keep parsed and raw data independent. + """ + conn.execute( + """ + CREATE TABLE IF NOT EXISTS raw_bus_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_id INTEGER NOT NULL UNIQUE, + timestamp TEXT NOT NULL, + entry_json TEXT NOT NULL, + ingested_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_raw_bus_events_event_id ON raw_bus_events(event_id)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_raw_bus_events_timestamp ON raw_bus_events(timestamp)" + ) + logger.info("Created raw_bus_events table for storing unparsed event-bus entries") + + class SQLiteStorage: """SQLite-backed storage for session analytics.""" @@ -973,6 +1001,26 @@ def _init_db(self): "ON project_aliases(alias COLLATE NOCASE)" ) + # Raw bus events table for storing unparsed event-bus entries (v15) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS raw_bus_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_id INTEGER NOT NULL UNIQUE, + timestamp TEXT NOT NULL, + entry_json TEXT NOT NULL, + ingested_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_raw_bus_events_event_id ON raw_bus_events(event_id)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_raw_bus_events_timestamp " + "ON raw_bus_events(timestamp)" + ) + # Run migrations AFTER all tables are created # Only existing databases need migrations - fresh databases have full schema current_version = self._get_schema_version(conn) diff --git a/tests/test_bus_ingest.py b/tests/test_bus_ingest.py new file mode 100644 index 0000000..588adb0 --- /dev/null +++ b/tests/test_bus_ingest.py @@ -0,0 +1,286 @@ +"""Tests for event-bus ingestion and querying.""" + +import json +import sqlite3 +from datetime import datetime, timedelta +from pathlib import Path +from unittest.mock import patch + +import pytest + +from agent_session_analytics.bus_ingest import ( + _extract_repo, + ingest_bus_events, +) +from agent_session_analytics.queries import query_bus_events + + +class TestExtractRepo: + """Tests for the _extract_repo helper.""" + + def test_repo_channel(self): + assert _extract_repo("repo:dotfiles") == "dotfiles" + + def test_session_channel(self): + assert _extract_repo("session:abc-123") is None + + def test_all_channel(self): + assert _extract_repo("all") is None + + def test_none_channel(self): + assert _extract_repo(None) is None + + +@pytest.fixture +def bus_db(tmp_path): + """Create a temporary event-bus database with sample events.""" + db_path = tmp_path / "event-bus.db" + conn = sqlite3.connect(str(db_path)) + conn.execute( + """ + CREATE TABLE events ( + id INTEGER PRIMARY KEY, + event_type TEXT NOT NULL, + channel TEXT, + session_id TEXT, + timestamp TEXT NOT NULL, + payload TEXT + ) + """ + ) + now = datetime.now() + events = [ + ( + 1, + "gotcha_discovered", + "repo:dotfiles", + "session-1", + (now - timedelta(hours=1)).isoformat(), + "git show preserves symlinks", + ), + ( + 2, + "pattern_found", + "repo:agent-event-bus", + "session-2", + (now - timedelta(hours=2)).isoformat(), + "SQLite self-joins need explicit indexes", + ), + ( + 3, + "improvement_suggested", + "all", + "session-1", + (now - timedelta(hours=3)).isoformat(), + "Add log format validation to CI", + ), + ( + 4, + "session_registered", + "all", + "session-3", + (now - timedelta(hours=4)).isoformat(), + "session started", + ), + ( + 5, + "gotcha_discovered", + "repo:dotfiles", + "session-2", + (now - timedelta(days=10)).isoformat(), + "Old gotcha outside default window", + ), + ] + conn.executemany( + "INSERT INTO events (id, event_type, channel, session_id, timestamp, payload) VALUES (?, ?, ?, ?, ?, ?)", + events, + ) + conn.commit() + conn.close() + return db_path + + +class TestIngestBusEvents: + """Tests for the ingest_bus_events function.""" + + def test_ingest_from_bus_db(self, storage, bus_db): + """Test basic ingestion from an event-bus database.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + result = ingest_bus_events(storage, days=7) + + assert result["status"] == "ok" + assert result["events_ingested"] == 4 # 4 within 7 days + + def test_incremental_ingestion(self, storage, bus_db): + """Test that second ingestion only picks up new events.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + # Ingest all events (days=30 to include the old one) + result1 = ingest_bus_events(storage, days=30) + assert result1["events_ingested"] == 5 + + # Second run should find nothing new + result2 = ingest_bus_events(storage, days=30) + assert result2["events_ingested"] == 0 + + def test_incremental_picks_up_new_events(self, storage, bus_db): + """Test that new events added after first ingestion are picked up.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + ingest_bus_events(storage, days=30) + + # Add a new event to the bus DB + conn = sqlite3.connect(str(bus_db)) + conn.execute( + "INSERT INTO events (id, event_type, channel, session_id, timestamp, payload) " + "VALUES (?, ?, ?, ?, ?, ?)", + ( + 6, + "gotcha_discovered", + "repo:test", + "session-4", + datetime.now().isoformat(), + "New gotcha", + ), + ) + conn.commit() + conn.close() + + result2 = ingest_bus_events(storage, days=30) + assert result2["events_ingested"] == 1 + + def test_missing_db_skips(self, storage): + """Test graceful handling when event-bus DB doesn't exist.""" + with patch( + "agent_session_analytics.bus_ingest.EVENT_BUS_DB", Path("/nonexistent/path/data.db") + ): + result = ingest_bus_events(storage) + + assert result["status"] == "skipped" + assert "not found" in result["reason"] + + def test_raw_events_stored(self, storage, bus_db): + """Test that raw event JSON is stored in raw_bus_events table.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + ingest_bus_events(storage, days=7) + + # Check raw_bus_events table + rows = storage.execute_query("SELECT * FROM raw_bus_events ORDER BY event_id") + assert len(rows) == 4 # 4 within 7 days + + # Verify raw JSON is parseable and contains original fields + raw = json.loads(rows[0]["entry_json"]) + assert "id" in raw + assert "event_type" in raw + assert "payload" in raw + + def test_raw_events_dedup(self, storage, bus_db): + """Test that re-ingesting doesn't duplicate raw events.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + ingest_bus_events(storage, days=30) + # Force re-ingest by resetting the bus_events high-water mark + storage.execute_write("DELETE FROM bus_events") + ingest_bus_events(storage, days=30) + + # raw_bus_events should still only have one copy per event (INSERT OR IGNORE) + rows = storage.execute_query("SELECT COUNT(*) as cnt FROM raw_bus_events") + assert rows[0]["cnt"] == 5 # All 5 events, each stored once + + def test_repo_extraction(self, storage, bus_db): + """Test that repo is correctly extracted from channel.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + ingest_bus_events(storage, days=7) + + rows = storage.execute_query( + "SELECT repo FROM bus_events WHERE event_type = 'gotcha_discovered' ORDER BY event_id" + ) + assert rows[0]["repo"] == "dotfiles" + + def test_full_history_ingestion(self, storage, bus_db): + """Test ingestion with large days window gets all events.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + result = ingest_bus_events(storage, days=30) + + assert result["events_ingested"] == 5 # All events including old one + + +class TestQueryBusEvents: + """Tests for the query_bus_events function.""" + + @pytest.fixture + def storage_with_bus_events(self, storage, bus_db): + """Storage with bus events ingested.""" + with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): + ingest_bus_events(storage, days=30) + return storage + + def test_query_all(self, storage_with_bus_events): + """Test querying all bus events.""" + result = query_bus_events(storage_with_bus_events, days=30) + assert result["event_count"] == 5 + + def test_query_by_type(self, storage_with_bus_events): + """Test filtering by event type.""" + result = query_bus_events(storage_with_bus_events, days=30, event_type="gotcha_discovered") + assert result["event_count"] == 2 + for event in result["events"]: + assert event["event_type"] == "gotcha_discovered" + + def test_query_by_repo(self, storage_with_bus_events): + """Test filtering by repo.""" + result = query_bus_events(storage_with_bus_events, days=30, repo="dotfiles") + assert result["event_count"] == 2 + for event in result["events"]: + assert event["repo"] == "dotfiles" + + def test_query_by_session(self, storage_with_bus_events): + """Test filtering by session ID.""" + result = query_bus_events(storage_with_bus_events, days=30, session_id="session-1") + assert result["event_count"] == 2 + + def test_query_type_breakdown(self, storage_with_bus_events): + """Test that type breakdown is returned.""" + result = query_bus_events(storage_with_bus_events, days=30) + assert "event_types" in result + assert result["event_types"]["gotcha_discovered"] == 2 + assert result["event_types"]["pattern_found"] == 1 + + def test_query_type_breakdown_with_filter(self, storage_with_bus_events): + """Test that type breakdown respects filters (regression for where_parts bug).""" + result = query_bus_events(storage_with_bus_events, days=30, repo="dotfiles") + assert result["event_count"] == 2 + # Type breakdown should only reflect filtered events + assert result["event_types"]["gotcha_discovered"] == 2 + assert "pattern_found" not in result["event_types"] + + def test_query_limit(self, storage_with_bus_events): + """Test result limiting.""" + result = query_bus_events(storage_with_bus_events, days=30, limit=2) + assert result["event_count"] == 2 + + def test_query_empty_table(self, storage): + """Test querying empty bus_events table.""" + result = query_bus_events(storage, days=7) + assert result["event_count"] == 0 + assert result["events"] == [] + + def test_query_days_filter(self, storage_with_bus_events): + """Test days filter excludes old events.""" + result = query_bus_events(storage_with_bus_events, days=7) + assert result["event_count"] == 4 # Old event excluded + + +class TestSchemaAndMigration: + """Tests for the raw_bus_events schema.""" + + def test_raw_bus_events_table_exists(self, storage): + """Test that raw_bus_events table is created.""" + rows = storage.execute_query( + "SELECT name FROM sqlite_master WHERE type='table' AND name='raw_bus_events'" + ) + assert len(rows) == 1 + + def test_bus_events_table_exists(self, storage): + """Test that bus_events table is created.""" + rows = storage.execute_query( + "SELECT name FROM sqlite_master WHERE type='table' AND name='bus_events'" + ) + assert len(rows) == 1