Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions src/agent_session_analytics/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,13 +578,19 @@ def ingest_file(
if state and not force:
# Skip if file hasn't changed
if state.file_size == file_size and state.last_modified >= file_mtime:
return {"entries_processed": 0, "events_added": 0, "skipped": True}
return {
"entries_processed": 0,
"events_added": 0,
"raw_entries_added": 0,
"skipped": True,
}

# Extract project path from directory name
project_path = file_path.parent.name

# Parse and collect events
# Parse and collect events + raw entries
events = []
raw_tuples = []
entries_processed = 0
errors = 0

Expand All @@ -599,15 +605,22 @@ def ingest_file(
parsed_events = parse_entry(raw, project_path)
events.extend(parsed_events)
entries_processed += 1

# Store raw entry for future re-parsing
session_id = raw.get("sessionId")
timestamp = raw.get("timestamp")
if session_id and timestamp:
raw_tuples.append((session_id, project_path, timestamp, line))
except json.JSONDecodeError as e:
logger.debug(f"JSON parse error in {file_path}:{line_num}: {e}")
errors += 1
except Exception as e:
logger.warning(f"Error processing {file_path}:{line_num}: {e}")
errors += 1

# Batch insert events
# Batch insert events and raw entries
events_added = storage.add_events_batch(events) if events else 0
raw_entries_added = storage.add_raw_entries_batch(raw_tuples) if raw_tuples else 0

# Update ingestion state
storage.update_ingestion_state(
Expand All @@ -623,6 +636,7 @@ def ingest_file(
return {
"entries_processed": entries_processed,
"events_added": events_added,
"raw_entries_added": raw_entries_added,
"skipped": False,
"errors": errors,
}
Expand Down Expand Up @@ -693,6 +707,7 @@ def ingest_logs(

total_entries = 0
total_events = 0
total_raw_entries = 0
files_processed = 0
files_skipped = 0
total_errors = 0
Expand All @@ -706,6 +721,7 @@ def ingest_logs(
files_processed += 1
total_entries += result["entries_processed"]
total_events += result["events_added"]
total_raw_entries += result.get("raw_entries_added", 0)
total_errors += result.get("errors", 0)
except Exception as e:
logger.error(f"Failed to ingest {file_path}: {e}")
Expand All @@ -720,6 +736,7 @@ def ingest_logs(
"files_skipped": files_skipped,
"entries_processed": total_entries,
"events_added": total_events,
"raw_entries_added": total_raw_entries,
"sessions_updated": sessions_updated,
"errors": total_errors,
}
Expand Down
35 changes: 35 additions & 0 deletions tests/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,42 @@ def test_ingest_file(self, storage, sample_logs_dir):
result = ingest_file(jsonl_file, storage)
assert result["entries_processed"] == 3
assert result["events_added"] == 4 # RFC #41: assistant creates 2 events now
assert result["raw_entries_added"] == 3
assert result["skipped"] is False

def test_ingest_file_stores_raw_entries(self, storage, sample_logs_dir):
"""Test that local ingestion stores raw entries for future re-parsing."""
project_dir = sample_logs_dir / "-test-project"
jsonl_file = project_dir / "test-session.jsonl"

ingest_file(jsonl_file, storage)

# Verify raw entries are in the DB
rows = storage.execute_query("SELECT COUNT(*) as count FROM raw_entries")
assert rows[0]["count"] == 3

# Verify raw entries contain valid JSON
rows = storage.execute_query(
"SELECT session_id, project_path, entry_json FROM raw_entries LIMIT 1"
)
assert rows[0]["session_id"] is not None
assert rows[0]["project_path"] == "-test-project"
parsed = json.loads(rows[0]["entry_json"])
assert "sessionId" in parsed
assert "timestamp" in parsed

def test_ingest_file_raw_entries_dedup(self, storage, sample_logs_dir):
"""Test that re-ingestion doesn't duplicate raw entries."""
project_dir = sample_logs_dir / "-test-project"
jsonl_file = project_dir / "test-session.jsonl"

ingest_file(jsonl_file, storage)
ingest_file(jsonl_file, storage, force=True)

# Should still be 3 raw entries (INSERT OR IGNORE deduplicates)
rows = storage.execute_query("SELECT COUNT(*) as count FROM raw_entries")
assert rows[0]["count"] == 3

def test_incremental_ingestion(self, storage, sample_logs_dir):
"""Test that unchanged files are skipped on re-ingestion."""
project_dir = sample_logs_dir / "-test-project"
Expand Down Expand Up @@ -602,6 +636,7 @@ def test_ingest_logs(self, storage, sample_logs_dir):
# Ingest the file
result = do_ingest_file(files[0], storage)
assert result["events_added"] == 4 # RFC #41: assistant creates 2 events
assert result["raw_entries_added"] == 3

# Update session stats
sessions = update_session_stats(storage)
Expand Down
1 change: 1 addition & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_ingest_logs():
assert result["status"] == "ok"
assert "files_found" in result
assert "events_added" in result
assert "raw_entries_added" in result


def test_get_tool_frequency():
Expand Down