-
Notifications
You must be signed in to change notification settings - Fork 1
fix: incremental append-only parse for events.jsonl cache (#732) #736
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
b53642c
b1bf7d9
0620ba4
a1c9429
0657b94
9adad16
9776a94
f1f62f0
a224e1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,9 +95,17 @@ def _insert_session_entry( | |
|
|
||
| @dataclasses.dataclass(frozen=True, slots=True) | ||
| class _CachedEvents: | ||
| """Cache entry pairing a file identity with parsed events.""" | ||
| """Cache entry pairing a file identity with parsed events. | ||
|
|
||
| ``end_offset`` is the byte position after the last fully consumed | ||
| line. When the file grows (append-only), only bytes after | ||
| ``end_offset`` need to be parsed — avoiding a full re-read, even if | ||
| some fully consumed lines were skipped due to parse or validation | ||
| errors. | ||
| """ | ||
|
|
||
| file_id: tuple[int, int] | None | ||
| end_offset: int | ||
| events: tuple[SessionEvent, ...] | ||
|
|
||
|
|
||
|
|
@@ -112,15 +120,16 @@ class _CachedEvents: | |
| def _insert_events_entry( | ||
| events_path: Path, | ||
| file_id: tuple[int, int] | None, | ||
| events: list[SessionEvent], | ||
| events: list[SessionEvent] | tuple[SessionEvent, ...], | ||
| end_offset: int = 0, | ||
| ) -> None: | ||
| """Insert parsed events into ``_EVENTS_CACHE`` with LRU eviction. | ||
|
|
||
| If *events_path* already exists in the cache (stale file-id), the | ||
| old entry is removed first. Otherwise, when the cache is full the | ||
| least-recently-used entry (front of the ``OrderedDict``) is evicted. | ||
|
|
||
| The *events* list is converted to a ``tuple`` before storage so | ||
| The *events* sequence is converted to a ``tuple`` before storage so | ||
| that callers cannot accidentally add, remove, or reorder entries | ||
| in the cache. This is **container-level** immutability only — | ||
| individual ``SessionEvent`` objects remain mutable and must not | ||
|
|
@@ -130,17 +139,97 @@ def _insert_events_entry( | |
| del _EVENTS_CACHE[events_path] | ||
| elif len(_EVENTS_CACHE) >= _MAX_CACHED_EVENTS: | ||
| _EVENTS_CACHE.popitem(last=False) # evict LRU (front) | ||
| _EVENTS_CACHE[events_path] = _CachedEvents(file_id=file_id, events=tuple(events)) | ||
| stored = events if isinstance(events, tuple) else tuple(events) | ||
| _EVENTS_CACHE[events_path] = _CachedEvents( | ||
| file_id=file_id, end_offset=end_offset, events=stored | ||
| ) | ||
|
|
||
|
|
||
def _parse_events_from_offset(
    events_path: Path, offset: int
) -> tuple[list[SessionEvent], int]:
    """Parse events from *events_path* starting at byte *offset*.

    Only lines beginning at or after *offset* are JSON-decoded and
    Pydantic-validated.  Complete (newline-terminated) lines that are
    not valid UTF-8, not valid JSON, or fail validation are skipped with
    a warning and counted as consumed.

    A final line without a trailing newline (possible incomplete write)
    that cannot be decoded is treated as still-in-progress: parsing
    stops and the returned *safe_end* does not advance past it, so the
    caller can retry on the next refresh.

    Args:
        events_path: Path of the JSON-lines events file.
        offset: Byte position at which to start reading.

    Returns:
        ``(new_events, safe_end)`` where *safe_end* is the byte
        position after the last fully consumed line.  Callers should
        store this as ``end_offset`` so incomplete trailing lines are
        retried on the next refresh.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    new_events: list[SessionEvent] = []
    safe_offset = offset
    # Binary mode keeps offsets in bytes, so they can be fed back to seek().
    with events_path.open("rb") as fh:
        fh.seek(offset)
        current_offset = offset
        for raw_line in fh:
            line_start = current_offset
            current_offset += len(raw_line)
            stripped = raw_line.strip()
            if not stripped:
                safe_offset = current_offset
                continue
            try:
                raw = json.loads(stripped)
            except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                if not raw_line.endswith(b"\n"):
                    # Possibly incomplete write (truncated JSON or a
                    # multi-byte character cut in half) — stop and
                    # retry later.
                    break
                # A complete but undecodable line must be consumed;
                # otherwise every later refresh would stall at this
                # offset and never reach events appended after it.
                if isinstance(exc, UnicodeDecodeError):
                    logger.warning(
                        "{}:offset {} — UTF-8 decode error ({}), skipping",
                        events_path,
                        line_start,
                        exc,
                    )
                else:
                    logger.warning(
                        "{}:offset {} — malformed JSON, skipping",
                        events_path,
                        line_start,
                    )
                safe_offset = current_offset
                continue
            try:
                new_events.append(SessionEvent.model_validate(raw))
            except ValidationError as exc:
                logger.warning(
                    "{}:offset {} — validation error ({}), skipping",
                    events_path,
                    line_start,
                    exc.error_count(),
                )
            # Valid or invalid, the line has been fully read.
            safe_offset = current_offset
    return new_events, safe_offset
|
|
||
|
|
||
| def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]: | ||
| """Return parsed events, using cache when file identity is unchanged. | ||
|
|
||
| Delegates to :func:`parse_events` on a cache miss and stores the | ||
| result keyed by *events_path* with file-identity validation on | ||
| lookup. The cache is bounded to :data:`_MAX_CACHED_EVENTS` | ||
| entries; the **least-recently used** entry is evicted when the | ||
| limit is reached. | ||
| On a cache miss (cold start, truncation, or unknown file) the entire | ||
| file is re-read via :func:`_parse_events_from_offset` at offset 0 | ||
| and the result is stored keyed by *events_path* with file-identity | ||
| validation on lookup. The cache is bounded to | ||
| :data:`_MAX_CACHED_EVENTS` entries; the **least-recently used** entry | ||
| is evicted when the limit is reached. | ||
|
|
||
| When the file has grown since the last read (append-only pattern), | ||
| only the newly appended bytes are parsed via | ||
| :func:`_parse_events_from_offset` and merged with the cached tuple. | ||
| A full reparse is performed when the file has shrunk (truncation or | ||
| replacement) or on cold start. | ||
|
|
||
| The returned ``tuple`` prevents callers from adding, removing, or | ||
| reordering cached entries (container-level immutability). Individual | ||
|
|
@@ -149,16 +238,51 @@ def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]: | |
| ``list(get_cached_events(...))``. | ||
|
|
||
| Raises: | ||
| OSError: Propagated from :func:`parse_events` when the file | ||
| cannot be opened or read. | ||
| OSError: Propagated from :func:`_parse_events_from_offset` | ||
| when the file cannot be opened or read. | ||
| """ | ||
| file_id = _safe_file_identity(events_path) | ||
| cached = _EVENTS_CACHE.get(events_path) | ||
| if cached is not None and cached.file_id == file_id: | ||
| _EVENTS_CACHE.move_to_end(events_path) | ||
| return cached.events | ||
| events = parse_events(events_path) | ||
| _insert_events_entry(events_path, file_id, events) | ||
|
|
||
| if cached is not None and file_id is not None: | ||
| new_size = file_id[1] | ||
| if cached.file_id == file_id: | ||
| _EVENTS_CACHE.move_to_end(events_path) | ||
| return cached.events | ||
| # Append-only growth: new size ≥ cached end_offset → incremental | ||
| if new_size >= cached.end_offset: | ||
| new_events, safe_end = _parse_events_from_offset( | ||
| events_path, cached.end_offset | ||
| ) | ||
| merged = cached.events + tuple(new_events) | ||
|
Comment on lines
+247
to
+257
|
||
| # Never claim the cache covers more bytes than actually | ||
| # consumed — clamp size to safe_end when they diverge so | ||
| # the next call takes the incremental path for any | ||
| # trailing unconsumed bytes. | ||
| inc_id: tuple[int, int] | None = file_id | ||
| if file_id[1] != safe_end: | ||
| inc_id = (file_id[0], safe_end) | ||
| _insert_events_entry(events_path, inc_id, merged, safe_end) | ||
| return _EVENTS_CACHE[events_path].events | ||
|
|
||
| # Full reparse: cold start, truncation, or unknown file. | ||
| # Use _parse_events_from_offset(offset=0) instead of parse_events() so | ||
| # we get a safe_end byte boundary that reflects only bytes actually | ||
| # consumed. A post-parse stat() could observe bytes appended after | ||
| # parsing completed, overstating the consumed boundary and causing | ||
| # later incremental refreshes to skip unparsed data. | ||
| events, safe_end = _parse_events_from_offset(events_path, 0) | ||
| # Re-stat so the cached file_id reflects the current mtime for | ||
| # exact-match lookups, but never claim the cache covers more bytes | ||
| # than were actually consumed up to safe_end. | ||
| post_id = _safe_file_identity(events_path) | ||
| if post_id is None: | ||
| stored_id = file_id | ||
| elif post_id[1] == safe_end: | ||
| stored_id = post_id | ||
| else: | ||
| stored_id = (post_id[0], safe_end) | ||
| _insert_events_entry(events_path, stored_id, events, safe_end) | ||
| return _EVENTS_CACHE[events_path].events | ||
|
|
||
|
|
||
|
|
@@ -783,7 +907,9 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]: | |
| # Only the newest _MAX_CACHED_EVENTS entries are retained for | ||
| # _EVENTS_CACHE to avoid a temporary memory spike when many sessions | ||
| # are cache-misses. | ||
| deferred_events: list[tuple[Path, tuple[int, int] | None, list[SessionEvent]]] = [] | ||
| deferred_events: list[ | ||
| tuple[Path, tuple[int, int] | None, list[SessionEvent], int] | ||
| ] = [] | ||
| deferred_sessions: list[tuple[Path, _CachedSession]] = [] | ||
| cache_hit_paths: list[Path] = [] | ||
| for events_path, file_id, plan_id in discovered: | ||
|
|
@@ -818,15 +944,32 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]: | |
| cache_hit_paths.append(events_path) | ||
| summaries.append(summary) | ||
| continue | ||
| # Reuse _EVENTS_CACHE for incremental append-only parsing so | ||
| # that the session-summary rebuild only validates newly appended | ||
| # events instead of re-parsing the entire file from offset 0. | ||
| events_cached = _EVENTS_CACHE.get(events_path) | ||
| start_offset = 0 | ||
| prior_events: tuple[SessionEvent, ...] = () | ||
| if ( | ||
| events_cached is not None | ||
| and file_id is not None | ||
| and file_id[1] >= events_cached.end_offset | ||
| and events_cached.end_offset > 0 | ||
| ): | ||
| start_offset = events_cached.end_offset | ||
| prior_events = events_cached.events | ||
| try: | ||
| events = parse_events(events_path) | ||
| new_events, safe_end = _parse_events_from_offset(events_path, start_offset) | ||
| except OSError as exc: | ||
microsasa marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+947
to
963
|
||
| logger.warning("Skipping unreadable session {}: {}", events_path, exc) | ||
| continue | ||
| events: list[SessionEvent] = ( | ||
| list(prior_events) + new_events if prior_events else new_events | ||
| ) | ||
| if not events: | ||
| continue | ||
| if len(deferred_events) < _MAX_CACHED_EVENTS: | ||
| deferred_events.append((events_path, file_id, events)) | ||
| deferred_events.append((events_path, file_id, events, safe_end)) | ||
| meta = _build_session_summary_with_meta( | ||
| events, | ||
| session_dir=events_path.parent, | ||
|
|
@@ -861,8 +1004,11 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]: | |
|
|
||
| # Populate _EVENTS_CACHE in oldest→newest order so that the newest | ||
| # sessions sit at the back (MRU) and eviction drops the oldest. | ||
| for ep, fid, evts in reversed(deferred_events): | ||
| _insert_events_entry(ep, fid, evts) | ||
| for ep, fid, evts, safe_end in reversed(deferred_events): | ||
| # Use the safe_end byte boundary returned by the parser rather | ||
| # than a post-parse stat() size, which can overstate the bytes | ||
| # actually consumed if the file grew after parsing completed. | ||
| _insert_events_entry(ep, fid, evts, safe_end) | ||
|
|
||
| # Prune stale cache entries for sessions no longer on disk. | ||
| discovered_paths = {p for p, _, _ in discovered} | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.