diff --git a/src/agent_session_analytics/ingest.py b/src/agent_session_analytics/ingest.py index 4eb3df5..5ea55d4 100644 --- a/src/agent_session_analytics/ingest.py +++ b/src/agent_session_analytics/ingest.py @@ -578,13 +578,19 @@ def ingest_file( if state and not force: # Skip if file hasn't changed if state.file_size == file_size and state.last_modified >= file_mtime: - return {"entries_processed": 0, "events_added": 0, "skipped": True} + return { + "entries_processed": 0, + "events_added": 0, + "raw_entries_added": 0, + "skipped": True, + } # Extract project path from directory name project_path = file_path.parent.name - # Parse and collect events + # Parse and collect events + raw entries events = [] + raw_tuples = [] entries_processed = 0 errors = 0 @@ -599,6 +605,12 @@ def ingest_file( parsed_events = parse_entry(raw, project_path) events.extend(parsed_events) entries_processed += 1 + + # Store raw entry for future re-parsing + session_id = raw.get("sessionId") + timestamp = raw.get("timestamp") + if session_id and timestamp: + raw_tuples.append((session_id, project_path, timestamp, line)) except json.JSONDecodeError as e: logger.debug(f"JSON parse error in {file_path}:{line_num}: {e}") errors += 1 @@ -606,8 +618,9 @@ def ingest_file( logger.warning(f"Error processing {file_path}:{line_num}: {e}") errors += 1 - # Batch insert events + # Batch insert events and raw entries events_added = storage.add_events_batch(events) if events else 0 + raw_entries_added = storage.add_raw_entries_batch(raw_tuples) if raw_tuples else 0 # Update ingestion state storage.update_ingestion_state( @@ -623,6 +636,7 @@ def ingest_file( return { "entries_processed": entries_processed, "events_added": events_added, + "raw_entries_added": raw_entries_added, "skipped": False, "errors": errors, } @@ -693,6 +707,7 @@ def ingest_logs( total_entries = 0 total_events = 0 + total_raw_entries = 0 files_processed = 0 files_skipped = 0 total_errors = 0 @@ -706,6 +721,7 @@ def ingest_logs( files_processed += 1 total_entries += result["entries_processed"] total_events += result["events_added"] + total_raw_entries += result.get("raw_entries_added", 0) total_errors += result.get("errors", 0) except Exception as e: logger.error(f"Failed to ingest {file_path}: {e}") @@ -720,6 +736,7 @@ def ingest_logs( "files_skipped": files_skipped, "entries_processed": total_entries, "events_added": total_events, + "raw_entries_added": total_raw_entries, "sessions_updated": sessions_updated, "errors": total_errors, } diff --git a/tests/test_ingest.py b/tests/test_ingest.py index dfcdc7c..dc46fbc 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -530,8 +530,42 @@ def test_ingest_file(self, storage, sample_logs_dir): result = ingest_file(jsonl_file, storage) assert result["entries_processed"] == 3 assert result["events_added"] == 4 # RFC #41: assistant creates 2 events now + assert result["raw_entries_added"] == 3 assert result["skipped"] is False + def test_ingest_file_stores_raw_entries(self, storage, sample_logs_dir): + """Test that local ingestion stores raw entries for future re-parsing.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + ingest_file(jsonl_file, storage) + + # Verify raw entries are in the DB + rows = storage.execute_query("SELECT COUNT(*) as count FROM raw_entries") + assert rows[0]["count"] == 3 + + # Verify raw entries contain valid JSON + rows = storage.execute_query( + "SELECT session_id, project_path, entry_json FROM raw_entries LIMIT 1" + ) + assert rows[0]["session_id"] is not None + assert rows[0]["project_path"] == "-test-project" + parsed = json.loads(rows[0]["entry_json"]) + assert "sessionId" in parsed + assert "timestamp" in parsed + + def test_ingest_file_raw_entries_dedup(self, storage, sample_logs_dir): + """Test that re-ingestion doesn't duplicate raw entries.""" + project_dir = sample_logs_dir / "-test-project" + jsonl_file = project_dir / "test-session.jsonl" + + ingest_file(jsonl_file, storage) + ingest_file(jsonl_file, storage, force=True) + + # Should still be 3 raw entries (INSERT OR IGNORE deduplicates) + rows = storage.execute_query("SELECT COUNT(*) as count FROM raw_entries") + assert rows[0]["count"] == 3 + def test_incremental_ingestion(self, storage, sample_logs_dir): """Test that unchanged files are skipped on re-ingestion.""" project_dir = sample_logs_dir / "-test-project" @@ -602,6 +636,7 @@ def test_ingest_logs(self, storage, sample_logs_dir): # Ingest the file result = do_ingest_file(files[0], storage) assert result["events_added"] == 4 # RFC #41: assistant creates 2 events + assert result["raw_entries_added"] == 3 # Update session stats sessions = update_session_stats(storage) diff --git a/tests/test_server.py b/tests/test_server.py index cd391f4..d5dba4a 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -55,6 +55,7 @@ def test_ingest_logs(): assert result["status"] == "ok" assert "files_found" in result assert "events_added" in result + assert "raw_entries_added" in result def test_get_tool_frequency():