From 27183cc8f7cae85410fa81ef49207fd619350a17 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Sat, 7 Feb 2026 04:08:47 +0000 Subject: [PATCH 1/4] fix: Store raw entries during local JSONL ingestion Local ingestion (ingest_logs/ingest_file) now stores raw JSONL entries in the raw_entries table, matching the behavior of upload_entries for client pushes. This ensures server-side session data can be re-parsed if the parser improves, closing the gap where only client-pushed data had raw entries preserved. Co-Authored-By: Claude Opus 4.6 --- src/agent_session_analytics/ingest.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/agent_session_analytics/ingest.py b/src/agent_session_analytics/ingest.py index 4eb3df5..b68c28f 100644 --- a/src/agent_session_analytics/ingest.py +++ b/src/agent_session_analytics/ingest.py @@ -583,8 +583,9 @@ def ingest_file( # Extract project path from directory name project_path = file_path.parent.name - # Parse and collect events + # Parse and collect events + raw entries events = [] + raw_tuples = [] entries_processed = 0 errors = 0 @@ -599,6 +600,12 @@ def ingest_file( parsed_events = parse_entry(raw, project_path) events.extend(parsed_events) entries_processed += 1 + + # Store raw entry for future re-parsing + session_id = raw.get("sessionId") + timestamp = raw.get("timestamp") + if session_id and timestamp: + raw_tuples.append((session_id, project_path, timestamp, line)) except json.JSONDecodeError as e: logger.debug(f"JSON parse error in {file_path}:{line_num}: {e}") errors += 1 @@ -606,8 +613,9 @@ def ingest_file( logger.warning(f"Error processing {file_path}:{line_num}: {e}") errors += 1 - # Batch insert events + # Batch insert events and raw entries events_added = storage.add_events_batch(events) if events else 0 + raw_entries_added = storage.add_raw_entries_batch(raw_tuples) if raw_tuples else 0 # Update ingestion state storage.update_ingestion_state( @@ -623,6 +631,7 @@ def ingest_file( return { "entries_processed": entries_processed, "events_added": events_added, + "raw_entries_added": raw_entries_added, "skipped": False, "errors": errors, } @@ -693,6 +702,7 @@ def ingest_logs( total_entries = 0 total_events = 0 + total_raw_entries = 0 files_processed = 0 files_skipped = 0 total_errors = 0 @@ -706,6 +716,7 @@ def ingest_logs( files_processed += 1 total_entries += result["entries_processed"] total_events += result["events_added"] + total_raw_entries += result.get("raw_entries_added", 0) total_errors += result.get("errors", 0) except Exception as e: logger.error(f"Failed to ingest {file_path}: {e}") @@ -720,6 +731,7 @@ def ingest_logs( "files_skipped": files_skipped, "entries_processed": total_entries, "events_added": total_events, + "raw_entries_added": total_raw_entries, "sessions_updated": sessions_updated, "errors": total_errors, } From 6b88aea5b69835d374c78be58b6611526cf6c994 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Sat, 7 Feb 2026 04:11:57 +0000 Subject: [PATCH 2/4] test: Add coverage for raw entries in local ingestion - test_ingest_file: assert raw_entries_added count - test_ingest_file_stores_raw_entries: verify DB contents and valid JSON - test_ingest_file_raw_entries_dedup: verify INSERT OR IGNORE dedup - test_ingest_logs: assert raw_entries_added in return dict - test_server.test_ingest_logs: assert raw_entries_added key exists Co-Authored-By: Claude Opus 4.6 --- tests/test_ingest.py | 37 +++++++++++++++++++++++++++++++++++++ tests/test_server.py | 1 + 2 files changed, 38 insertions(+) diff --git a/tests/test_ingest.py b/tests/test_ingest.py index dfcdc7c..23dd8c4 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -530,8 +530,44 @@ def test_ingest_file(self, storage, sample_logs_dir): result = ingest_file(jsonl_file, storage) assert result["entries_processed"] == 3 assert result["events_added"] == 4 # RFC #41: assistant creates 2 events now + assert result["raw_entries_added"] == 3 assert result["skipped"] is False + def test_ingest_file_stores_raw_entries(self, storage, sample_logs_dir): + """Test that local ingestion stores raw entries for future re-parsing.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + ingest_file(jsonl_file, storage) + + # Verify raw entries are in the DB + rows = storage.execute_query("SELECT COUNT(*) as count FROM raw_entries") + assert rows[0]["count"] == 3 + + # Verify raw entries contain valid JSON + rows = storage.execute_query( + "SELECT session_id, project_path, entry_json FROM raw_entries LIMIT 1" + ) + assert rows[0]["session_id"] is not None + assert rows[0]["project_path"] == "-test-project" + import json + + parsed = json.loads(rows[0]["entry_json"]) + assert "sessionId" in parsed + assert "timestamp" in parsed + + def test_ingest_file_raw_entries_dedup(self, storage, sample_logs_dir): + """Test that re-ingestion doesn't duplicate raw entries.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + ingest_file(jsonl_file, storage) + ingest_file(jsonl_file, storage, force=True) + + # Should still be 3 raw entries (INSERT OR IGNORE deduplicates) + rows = storage.execute_query("SELECT COUNT(*) as count FROM raw_entries") + assert rows[0]["count"] == 3 + def test_incremental_ingestion(self, storage, sample_logs_dir): """Test that unchanged files are skipped on re-ingestion.""" project_dir = sample_logs_dir / "-test-project" @@ -602,6 +638,7 @@ def test_ingest_logs(self, storage, sample_logs_dir): # Ingest the file result = do_ingest_file(files[0], storage) assert result["events_added"] == 4 # RFC #41: assistant creates 2 events + assert result["raw_entries_added"] == 3 # Update session stats sessions = update_session_stats(storage) diff --git a/tests/test_server.py b/tests/test_server.py index cd391f4..d5dba4a 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -55,6 +55,7 @@ def test_ingest_logs(): assert result["status"] == "ok" assert "files_found" in result assert "events_added" in result + assert "raw_entries_added" in result def test_get_tool_frequency(): From 8d7a897d72e8db207b3c2d5ffb5855ca859010ed Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Sat, 7 Feb 2026 04:23:01 +0000 Subject: [PATCH 3/4] fix: Address reviewer feedback on raw entries PR - Add raw_entries_added: 0 to skipped return dict for consistency - Remove redundant import json inside test function Co-Authored-By: Claude Opus 4.6 --- src/agent_session_analytics/ingest.py | 2 +- tests/test_ingest.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/agent_session_analytics/ingest.py b/src/agent_session_analytics/ingest.py index b68c28f..540286a 100644 --- a/src/agent_session_analytics/ingest.py +++ b/src/agent_session_analytics/ingest.py @@ -578,7 +578,7 @@ def ingest_file( if state and not force: # Skip if file hasn't changed if state.file_size == file_size and state.last_modified >= file_mtime: - return {"entries_processed": 0, "events_added": 0, "skipped": True} + return {"entries_processed": 0, "events_added": 0, "raw_entries_added": 0, "skipped": True} # Extract project path from directory name project_path = file_path.parent.name diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 23dd8c4..dc46fbc 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -550,8 +550,6 @@ def test_ingest_file_stores_raw_entries(self, storage, sample_logs_dir): ) assert rows[0]["session_id"] is not None assert rows[0]["project_path"] == "-test-project" - import json - parsed = json.loads(rows[0]["entry_json"]) assert "sessionId" in parsed assert "timestamp" in parsed From 9b0bb6352823fe2224ff2be074e2cab582c31f88 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Sat, 7 Feb 2026 04:24:54 +0000 Subject: [PATCH 4/4] style: Format ingest.py Co-Authored-By: Claude Opus 4.6 --- src/agent_session_analytics/ingest.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/agent_session_analytics/ingest.py b/src/agent_session_analytics/ingest.py index 540286a..5ea55d4 100644 --- a/src/agent_session_analytics/ingest.py +++ b/src/agent_session_analytics/ingest.py @@ -578,7 +578,12 @@ def ingest_file( if state and not force: # Skip if file hasn't changed if state.file_size == file_size and state.last_modified >= file_mtime: - return {"entries_processed": 0, "events_added": 0, "raw_entries_added": 0, "skipped": True} + return { + "entries_processed": 0, + "events_added": 0, + "raw_entries_added": 0, + "skipped": True, + } # Extract project path from directory name project_path = file_path.parent.name