diff --git a/src/copilot_usage/vscode_parser.py b/src/copilot_usage/vscode_parser.py index c2e22adf..4103a836 100644 --- a/src/copilot_usage/vscode_parser.py +++ b/src/copilot_usage/vscode_parser.py @@ -128,16 +128,64 @@ def parse_vscode_log(log_path: Path) -> list[VSCodeRequest]: """Parse a single VS Code Copilot Chat log file into request objects. Returns a list of parsed requests (possibly empty when no lines match). + Unlike incremental parsing via :func:`_parse_vscode_log_from_offset`, + this performs a complete one-shot read and includes the final line even + when it is not newline-terminated. + + Raises: + OSError: If the file cannot be opened or read. + """ + requests, _ = _parse_vscode_log_from_offset(log_path, 0, include_partial_tail=True) + return requests + + +def _parse_vscode_log_from_offset( + log_path: Path, + offset: int, + *, + include_partial_tail: bool = False, +) -> tuple[list[VSCodeRequest], int]: + """Parse VS Code Copilot Chat log starting at *offset* bytes. + + Returns ``(requests, end_offset)`` where *end_offset* is the byte + position immediately after the last line consumed by this call. + With the default ``include_partial_tail=False``, this is the end of + the last **complete** (newline-terminated) line read; a partial line + at EOF is intentionally excluded so that the next incremental call + can re-read it once the writer finishes the line. + + When *include_partial_tail* is ``True`` (used by :func:`parse_vscode_log` + for one-shot full parsing), a final non-newline-terminated line is + **included** in the results, and ``end_offset`` advances past that + consumed partial tail as well to preserve full-file text semantics. Raises: OSError: If the file cannot be opened or read. """ requests: list[VSCodeRequest] = [] - with log_path.open(encoding="utf-8", errors="replace") as f: - for line in f: + safe_end: int = offset + with log_path.open("rb") as fb: + if offset > 0: + # Guard against TOCTOU race: the file may have been + # truncated/replaced between the caller's stat() and this + # open(). Re-validate with fstat on the open descriptor. + actual_size = os.fstat(fb.fileno()).st_size + if actual_size < offset: + offset = 0 + safe_end = 0 + fb.seek(offset) + for raw_line in fb: + is_complete = raw_line.endswith(b"\n") + if not is_complete and not include_partial_tail: + # Partial line at EOF — stop advancing so the next + # incremental call re-reads this line once complete. + break + safe_end += len(raw_line) # Fast pre-filter: only ~1–5% of lines contain "ccreq:" - if "ccreq:" not in line: + if b"ccreq:" not in raw_line: continue + # Decode with replacement to mirror parse_vscode_log behaviour. + line = raw_line.decode("utf-8", errors="replace") m = _CCREQ_RE.match(line) if m is None: continue @@ -155,53 +203,100 @@ def parse_vscode_log(log_path: Path) -> list[VSCodeRequest]: category=category, ) ) - logger.debug("Parsed {} request(s) from {}", len(requests), log_path) - return requests + logger.debug( + "Parsed {} request(s) from {} (offset {}→{})", + len(requests), + log_path, + offset, + safe_end, + ) + return requests, safe_end # --------------------------------------------------------------------------- # Module-level parsed-requests cache (mirrors parser._EVENTS_CACHE). # Uses OrderedDict for LRU eviction: most-recently-used entries are at # the back, least-recently-used at the front. +# +# Cache value layout: (file_id, end_offset, requests_tuple) +# file_id – (st_mtime_ns, st_size) or None when stat() fails +# end_offset – byte position of the last parsed line +# requests – immutable tuple of parsed VSCodeRequest objects # --------------------------------------------------------------------------- _MAX_CACHED_VSCODE_LOGS: Final[int] = 64 _VSCODE_LOG_CACHE: OrderedDict[ - Path, tuple[tuple[int, int] | None, tuple[VSCodeRequest, ...]] + Path, tuple[tuple[int, int] | None, int, tuple[VSCodeRequest, ...]] ] = OrderedDict() +def _update_vscode_cache( + log_path: Path, + file_id: tuple[int, int] | None, + end_offset: int, + requests: tuple[VSCodeRequest, ...], +) -> None: + """Insert or replace a cache entry with LRU eviction.""" + if log_path in _VSCODE_LOG_CACHE: + del _VSCODE_LOG_CACHE[log_path] + elif len(_VSCODE_LOG_CACHE) >= _MAX_CACHED_VSCODE_LOGS: + _VSCODE_LOG_CACHE.popitem(last=False) # evict LRU (front) + _VSCODE_LOG_CACHE[log_path] = (file_id, end_offset, requests) + + def _get_cached_vscode_requests(log_path: Path) -> tuple[VSCodeRequest, ...]: - """Return parsed requests, re-parsing only when ``(mtime_ns, size)`` changes. + """Return parsed requests, incrementally parsing only new content. On the first call for a given *log_path*, delegates to - :func:`parse_vscode_log` and stores the result. Subsequent calls - return the cached tuple as long as the file identity is unchanged. + :func:`_parse_vscode_log_from_offset` (offset 0) and stores the + result together with the byte offset reached. Subsequent calls + detect whether the file has **grown** (append-only) by comparing + the new ``st_size`` against the cached size — if so, only the + bytes after the stored offset are parsed and appended to the + existing result. + + When the file is **replaced** (new size < cached size) or + ``st_size`` cannot be determined, a full re-parse is performed. + The cache is bounded to :data:`_MAX_CACHED_VSCODE_LOGS` entries; the **least-recently used** entry is evicted when the limit is reached. - The parsed list is converted to a ``tuple`` before storage so that - callers cannot accidentally append, pop, or reorder entries in the - cache — matching the container-level immutability pattern used by - :func:`copilot_usage.parser.get_cached_events`. - Raises: - OSError: Propagated from :func:`parse_vscode_log` when the file - cannot be opened or read. + OSError: Propagated from :func:`_parse_vscode_log_from_offset` + when the file cannot be opened or read. """ - file_id = _safe_file_identity(log_path) + new_id = _safe_file_identity(log_path) cached = _VSCODE_LOG_CACHE.get(log_path) - if cached is not None and cached[0] == file_id: - _VSCODE_LOG_CACHE.move_to_end(log_path) - return cached[1] - requests = tuple(parse_vscode_log(log_path)) - if log_path in _VSCODE_LOG_CACHE: - del _VSCODE_LOG_CACHE[log_path] - elif len(_VSCODE_LOG_CACHE) >= _MAX_CACHED_VSCODE_LOGS: - _VSCODE_LOG_CACHE.popitem(last=False) # evict LRU (front) - _VSCODE_LOG_CACHE[log_path] = (file_id, requests) - return requests + + if cached is not None: + old_id, end_offset, old_requests = cached + + # Exact match: file unchanged — return cached result. + if old_id == new_id: + _VSCODE_LOG_CACHE.move_to_end(log_path) + return old_requests + + # Incremental path: file grew (append-only) beyond the cached + # resume point. Compare against ``end_offset`` because that is the + # position we will seek to when resuming parsing. + if new_id is not None and old_id is not None and new_id[1] >= end_offset: + new_reqs, new_end = _parse_vscode_log_from_offset(log_path, end_offset) + if new_end < end_offset: + # fstat inside the parser detected truncation — the + # returned results are a full reparse, not a delta. + result = tuple(new_reqs) + _update_vscode_cache(log_path, new_id, new_end, result) + return result + combined = old_requests + tuple(new_reqs) + _update_vscode_cache(log_path, new_id, new_end, combined) + return combined + + # Full parse: first call or file was truncated/replaced. + requests, end_offset = _parse_vscode_log_from_offset(log_path, 0) + result = tuple(requests) + _update_vscode_cache(log_path, new_id, end_offset, result) + return result @dataclass(slots=True) diff --git a/tests/copilot_usage/test_vscode_parser.py b/tests/copilot_usage/test_vscode_parser.py index 33354f19..c9bcd93a 100644 --- a/tests/copilot_usage/test_vscode_parser.py +++ b/tests/copilot_usage/test_vscode_parser.py @@ -1,5 +1,6 @@ """Tests for copilot_usage.vscode_parser and the vscode CLI subcommand.""" +import os import re from datetime import datetime from pathlib import Path @@ -18,6 +19,7 @@ VSCodeRequest, _default_log_candidates, # pyright: ignore[reportPrivateUsage] _get_cached_vscode_requests, # pyright: ignore[reportPrivateUsage] + _parse_vscode_log_from_offset, # pyright: ignore[reportPrivateUsage] _SummaryAccumulator, # pyright: ignore[reportPrivateUsage] _update_vscode_summary, # pyright: ignore[reportPrivateUsage] build_vscode_summary, @@ -103,7 +105,7 @@ class TestParseVscodeLog: def test_parses_real_lines(self, tmp_path: Path) -> None: log_file = tmp_path / "test.log" log_file.write_text( - "\n".join([_LOG_OPUS, _LOG_NOISE, _LOG_REDIRECT, _LOG_GPT4O]), + "\n".join([_LOG_OPUS, _LOG_NOISE, _LOG_REDIRECT, _LOG_GPT4O]) + "\n", encoding="utf-8", ) requests = parse_vscode_log(log_file) @@ -135,7 +137,7 @@ def test_invalid_timestamp_line_is_skipped(self, tmp_path: Path) -> None: assert _CCREQ_RE.match(bad_line) is not None good_line = _LOG_OPUS # a valid known-good line log_file = tmp_path / "test.log" - log_file.write_text(f"{bad_line}\n{good_line}", encoding="utf-8") + log_file.write_text(f"{bad_line}\n{good_line}\n", encoding="utf-8") result = parse_vscode_log(log_file) assert len(result) == 1 # bad line skipped assert result[0].model == "claude-opus-4.6" @@ -189,6 +191,15 @@ def test_ccreq_line_with_replacement_char_is_skipped(self, tmp_path: Path) -> No result = parse_vscode_log(log_path) assert result == [] + def test_partial_tail_included_for_one_shot_parse(self, tmp_path: Path) -> None: + """parse_vscode_log includes a final line even without a trailing newline.""" + log_file = tmp_path / "test.log" + # Write a valid ccreq line without a trailing newline. + log_file.write_text(_make_log_line(req_idx=0), encoding="utf-8") + result = parse_vscode_log(log_file) + assert len(result) == 1 + assert result[0].request_id == "req00000" + # --------------------------------------------------------------------------- # build_vscode_summary @@ -546,7 +557,7 @@ def test_end_to_end(self, tmp_path: Path) -> None: log_dir = tmp_path / "20260313" / "window1" / "exthost" / "GitHub.copilot-chat" log_dir.mkdir(parents=True) (log_dir / "GitHub Copilot Chat.log").write_text( - "\n".join([_LOG_OPUS, _LOG_REDIRECT, _LOG_NOISE, _LOG_GPT4O]), + "\n".join([_LOG_OPUS, _LOG_REDIRECT, _LOG_NOISE, _LOG_GPT4O]) + "\n", encoding="utf-8", ) summary = get_vscode_summary(tmp_path) @@ -617,10 +628,10 @@ def test_incremental_aggregation(self) -> None: ), ] - def _fake_parse(path: Path) -> list[VSCodeRequest]: + def _fake_parse(path: Path, offset: int) -> tuple[list[VSCodeRequest], int]: if path == file_a: - return list(requests_a) - return list(requests_b) + return list(requests_a), 100 + return list(requests_b), 50 with ( patch( @@ -628,7 +639,7 @@ def _fake_parse(path: Path) -> list[VSCodeRequest]: return_value=[file_a, file_b], ), patch( - "copilot_usage.vscode_parser.parse_vscode_log", + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", side_effect=_fake_parse, ) as mock_parse, patch( @@ -640,7 +651,7 @@ def _fake_parse(path: Path) -> list[VSCodeRequest]: assert summary.total_requests == 3 assert summary.log_files_parsed == 2 assert mock_parse.call_count == 2 - mock_parse.assert_has_calls([call(file_a), call(file_b)]) + mock_parse.assert_has_calls([call(file_a, 0), call(file_b, 0)]) # Verify the incremental path is used: build_vscode_summary must NOT # be called because get_vscode_summary now aggregates per-file via # _update_vscode_summary instead of collecting all requests first. @@ -665,10 +676,10 @@ def test_oserror_skips_file_and_continues(self) -> None: ), ] - def _fake_parse(path: Path) -> list[VSCodeRequest]: + def _fake_parse(path: Path, offset: int) -> tuple[list[VSCodeRequest], int]: if path == file_a: raise OSError("Permission denied") - return list(requests_b) + return list(requests_b), 50 with ( patch( @@ -676,7 +687,7 @@ def _fake_parse(path: Path) -> list[VSCodeRequest]: return_value=[file_a, file_b], ), patch( - "copilot_usage.vscode_parser.parse_vscode_log", + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", side_effect=_fake_parse, ), ): @@ -699,14 +710,17 @@ def test_log_files_found_equals_discovered(self) -> None: category="panel", ) + def _fake_parse(path: Path, offset: int) -> tuple[list[VSCodeRequest], int]: + return [req], 50 + with ( patch( "copilot_usage.vscode_parser.discover_vscode_logs", return_value=paths, ), patch( - "copilot_usage.vscode_parser.parse_vscode_log", - return_value=[req], + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + side_effect=_fake_parse, ), ): summary = get_vscode_summary() @@ -728,10 +742,10 @@ def test_log_files_found_vs_parsed_on_oserror(self) -> None: category="inline", ) - def _fake_parse(path: Path) -> list[VSCodeRequest]: + def _fake_parse(path: Path, offset: int) -> tuple[list[VSCodeRequest], int]: if path == path1: raise OSError("Permission denied") - return [req] + return [req], 50 with ( patch( @@ -739,7 +753,7 @@ def _fake_parse(path: Path) -> list[VSCodeRequest]: return_value=[path1, path2], ), patch( - "copilot_usage.vscode_parser.parse_vscode_log", + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", side_effect=_fake_parse, ), ): @@ -776,20 +790,21 @@ def test_vscode_single_file_oserror_logs_warning( _LOG_OPUS + "\n", encoding="utf-8" ) - # Make parse_vscode_log raise OSError only on the first call. + # Make _parse_vscode_log_from_offset raise OSError only on the first call. call_count = 0 - _real_parse = parse_vscode_log + _real_parse = _parse_vscode_log_from_offset - def _failing_once(path: Path) -> list[VSCodeRequest]: + def _failing_once(path: Path, offset: int) -> tuple[list[VSCodeRequest], int]: nonlocal call_count call_count += 1 if call_count == 1: msg = "Permission denied" raise OSError(msg) - return _real_parse(path) + return _real_parse(path, offset) monkeypatch.setattr( - "copilot_usage.vscode_parser.parse_vscode_log", _failing_once + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + _failing_once, ) warnings: list[str] = [] @@ -822,7 +837,8 @@ def _always_raise(*_a: object, **_kw: object) -> object: raise OSError(msg) monkeypatch.setattr( - "copilot_usage.vscode_parser.parse_vscode_log", _always_raise + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + _always_raise, ) runner = CliRunner() result = runner.invoke(main, ["vscode", "--vscode-logs", str(tmp_path)]) @@ -1212,7 +1228,7 @@ def test_all_lines_match(self, tmp_path: Path) -> None: for i in range(n) ] log_file = tmp_path / "all_match.log" - log_file.write_text("\n".join(lines), encoding="utf-8") + log_file.write_text("\n".join(lines) + "\n", encoding="utf-8") requests = parse_vscode_log(log_file) assert len(requests) == n for i, req in enumerate(requests): @@ -1235,7 +1251,7 @@ def test_1000_matching_lines_parsed(self, tmp_path: Path) -> None: for i in range(n) ] log_file = tmp_path / "fromisoformat_1000.log" - log_file.write_text("\n".join(lines), encoding="utf-8") + log_file.write_text("\n".join(lines) + "\n", encoding="utf-8") requests = parse_vscode_log(log_file) assert len(requests) == n for i, req in enumerate(requests): @@ -1261,31 +1277,31 @@ class TestVscodeLogCache: def test_first_call_populates_cache(self, tmp_path: Path) -> None: log_file = tmp_path / "chat.log" - log_file.write_text(_make_log_line(req_idx=0)) + log_file.write_text(_make_log_line(req_idx=0) + "\n") requests = _get_cached_vscode_requests(log_file) assert len(requests) == 1 assert log_file in _VSCODE_LOG_CACHE def test_second_call_returns_cached_without_reparse(self, tmp_path: Path) -> None: - """parse_vscode_log is only called once when file is unchanged.""" + """_parse_vscode_log_from_offset is only called once when file is unchanged.""" log_file = tmp_path / "chat.log" - log_file.write_text(_make_log_line(req_idx=0)) + log_file.write_text(_make_log_line(req_idx=0) + "\n") with patch( - "copilot_usage.vscode_parser.parse_vscode_log", - wraps=parse_vscode_log, + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + wraps=_parse_vscode_log_from_offset, ) as spy: first = _get_cached_vscode_requests(log_file) second = _get_cached_vscode_requests(log_file) assert spy.call_count == 1 assert first == second assert log_file in _VSCODE_LOG_CACHE - assert _VSCODE_LOG_CACHE[log_file][1] == second + assert _VSCODE_LOG_CACHE[log_file][2] == second def test_cache_invalidated_on_file_change(self, tmp_path: Path) -> None: """Changing the file causes a re-parse on the next call.""" log_file = tmp_path / "chat.log" - log_file.write_text(_make_log_line(req_idx=0)) + log_file.write_text(_make_log_line(req_idx=0) + "\n") first = _get_cached_vscode_requests(log_file) assert len(first) == 1 @@ -1294,7 +1310,7 @@ def test_cache_invalidated_on_file_change(self, tmp_path: Path) -> None: # has different contents and size, so its identity changes and the # next cache lookup should re-parse it. log_file.write_text( - _make_log_line(req_idx=0) + "\n" + _make_log_line(req_idx=1) + _make_log_line(req_idx=0) + "\n" + _make_log_line(req_idx=1) + "\n" ) second = _get_cached_vscode_requests(log_file) @@ -1305,7 +1321,7 @@ def test_lru_eviction(self, tmp_path: Path) -> None: paths: list[Path] = [] for i in range(_MAX_CACHED_VSCODE_LOGS + 1): p = tmp_path / f"log_{i}.log" - p.write_text(_make_log_line(req_idx=i)) + p.write_text(_make_log_line(req_idx=i) + "\n") paths.append(p) for p in paths: @@ -1319,8 +1335,8 @@ def test_lru_promotion_on_access(self, tmp_path: Path) -> None: """Accessing a cached entry moves it to the back (most-recently used).""" p1 = tmp_path / "a.log" p2 = tmp_path / "b.log" - p1.write_text(_make_log_line(req_idx=0)) - p2.write_text(_make_log_line(req_idx=1)) + p1.write_text(_make_log_line(req_idx=0) + "\n") + p2.write_text(_make_log_line(req_idx=1) + "\n") _get_cached_vscode_requests(p1) _get_cached_vscode_requests(p2) @@ -1337,14 +1353,205 @@ def test_get_vscode_summary_uses_cache(self, tmp_path: Path) -> None: ) log_dir.mkdir(parents=True) log_file = log_dir / "GitHub Copilot Chat.log" - log_file.write_text(_make_log_line(req_idx=0)) + log_file.write_text(_make_log_line(req_idx=0) + "\n") with patch( - "copilot_usage.vscode_parser.parse_vscode_log", - wraps=parse_vscode_log, + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + wraps=_parse_vscode_log_from_offset, ) as spy: s1 = get_vscode_summary(tmp_path) s2 = get_vscode_summary(tmp_path) assert spy.call_count == 1 assert s1.total_requests == 1 assert s2.total_requests == 1 + + +# --------------------------------------------------------------------------- +# Incremental append-only parsing tests +# --------------------------------------------------------------------------- + + +class TestIncrementalParsing: + """Tests for incremental (append-only) parsing in _get_cached_vscode_requests.""" + + def test_append_returns_combined_requests(self, tmp_path: Path) -> None: + """Appending lines to a cached log returns N + M requests.""" + log_file = tmp_path / "chat.log" + initial_lines = "\n".join(_make_log_line(req_idx=i) for i in range(3)) + log_file.write_text(initial_lines + "\n") + + first = _get_cached_vscode_requests(log_file) + assert len(first) == 3 + + # Append 2 more request lines. + with log_file.open("a") as f: + for i in range(3, 5): + f.write(_make_log_line(req_idx=i) + "\n") + + second = _get_cached_vscode_requests(log_file) + assert len(second) == 5 + # Original requests are preserved at the start. + assert second[:3] == first + + def test_append_only_reads_from_previous_offset(self, tmp_path: Path) -> None: + """Incremental path starts reading at the previous end offset.""" + log_file = tmp_path / "chat.log" + initial_lines = "\n".join(_make_log_line(req_idx=i) for i in range(4)) + log_file.write_text(initial_lines + "\n") + + _get_cached_vscode_requests(log_file) + cached_entry = _VSCODE_LOG_CACHE[log_file] + old_end_offset = cached_entry[1] + assert old_end_offset > 0 + + # Append new lines. + with log_file.open("a") as f: + for i in range(4, 7): + f.write(_make_log_line(req_idx=i) + "\n") + + with patch( + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + wraps=_parse_vscode_log_from_offset, + ) as spy: + result = _get_cached_vscode_requests(log_file) + + assert len(result) == 7 + # The incremental call should have used the old end offset. + spy.assert_called_once_with(log_file, old_end_offset) + + def test_file_replaced_triggers_full_reparse(self, tmp_path: Path) -> None: + """Replacing (truncating) a file triggers a full re-parse from offset 0.""" + log_file = tmp_path / "chat.log" + # Write a large initial file. + initial_lines = "\n".join(_make_log_line(req_idx=i) for i in range(10)) + log_file.write_text(initial_lines + "\n") + + first = _get_cached_vscode_requests(log_file) + assert len(first) == 10 + + # Replace the file with fewer lines (smaller size). + log_file.write_text(_make_log_line(req_idx=99) + "\n") + + with patch( + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + wraps=_parse_vscode_log_from_offset, + ) as spy: + second = _get_cached_vscode_requests(log_file) + + assert len(second) == 1 + assert second[0].request_id == "req00099" + # Full re-parse: offset should be 0. + spy.assert_called_once_with(log_file, 0) + + def test_append_with_noise_lines(self, tmp_path: Path) -> None: + """Appended noise (non-ccreq) lines don't produce phantom requests.""" + log_file = tmp_path / "chat.log" + log_file.write_text(_make_log_line(req_idx=0) + "\n") + + first = _get_cached_vscode_requests(log_file) + assert len(first) == 1 + + # Append noise + one real request. + with log_file.open("a") as f: + f.write("2026-03-13 22:11:00.000 [info] Some noise line\n") + f.write("2026-03-13 22:11:01.000 [info] More noise\n") + f.write(_make_log_line(req_idx=1) + "\n") + + second = _get_cached_vscode_requests(log_file) + assert len(second) == 2 + + def test_append_empty_content(self, tmp_path: Path) -> None: + """Rewriting identical content triggers the incremental path at end_offset.""" + log_file = tmp_path / "chat.log" + log_file.write_text(_make_log_line(req_idx=0) + "\n") + + first = _get_cached_vscode_requests(log_file) + assert len(first) == 1 + + # Rewrite the file with identical content and force a distinct mtime + # via os.utime so the cache reliably detects a change regardless of + # filesystem mtime resolution. + log_file.write_bytes(log_file.read_bytes()) # rewrite same content + stat = log_file.stat() + os.utime(log_file, ns=(stat.st_atime_ns, stat.st_mtime_ns + 1_000_000)) + + with patch( + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + wraps=_parse_vscode_log_from_offset, + ) as spy: + second = _get_cached_vscode_requests(log_file) + # The incremental path must be called with the cached end_offset + # (i.e. the byte length of the original content), not 0. + spy.assert_called_once() + call_offset = spy.call_args[0][1] + assert call_offset > 0, "Expected incremental call with non-zero offset" + + assert len(second) == 1 + assert second == first + + def test_partial_line_at_eof_excluded(self, tmp_path: Path) -> None: + """A partial (no-newline) line at EOF is excluded until completed.""" + log_file = tmp_path / "chat.log" + log_file.write_text(_make_log_line(req_idx=0) + "\n") + + first = _get_cached_vscode_requests(log_file) + assert len(first) == 1 + + # Append a partial line (no trailing newline). + with log_file.open("a") as f: + f.write(_make_log_line(req_idx=1)) # no \n + + second = _get_cached_vscode_requests(log_file) + # Partial line is excluded — still only the original request. + assert len(second) == 1 + + # Complete the partial line by appending a newline. + with log_file.open("a") as f: + f.write("\n") + + third = _get_cached_vscode_requests(log_file) + # Now both requests are returned. + assert len(third) == 2 + + def test_toctou_truncation_resets_offset(self, tmp_path: Path) -> None: + """_parse_vscode_log_from_offset resets to 0 when fstat shows file shrunk.""" + log_file = tmp_path / "chat.log" + log_file.write_text(_make_log_line(req_idx=0) + "\n") + + # Call with an offset beyond the file's actual size to trigger the + # TOCTOU guard (actual_size < offset → reset offset and safe_end to 0). + reqs, end_offset = _parse_vscode_log_from_offset(log_file, 999_999) + assert len(reqs) == 1 + # end_offset should equal the full file size (parsed from byte 0). + assert end_offset == log_file.stat().st_size + + def test_incremental_fstat_truncation_returns_full_reparse( + self, tmp_path: Path + ) -> None: + """Incremental path falls back when parser detects TOCTOU truncation.""" + log_file = tmp_path / "chat.log" + initial = "\n".join(_make_log_line(req_idx=i) for i in range(5)) + log_file.write_text(initial + "\n") + + # Populate cache. + first = _get_cached_vscode_requests(log_file) + assert len(first) == 5 + cached_entry = _VSCODE_LOG_CACHE[log_file] + old_end = cached_entry[1] + + # Grow the file so the incremental path is taken (new_id[1] >= end_offset). + with log_file.open("a") as f: + f.write(_make_log_line(req_idx=99) + "\n") + + # Mock _parse_vscode_log_from_offset to simulate the fstat-inside-open + # truncation: return new_end < end_offset so the cache treats it as a + # full reparse rather than a delta. + single_req = _parse_vscode_log_from_offset(log_file, 0)[0][:1] + with patch( + "copilot_usage.vscode_parser._parse_vscode_log_from_offset", + return_value=(single_req, old_end - 1), + ): + result = _get_cached_vscode_requests(log_file) + + # Result should be only the mocked full-reparse output, not combined. + assert len(result) == 1