Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions src/agent_session_analytics/bus_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import json
import logging
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path

from agent_session_analytics.storage import SQLiteStorage
Expand All @@ -29,12 +28,12 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:
"""Ingest events from event-bus database.

Performs incremental ingestion by tracking the last ingested event ID.
Events are read from the event-bus database in read-only mode.
First run ingests all events; subsequent runs only pick up new events.
Raw event JSON is stored alongside parsed data for future re-parsing.

Args:
storage: Session analytics storage instance
days: Number of days to look back for initial ingestion
days: Unused (kept for backward compatibility)

Returns:
Dict with ingestion stats including events_ingested count
Expand All @@ -50,9 +49,6 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:
last_event = storage.execute_query("SELECT MAX(event_id) as last_id FROM bus_events")
last_id = last_event[0]["last_id"] if last_event and last_event[0]["last_id"] else 0

# Calculate cutoff for first-run ingestion
cutoff = datetime.now() - timedelta(days=days)

# Read from event-bus DB (read-only mode)
try:
conn = sqlite3.connect(f"file:{EVENT_BUS_DB}?mode=ro", uri=True)
Expand All @@ -65,9 +61,8 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:
}

try:
# Query events newer than last ingested ID, or from cutoff on first run
if last_id > 0:
# Incremental: get events after last ID
# Incremental: get events after last ingested ID
rows = conn.execute(
"""
SELECT id, event_type, channel, session_id, timestamp, payload
Expand All @@ -78,15 +73,13 @@ def ingest_bus_events(storage: SQLiteStorage, days: int = 7) -> dict:
(last_id,),
).fetchall()
else:
# First run: get events from cutoff
# First run: ingest ALL events (no timestamp cutoff)
rows = conn.execute(
"""
SELECT id, event_type, channel, session_id, timestamp, payload
FROM events
WHERE timestamp >= ?
ORDER BY id
""",
(cutoff.isoformat(),),
"""
).fetchall()

if not rows:
Expand Down
21 changes: 11 additions & 10 deletions tests/test_bus_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ class TestIngestBusEvents:
def test_ingest_from_bus_db(self, storage, bus_db):
"""Test basic ingestion from an event-bus database."""
with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db):
result = ingest_bus_events(storage, days=7)
result = ingest_bus_events(storage)

assert result["status"] == "ok"
assert result["events_ingested"] == 4 # 4 within 7 days
assert result["events_ingested"] == 5 # First run gets ALL events

def test_incremental_ingestion(self, storage, bus_db):
"""Test that second ingestion only picks up new events."""
Expand Down Expand Up @@ -160,11 +160,11 @@ def test_missing_db_skips(self, storage):
def test_raw_events_stored(self, storage, bus_db):
"""Test that raw event JSON is stored in raw_bus_events table."""
with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db):
ingest_bus_events(storage, days=7)
ingest_bus_events(storage)

# Check raw_bus_events table
rows = storage.execute_query("SELECT * FROM raw_bus_events ORDER BY event_id")
assert len(rows) == 4 # 4 within 7 days
assert len(rows) == 5 # First run gets ALL events

# Verify raw JSON is parseable and contains original fields
raw = json.loads(rows[0]["entry_json"])
Expand All @@ -187,19 +187,20 @@ def test_raw_events_dedup(self, storage, bus_db):
def test_repo_extraction(self, storage, bus_db):
"""Test that repo is correctly extracted from channel."""
with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db):
ingest_bus_events(storage, days=7)
ingest_bus_events(storage)

rows = storage.execute_query(
"SELECT repo FROM bus_events WHERE event_type = 'gotcha_discovered' ORDER BY event_id"
)
assert rows[0]["repo"] == "dotfiles"

def test_full_history_ingestion(self, storage, bus_db):
"""Test ingestion with large days window gets all events."""
def test_first_run_ignores_days_param(self, storage, bus_db):
"""Test that first run ingests ALL events regardless of days param."""
with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db):
result = ingest_bus_events(storage, days=30)
# Even with days=1, first run should get all 5 events (including 10-day-old one)
result = ingest_bus_events(storage, days=1)

assert result["events_ingested"] == 5 # All events including old one
assert result["events_ingested"] == 5


class TestQueryBusEvents:
Expand All @@ -209,7 +210,7 @@ class TestQueryBusEvents:
def storage_with_bus_events(self, storage, bus_db):
"""Storage with bus events ingested."""
with patch("agent_session_analytics.bus_ingest.EVENT_BUS_DB", bus_db):
ingest_bus_events(storage, days=30)
ingest_bus_events(storage)
return storage

def test_query_all(self, storage_with_bus_events):
Expand Down