diff --git a/src/copilot_usage/parser.py b/src/copilot_usage/parser.py index 7f5d2dbf..7604df2c 100644 --- a/src/copilot_usage/parser.py +++ b/src/copilot_usage/parser.py @@ -95,9 +95,17 @@ def _insert_session_entry( @dataclasses.dataclass(frozen=True, slots=True) class _CachedEvents: - """Cache entry pairing a file identity with parsed events.""" + """Cache entry pairing a file identity with parsed events. + + ``end_offset`` is the byte position after the last fully consumed + line. When the file grows (append-only), only bytes after + ``end_offset`` need to be parsed — avoiding a full re-read, even if + some fully consumed lines were skipped due to parse or validation + errors. + """ file_id: tuple[int, int] | None + end_offset: int events: tuple[SessionEvent, ...] @@ -112,7 +120,8 @@ class _CachedEvents: def _insert_events_entry( events_path: Path, file_id: tuple[int, int] | None, - events: list[SessionEvent], + events: list[SessionEvent] | tuple[SessionEvent, ...], + end_offset: int = 0, ) -> None: """Insert parsed events into ``_EVENTS_CACHE`` with LRU eviction. @@ -120,7 +129,7 @@ def _insert_events_entry( old entry is removed first. Otherwise, when the cache is full the least-recently-used entry (front of the ``OrderedDict``) is evicted. - The *events* list is converted to a ``tuple`` before storage so + The *events* sequence is converted to a ``tuple`` before storage so that callers cannot accidentally add, remove, or reorder entries in the cache. 
This is **container-level** immutability only — individual ``SessionEvent`` objects remain mutable and must not @@ -130,17 +139,97 @@ def _insert_events_entry( del _EVENTS_CACHE[events_path] elif len(_EVENTS_CACHE) >= _MAX_CACHED_EVENTS: _EVENTS_CACHE.popitem(last=False) # evict LRU (front) - _EVENTS_CACHE[events_path] = _CachedEvents(file_id=file_id, events=tuple(events)) + stored = events if isinstance(events, tuple) else tuple(events) + _EVENTS_CACHE[events_path] = _CachedEvents( + file_id=file_id, end_offset=end_offset, events=stored + ) + + +def _parse_events_from_offset( + events_path: Path, offset: int +) -> tuple[list[SessionEvent], int]: + """Parse events from *events_path* starting at byte *offset*. + + Only lines beginning at or after *offset* are JSON-decoded and + Pydantic-validated. Complete malformed or invalid lines are skipped + with a warning, matching the behaviour of :func:`parse_events`. + + Lines without a trailing newline (possible incomplete write) that + fail JSON decoding are treated as still-in-progress and stop + parsing — the returned *safe_end* does not advance past them so + the caller can retry on the next refresh. + + Returns: + ``(new_events, safe_end)`` where *safe_end* is the byte + position after the last fully consumed line. Callers should + store this as ``end_offset`` so incomplete trailing lines are + retried on the next refresh. + + Raises: + OSError: If the file cannot be opened or read. 
+ """ + new_events: list[SessionEvent] = [] + safe_offset = offset + try: + with events_path.open("rb") as fh: + fh.seek(offset) + current_offset = offset + for raw_line in fh: + line_start = current_offset + current_offset += len(raw_line) + stripped = raw_line.strip() + if not stripped: + safe_offset = current_offset + continue + try: + raw = json.loads(stripped) + except json.JSONDecodeError: + if not raw_line.endswith(b"\n"): + # Possibly incomplete write — stop and retry later + break + logger.warning( + "{}:offset {} — malformed JSON, skipping", + events_path, + line_start, + ) + safe_offset = current_offset + continue + try: + new_events.append(SessionEvent.model_validate(raw)) + except ValidationError as exc: + logger.warning( + "{}:offset {} — validation error ({}), skipping", + events_path, + line_start, + exc.error_count(), + ) + safe_offset = current_offset + except UnicodeDecodeError as exc: + logger.warning( + "{} — UTF-8 decode error at offset {}; returning {} new events (partial): {}", + events_path, + safe_offset, + len(new_events), + exc, + ) + return new_events, safe_offset def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]: """Return parsed events, using cache when file identity is unchanged. - Delegates to :func:`parse_events` on a cache miss and stores the - result keyed by *events_path* with file-identity validation on - lookup. The cache is bounded to :data:`_MAX_CACHED_EVENTS` - entries; the **least-recently used** entry is evicted when the - limit is reached. + On a cache miss (cold start, truncation, or unknown file) the entire + file is re-read via :func:`_parse_events_from_offset` at offset 0 + and the result is stored keyed by *events_path* with file-identity + validation on lookup. The cache is bounded to + :data:`_MAX_CACHED_EVENTS` entries; the **least-recently used** entry + is evicted when the limit is reached. 
+ + When the file has grown since the last read (append-only pattern), + only the newly appended bytes are parsed via + :func:`_parse_events_from_offset` and merged with the cached tuple. + A full reparse is performed when the file has shrunk (truncation or + replacement), when no end offset was recorded, or on cold start. The returned ``tuple`` prevents callers from adding, removing, or reordering cached entries (container-level immutability). Individual @@ -149,16 +238,51 @@ def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]: ``list(get_cached_events(...))``. Raises: - OSError: Propagated from :func:`parse_events` when the file - cannot be opened or read. + OSError: Propagated from :func:`_parse_events_from_offset` + when the file cannot be opened or read. """ file_id = _safe_file_identity(events_path) cached = _EVENTS_CACHE.get(events_path) - if cached is not None and cached.file_id == file_id: - _EVENTS_CACHE.move_to_end(events_path) - return cached.events - events = parse_events(events_path) - _insert_events_entry(events_path, file_id, events) + + if cached is not None and file_id is not None: + new_size = file_id[1] + if cached.file_id == file_id: + _EVENTS_CACHE.move_to_end(events_path) + return cached.events + # Append-only growth past a recorded end_offset → incremental (0 = unknown) + if 0 < cached.end_offset <= new_size: + new_events, safe_end = _parse_events_from_offset( + events_path, cached.end_offset + ) + merged = cached.events + tuple(new_events) + # Never claim the cache covers more bytes than actually + # consumed — clamp size to safe_end when they diverge so + # the next call takes the incremental path for any + # trailing unconsumed bytes. + inc_id: tuple[int, int] | None = file_id + if file_id[1] != safe_end: + inc_id = (file_id[0], safe_end) + _insert_events_entry(events_path, inc_id, merged, safe_end) + return _EVENTS_CACHE[events_path].events + + # Full reparse: cold start, truncation, or unknown file.
+ # Use _parse_events_from_offset(offset=0) instead of parse_events() so + # we get a safe_end byte boundary that reflects only bytes actually + # consumed. A post-parse stat() could observe bytes appended after + # parsing completed, overstating the consumed boundary and causing + # later incremental refreshes to skip unparsed data. + events, safe_end = _parse_events_from_offset(events_path, 0) + # Re-stat so the cached file_id reflects the current mtime for + # exact-match lookups, but never claim the cache covers more bytes + # than were actually consumed up to safe_end. + post_id = _safe_file_identity(events_path) + if post_id is None: + stored_id = file_id + elif post_id[1] == safe_end: + stored_id = post_id + else: + stored_id = (post_id[0], safe_end) + _insert_events_entry(events_path, stored_id, events, safe_end) return _EVENTS_CACHE[events_path].events @@ -783,7 +907,9 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]: # Only the newest _MAX_CACHED_EVENTS entries are retained for # _EVENTS_CACHE to avoid a temporary memory spike when many sessions # are cache-misses. - deferred_events: list[tuple[Path, tuple[int, int] | None, list[SessionEvent]]] = [] + deferred_events: list[ + tuple[Path, tuple[int, int] | None, list[SessionEvent], int] + ] = [] deferred_sessions: list[tuple[Path, _CachedSession]] = [] cache_hit_paths: list[Path] = [] for events_path, file_id, plan_id in discovered: @@ -818,15 +944,32 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]: cache_hit_paths.append(events_path) summaries.append(summary) continue + # Reuse _EVENTS_CACHE for incremental append-only parsing so + # that the session-summary rebuild only validates newly appended + # events instead of re-parsing the entire file from offset 0. + events_cached = _EVENTS_CACHE.get(events_path) + start_offset = 0 + prior_events: tuple[SessionEvent, ...] 
= () + if ( + events_cached is not None + and file_id is not None + and file_id[1] >= events_cached.end_offset + and events_cached.end_offset > 0 + ): + start_offset = events_cached.end_offset + prior_events = events_cached.events try: - events = parse_events(events_path) + new_events, safe_end = _parse_events_from_offset(events_path, start_offset) except OSError as exc: logger.warning("Skipping unreadable session {}: {}", events_path, exc) continue + events: list[SessionEvent] = ( + list(prior_events) + new_events if prior_events else new_events + ) if not events: continue if len(deferred_events) < _MAX_CACHED_EVENTS: - deferred_events.append((events_path, file_id, events)) + deferred_events.append((events_path, file_id, events, safe_end)) meta = _build_session_summary_with_meta( events, session_dir=events_path.parent, @@ -861,8 +1004,11 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]: # Populate _EVENTS_CACHE in oldest→newest order so that the newest # sessions sit at the back (MRU) and eviction drops the oldest. - for ep, fid, evts in reversed(deferred_events): - _insert_events_entry(ep, fid, evts) + for ep, fid, evts, safe_end in reversed(deferred_events): + # Use the safe_end byte boundary returned by the parser rather + # than a post-parse stat() size, which can overstate the bytes + # actually consumed if the file grew after parsing completed. + _insert_events_entry(ep, fid, evts, safe_end) # Prune stale cache entries for sessions no longer on disk. 
discovered_paths = {p for p, _, _ in discovered} diff --git a/tests/copilot_usage/test_parser.py b/tests/copilot_usage/test_parser.py index b34c9303..d259fa4b 100644 --- a/tests/copilot_usage/test_parser.py +++ b/tests/copilot_usage/test_parser.py @@ -47,6 +47,7 @@ _FirstPassResult, _infer_model_from_metrics, _insert_session_entry, + _parse_events_from_offset, _read_config_model, _ResumeInfo, _safe_file_identity, @@ -5023,7 +5024,10 @@ def test_unchanged_file_not_reparsed(self, tmp_path: Path) -> None: self._make_session(tmp_path, "sess-b", "b") # First call — populates cache; both files must be parsed. - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: result1 = get_all_sessions(tmp_path) assert len(result1) == 2 assert spy.call_count == 2 @@ -5053,12 +5057,19 @@ def test_unchanged_file_not_reparsed(self, tmp_path: Path) -> None: stat = p1.stat() os.utime(p1, ns=(stat.st_atime_ns, stat.st_mtime_ns + 2_000_000_000)) - # Second call — only the modified file should be re-parsed. - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + # Second call — only the modified file should be re-parsed, + # using the incremental path (non-zero offset from _EVENTS_CACHE). 
+ with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: result2 = get_all_sessions(tmp_path) assert len(result2) == 2 assert spy.call_count == 1 - spy.assert_called_once_with(p1) + call_args = spy.call_args + assert call_args is not None + assert call_args[0][0] == p1 # correct path + assert call_args[0][1] > 0 # incremental, not full reparse def test_cache_returns_correct_summaries(self, tmp_path: Path) -> None: """Cached entries produce the same summaries as a fresh parse.""" @@ -5093,7 +5104,7 @@ def test_cache_refreshes_session_name_on_plan_rename(self, tmp_path: Path) -> No assert cached_summary.name == "Renamed Session" def test_single_stat_per_file(self, tmp_path: Path) -> None: - """events.jsonl stat'd once (discovery), plan.md stat'd once (cache store).""" + """events.jsonl stat'd once (discovery), plan.md stat'd once.""" self._make_session(tmp_path, "sess-a", "a") with patch( @@ -5101,7 +5112,9 @@ def test_single_stat_per_file(self, tmp_path: Path) -> None: ) as spy: get_all_sessions(tmp_path) # _safe_file_identity called once by _discover_with_identity for - # events.jsonl, and once for plan.md when storing the cache entry. + # events.jsonl and once for plan.md when storing the cache entry. + # No post-parse stat is needed because _parse_events_from_offset + # returns a safe_end byte boundary directly. 
assert spy.call_count == 2 def test_resumed_session_is_cached(self, tmp_path: Path) -> None: @@ -5330,7 +5343,10 @@ def test_active_session_parse_events_called_once(self, tmp_path: Path) -> None: # First call — must parse with ( patch("copilot_usage.parser._CONFIG_PATH", config), - patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy, + patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy, ): result1 = get_all_sessions(tmp_path) assert len(result1) == 1 @@ -5340,7 +5356,10 @@ def test_active_session_parse_events_called_once(self, tmp_path: Path) -> None: # Second call — no file changes, should use cache with ( patch("copilot_usage.parser._CONFIG_PATH", config), - patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy, + patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy, ): result2 = get_all_sessions(tmp_path) assert len(result2) == 1 @@ -5367,7 +5386,10 @@ def test_active_session_cache_invalidated_on_config_change( config.write_text('{"model": "claude-sonnet-4"}', encoding="utf-8") # Second call should re-parse because config model changed - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: result2 = get_all_sessions(tmp_path / "sessions") assert len(result2) == 1 assert result2[0].model == "claude-sonnet-4" @@ -5388,7 +5410,10 @@ def test_active_session_cache_hit_on_unchanged_config( get_all_sessions(tmp_path / "sessions") # Second call — same config, same file → cache hit - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: result = get_all_sessions(tmp_path / "sessions") assert len(result) == 1 assert result[0].model == "gpt-5.1" @@ -5431,7 +5456,10 @@ 
def test_active_session_config_none_to_real_invalidates( # Now create a config with a model config.write_text('{"model": "gpt-5.1"}', encoding="utf-8") - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: result2 = get_all_sessions(tmp_path / "sessions") assert len(result2) == 1 assert result2[0].model == "gpt-5.1" @@ -5460,7 +5488,10 @@ def test_active_session_with_event_model_not_invalidated_by_config( # Change config — should NOT trigger re-parse config.write_text('{"model": "gpt-5.2"}', encoding="utf-8") - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: result2 = get_all_sessions(tmp_path / "sessions") assert len(result2) == 1 assert result2[0].model == "claude-sonnet-4" @@ -5489,7 +5520,10 @@ def test_active_session_real_model_to_none_invalidates( # Delete the config file — config_model should now be None config.unlink() - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: result2 = get_all_sessions(tmp_path / "sessions") assert len(result2) == 1 assert result2[0].model is None # config-sourced model gone @@ -5664,6 +5698,267 @@ def test_oserror_propagated_on_missing_file(self, tmp_path: Path) -> None: get_cached_events(missing) +# --------------------------------------------------------------------------- +# Issue #732 — incremental append-only parsing in get_cached_events +# --------------------------------------------------------------------------- + + +class TestIncrementalEventsParsing: + """Verify that get_cached_events incrementally parses only newly + appended events instead of re-reading the entire file. 
+ """ + + def test_incremental_parse_only_validates_new_events(self, tmp_path: Path) -> None: + """Appending 10 events to a 5 000-event file triggers Pydantic + validation only for the new events, not the full file. + + Patches ``SessionEvent.model_validate`` with a counter to confirm + the incremental path was taken. + """ + p = tmp_path / "s1" / "events.jsonl" + initial_count = 5_000 + _write_large_events_file(p, initial_count) + + # Prime the cache with a cold read + first = get_cached_events(p) + assert len(first) == initial_count + 1 # 1 start + 5000 user messages + + # Append 10 new events + append_count = 10 + with p.open("a", encoding="utf-8") as fh: + for i in range(initial_count, initial_count + append_count): + fh.write(_make_user_event(i) + "\n") + + # Patch model_validate to count calls during incremental parse + original_validate = SessionEvent.model_validate + validate_calls: list[int] = [0] + + def counting_validate( + obj: object, + *args: object, + **kwargs: object, + ) -> SessionEvent: + validate_calls[0] += 1 + return original_validate(obj, *args, **kwargs) # type: ignore[arg-type] + + with patch.object( + SessionEvent, "model_validate", side_effect=counting_validate + ): + second = get_cached_events(p) + + # Only the 10 new events should have been validated + assert validate_calls[0] == append_count + # Total should include all events + assert len(second) == initial_count + 1 + append_count + + def test_incremental_parse_returns_all_events(self, tmp_path: Path) -> None: + """After incremental parse, the returned tuple contains every event.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT, _USER_MSG) + + first = get_cached_events(p) + assert len(first) == 2 + + with p.open("a", encoding="utf-8") as fh: + fh.write(_ASSISTANT_MSG + "\n") + + second = get_cached_events(p) + assert len(second) == 3 + # Original events are preserved + assert second[0].type == "session.start" + assert second[1].type == "user.message" + assert 
second[2].type == "assistant.message" + + def test_truncated_file_triggers_full_reparse(self, tmp_path: Path) -> None: + """If the file shrinks, the cache falls back to a full reparse.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT, _USER_MSG, _ASSISTANT_MSG) + + first = get_cached_events(p) + assert len(first) == 3 + + # Overwrite with a shorter file (simulates truncation) + _write_events(p, _START_EVENT) + + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: + second = get_cached_events(p) + assert spy.call_count == 1 # full reparse + spy.assert_called_once_with(p, 0) + assert len(second) == 1 + + def test_incremental_does_not_call_parse_events(self, tmp_path: Path) -> None: + """The incremental path reparses only from the cached non-zero offset.""" + p = tmp_path / "s1" / "events.jsonl" + _write_large_events_file(p, 100) + + get_cached_events(p) # prime + initial_size = p.stat().st_size + + with p.open("a", encoding="utf-8") as fh: + fh.write(_make_user_event(999) + "\n") + + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: + result = get_cached_events(p) + + assert spy.call_count == 1 + assert spy.call_args is not None + assert spy.call_args.args[0] == p + assert spy.call_args.args[1] == initial_size + assert spy.call_args.args[1] > 0 + assert len(result) == 102 # 1 start + 100 + 1 appended + + def test_cache_entry_stores_end_offset(self, tmp_path: Path) -> None: + """After a call, _EVENTS_CACHE entry has end_offset == file size.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT, _USER_MSG) + expected_size = p.stat().st_size + + get_cached_events(p) + + entry = _EVENTS_CACHE[p] + assert entry.end_offset == expected_size + + def test_incremental_updates_end_offset(self, tmp_path: Path) -> None: + """After incremental parse, end_offset reflects the new file size.""" + p = tmp_path / "s1" / 
"events.jsonl" + _write_events(p, _START_EVENT, _USER_MSG) + get_cached_events(p) + + with p.open("a", encoding="utf-8") as fh: + fh.write(_ASSISTANT_MSG + "\n") + new_size = p.stat().st_size + + get_cached_events(p) + + entry = _EVENTS_CACHE[p] + assert entry.end_offset == new_size + + def test_cold_start_full_reparse(self, tmp_path: Path) -> None: + """A cold cache (no prior entry) always does a full reparse.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT, _USER_MSG, _ASSISTANT_MSG) + + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: + result = get_cached_events(p) + spy.assert_called_once_with(p, 0) + assert len(result) == 3 + + def test_incomplete_event_retried_on_next_refresh(self, tmp_path: Path) -> None: + """When a line is incomplete at EOF, the next refresh retries it.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT, _USER_MSG) + + get_cached_events(p) # prime cache + + # Write first part of assistant message (incomplete — no newline) + partial = _ASSISTANT_MSG.encode("utf-8")[:20] + with p.open("ab") as fh: + fh.write(partial) + + mid = get_cached_events(p) + assert len(mid) == 2 # incomplete event not parsed + + # Writer completes the line + rest = _ASSISTANT_MSG.encode("utf-8")[20:] + b"\n" + with p.open("ab") as fh: + fh.write(rest) + + final = get_cached_events(p) + assert len(final) == 3 + assert final[2].type == "assistant.message" + + +class TestParseEventsFromOffset: + """Exercise error-handling paths in _parse_events_from_offset.""" + + def test_blank_lines_are_skipped(self, tmp_path: Path) -> None: + """Empty / whitespace-only lines after the offset are ignored.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT, _USER_MSG) + offset = p.stat().st_size + + # Append a blank line followed by a valid event + with p.open("a", encoding="utf-8") as fh: + fh.write("\n") + fh.write(" \n") + fh.write(_ASSISTANT_MSG + 
"\n") + + result, end_off = _parse_events_from_offset(p, offset) + assert len(result) == 1 + assert result[0].type == "assistant.message" + assert end_off == p.stat().st_size + + def test_malformed_json_skipped(self, tmp_path: Path) -> None: + """Complete lines that are not valid JSON are skipped with a warning.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT) + offset = p.stat().st_size + + with p.open("a", encoding="utf-8") as fh: + fh.write("{not valid json\n") + fh.write(_USER_MSG + "\n") + + result, end_off = _parse_events_from_offset(p, offset) + assert len(result) == 1 + assert result[0].type == "user.message" + assert end_off == p.stat().st_size + + def test_validation_error_skipped(self, tmp_path: Path) -> None: + """JSON that is valid but fails Pydantic validation is skipped.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT) + offset = p.stat().st_size + + # Valid JSON but 'type' is an int — triggers ValidationError + with p.open("a", encoding="utf-8") as fh: + fh.write(json.dumps({"type": 123}) + "\n") + fh.write(_USER_MSG + "\n") + + result, end_off = _parse_events_from_offset(p, offset) + # The valid user message should be parsed; the invalid one skipped + assert len(result) == 1 + assert result[0].type == "user.message" + assert end_off == p.stat().st_size + + def test_unicode_decode_error_returns_partial(self, tmp_path: Path) -> None: + """A mid-stream UTF-8 decode error returns events parsed so far.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT) + offset = p.stat().st_size + + # Write a valid event followed by invalid UTF-8 bytes + with p.open("ab") as fh: + fh.write((_USER_MSG + "\n").encode("utf-8")) + fh.write(b"\xff\xfe invalid utf8\n") + + result, _end_off = _parse_events_from_offset(p, offset) + assert len(result) == 1 + assert result[0].type == "user.message" + + def test_incomplete_line_stops_parsing(self, tmp_path: Path) -> None: + """An unterminated line with 
invalid JSON stops incremental parsing.""" + p = tmp_path / "s1" / "events.jsonl" + _write_events(p, _START_EVENT) + offset = p.stat().st_size + + # Append a partial JSON line (no trailing newline — simulates mid-write) + with p.open("ab") as fh: + fh.write(b'{"type": "assistant') + + result, end_off = _parse_events_from_offset(p, offset) + assert len(result) == 0 + assert end_off == offset # did not advance past incomplete line + + # --------------------------------------------------------------------------- # Issue #668 — get_all_sessions populates _EVENTS_CACHE # --------------------------------------------------------------------------- @@ -5700,12 +5995,15 @@ def test_get_cached_events_after_get_all_sessions_no_reparse( — parse_events is called only once per session, not twice.""" p = self._make_session(tmp_path, "sess-a", "a") - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: get_all_sessions(tmp_path) assert spy.call_count == 1 # single parse during get_all_sessions events = get_cached_events(p) - # Still 1 — no additional parse_events call + # Still 1 — no additional _parse_events_from_offset call assert spy.call_count == 1 assert len(events) == 3 # start + user + shutdown @@ -5776,7 +6074,10 @@ def test_excluded_session_reparses_on_get_cached_events( excluded_path = paths[0] assert excluded_path not in _EVENTS_CACHE - with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy: + with patch( + "copilot_usage.parser._parse_events_from_offset", + wraps=_parse_events_from_offset, + ) as spy: events = get_cached_events(excluded_path) spy.assert_called_once() # cache miss → re-parse