From 6cab89edbd2543a1e5c19221192b4dc416240eb0 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Mon, 9 Feb 2026 02:48:21 +0000 Subject: [PATCH] fix: First bus event ingestion now captures full history (#108) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First run no longer applies a timestamp cutoff — it ingests ALL events from the event-bus database. Subsequent runs continue using the high-water mark for incremental updates. This prevents the scenario where a days=7 first run permanently skips older events. Closes #108 Co-Authored-By: Claude Opus 4.6 --- src/agent_session_analytics/bus_ingest.py | 17 +++++------------ tests/test_bus_ingest.py | 21 +++++++++++---------- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/src/agent_session_analytics/bus_ingest.py b/src/agent_session_analytics/bus_ingest.py index bfa8ebb..a55a14e 100644 --- a/src/agent_session_analytics/bus_ingest.py +++ b/src/agent_session_analytics/bus_ingest.py @@ -8,7 +8,6 @@ import json import logging import sqlite3 -from datetime import datetime, timedelta from pathlib import Path from agent_session_analytics.storage import SQLiteStorage @@ -29,12 +28,12 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: """Ingest events from event-bus database. Performs incremental ingestion by tracking the last ingested event ID. - Events are read from the event-bus database in read-only mode. + First run ingests all events; subsequent runs only pick up new events. Raw event JSON is stored alongside parsed data for future re-parsing. Args: storage: Session analytics storage instance - days: Number of days to look back for initial ingestion + days: Unused (kept for backward compatibility) Returns: Dict with ingestion stats including events_ingested count @@ -50,9 +49,6 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: last_event = storage.execute_query("SELECT MAX(event_id) as last_id FROM bus_events") last_id = last_event[0]["last_id"] if last_event and last_event[0]["last_id"] else 0 - # Calculate cutoff for first-run ingestion - cutoff = datetime.now() - timedelta(days=days) - # Read from event-bus DB (read-only mode) try: conn = sqlite3.connect(f"file:{EVENT_BUS_DB}?mode=ro", uri=True) @@ -65,9 +61,8 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: } try: - # Query events newer than last ingested ID, or from cutoff on first run if last_id > 0: - # Incremental: get events after last ID + # Incremental: get events after last ingested ID rows = conn.execute( """ SELECT id, event_type, channel, session_id, timestamp, payload @@ -78,15 +73,13 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict: (last_id,), ).fetchall() else: - # First run: get events from cutoff + # First run: ingest ALL events (no timestamp cutoff) rows = conn.execute( """ SELECT id, event_type, channel, session_id, timestamp, payload FROM events - WHERE timestamp >= ? ORDER BY id - """, - (cutoff.isoformat(),), + """ ).fetchall() if not rows: diff --git a/tests/test_bus_ingest.py b/tests/test_bus_ingest.py index 588adb0..e12a5e0 100644 --- a/tests/test_bus_ingest.py +++ b/tests/test_bus_ingest.py @@ -106,10 +106,10 @@ class TestIngestBusEvents: def test_ingest_from_bus_db(self, storage, bus_db): """Test basic ingestion from an event-bus database.""" with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): - result = ingest_bus_events(storage, days=7) + result = ingest_bus_events(storage) assert result["status"] == "ok" - assert result["events_ingested"] == 4 # 4 within 7 days + assert result["events_ingested"] == 5 # First run gets ALL events def test_incremental_ingestion(self, storage, bus_db): """Test that second ingestion only picks up new events.""" @@ -160,11 +160,11 @@ def test_missing_db_skips(self, storage): def test_raw_events_stored(self, storage, bus_db): """Test that raw event JSON is stored in raw_bus_events table.""" with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): - ingest_bus_events(storage, days=7) + ingest_bus_events(storage) # Check raw_bus_events table rows = storage.execute_query("SELECT * FROM raw_bus_events ORDER BY event_id") - assert len(rows) == 4 # 4 within 7 days + assert len(rows) == 5 # First run gets ALL events # Verify raw JSON is parseable and contains original fields raw = json.loads(rows[0]["entry_json"]) @@ -187,19 +187,20 @@ def test_raw_events_dedup(self, storage, bus_db): def test_repo_extraction(self, storage, bus_db): """Test that repo is correctly extracted from channel.""" with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): - ingest_bus_events(storage, days=7) + ingest_bus_events(storage) rows = storage.execute_query( "SELECT repo FROM bus_events WHERE event_type = 'gotcha_discovered' ORDER BY event_id" ) assert rows[0]["repo"] == "dotfiles" - def test_full_history_ingestion(self, storage, bus_db): - """Test ingestion with large days window gets all events.""" + def test_first_run_ignores_days_param(self, storage, bus_db): + """Test that first run ingests ALL events regardless of days param.""" with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): - result = ingest_bus_events(storage, days=30) + # Even with days=1, first run should get all 5 events (including 10-day-old one) + result = ingest_bus_events(storage, days=1) - assert result["events_ingested"] == 5 # All events including old one + assert result["events_ingested"] == 5 class TestQueryBusEvents: @@ -209,7 +210,7 @@ class TestQueryBusEvents: def storage_with_bus_events(self, storage, bus_db): """Storage with bus events ingested.""" with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db): - ingest_bus_events(storage, days=30) + ingest_bus_events(storage) return storage def test_query_all(self, storage_with_bus_events):