From 93cc2308bdc28b3d7c32cba29f67165adde840bb Mon Sep 17 00:00:00 2001 From: Ian Ross Date: Sat, 9 May 2026 16:13:06 +0200 Subject: [PATCH] Fix DB cache staged-connection eviction and strict staging exports Co-authored-by: GPT-5.3 Codex Spark Pi-Model: openai-codex/gpt-5.3-codex-spark --- .../feder-ingest/src/feder_ingest/db_cache.py | 29 ++++++++---- tests/feder_ingest/test_db_cache_paths.py | 44 +++++++++++++++++++ 2 files changed, 65 insertions(+), 8 deletions(-) diff --git a/apps/feder-ingest/src/feder_ingest/db_cache.py b/apps/feder-ingest/src/feder_ingest/db_cache.py index a6d769a..7200158 100644 --- a/apps/feder-ingest/src/feder_ingest/db_cache.py +++ b/apps/feder-ingest/src/feder_ingest/db_cache.py @@ -21,7 +21,7 @@ class DBCache: - """"LRU cache of writable database connections.""" + """LRU cache of writable database connections.""" def __init__( self, @@ -65,7 +65,7 @@ def __init__( def _startup_cleanup(self) -> None: for path in glob.glob(os.path.join(self.staging_dir, '????', '.*.sqlite.importing.*')): self._remove_temp_file(path) - for path in glob.glob(os.path.join(self.export_scratch_dir, '*')): + for path in glob.glob(os.path.join(self.export_scratch_dir, '*.export.*.sqlite')): self._remove_temp_file(path) for path in glob.glob(os.path.join(self.data_dir, '????', '.*.sqlite.exporting.*')): self._remove_temp_file(path) @@ -140,7 +140,9 @@ def connect(self, ref_date: datetime | date | int) -> WritableDB: if len(self._nursery) > self.nursery_size: self.commit(force=True) evict_date = next(iter(self._nursery)) - self._promote((evict_date, self._nursery[evict_date])) + nursery = self._nursery[evict_date] + self._promote((evict_date, nursery)) + nursery.close() del self._nursery[evict_date] return conn @@ -210,8 +212,18 @@ def _delete_staging_files(self, ref_date: datetime | date | int) -> None: pass def _evict_staged_connections(self) -> None: - if len(self._connections) > self.connection_cache_size: - _, conn = self._connections.popitem(last=False) + while len(self._connections) > self.connection_cache_size: + oldest_day, conn = self._connections.popitem(last=False) + if oldest_day in self._touched: + self._connections[oldest_day] = conn + self.commit(force=True) + if oldest_day in self._touched: + raise RuntimeError( + f'failed to commit staged database {oldest_day.strftime("%Y-%j")} ' + 'before cache eviction' + ) + continue + conn.close() def _promote(self, date_db: tuple[date, WritableDB]) -> None: @@ -292,9 +304,10 @@ def _open_raw_staging_connection(self, staging_path: str) -> sqlite3.Connection: uri = f'file:{staging_path}?mode=ro' try: return sqlite3.connect(uri, uri=True) - except sqlite3.Error: - logger.debug('failed to open staged database read-only; falling back to read-write', exc_info=True) - return sqlite3.connect(staging_path) + except sqlite3.Error as exc: + raise RuntimeError( + f'failed to open staged database in read-only mode for export: {staging_path}' + ) from exc def _finalize_idle(self, now: datetime) -> None: for day, last_update in list(self._last_update.items()): diff --git a/tests/feder_ingest/test_db_cache_paths.py b/tests/feder_ingest/test_db_cache_paths.py index 573f441..f94a4e1 100644 --- a/tests/feder_ingest/test_db_cache_paths.py +++ b/tests/feder_ingest/test_db_cache_paths.py @@ -413,13 +413,57 @@ def test_startup_temp_cleanup_removes_import_export_temps(tmp_path): importing = staging_year / '.2025-142.sqlite.importing.123' exporting = data_year / '.2025-142.sqlite.exporting.123' scratch_tmp = export_scratch / '2025-142.export.123.sqlite' + non_temp_file = export_scratch / 'keep-me.txt' for path in (importing, exporting, scratch_tmp): path.write_text('temp') + non_temp_file.write_text('keep') cache = DBCache(str(data), str(staging), str(scratch)) try: assert not importing.exists() assert not exporting.exists() assert not scratch_tmp.exists() + assert non_temp_file.exists() finally: cache.close() + + +def test_evicted_touched_staged_connection_commits_pending_writes(tmp_path): + day1 = datetime(2025, 5, 22) + day2 = datetime(2025, 5, 23) + data = tmp_path / 'data' + staging = tmp_path / 'staging' + scratch = tmp_path / 'scratch' + data.mkdir() + _make_db(staging, day1, 'staging') + _make_db(staging, day2, 'staging') + + cache = DBCache(str(data), str(staging), str(scratch), connection_cache_size=1) + try: + db1 = cache.connect(day1) + db1.add_trajectory(_trajectory_for(day1, 'pending'), commit=False) + cache._mark_touched(db1) + + cache.connect(day2) + + assert _count_rows(staging / '2025' / '2025-142.sqlite') == 2 + finally: + cache.close() + + +def test_open_raw_staging_connection_does_not_fallback_when_staging_missing(tmp_path): + data = tmp_path / 'data' + staging = tmp_path / 'staging' + scratch = tmp_path / 'scratch' + data.mkdir() + staging_2025 = staging / '2025' + staging_2025.mkdir(parents=True) + + cache = DBCache(str(data), str(staging), str(scratch)) + missing = staging_2025 / '2025-142.sqlite' + + with pytest.raises(RuntimeError, match='read-only mode'): + cache._open_raw_staging_connection(str(missing)) + + assert not missing.exists() + cache.close()