Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions apps/feder-ingest/src/feder_ingest/db_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


class DBCache:
""""LRU cache of writable database connections."""
"""LRU cache of writable database connections."""

def __init__(
self,
Expand Down Expand Up @@ -65,7 +65,7 @@ def __init__(
def _startup_cleanup(self) -> None:
for path in glob.glob(os.path.join(self.staging_dir, '????', '.*.sqlite.importing.*')):
self._remove_temp_file(path)
for path in glob.glob(os.path.join(self.export_scratch_dir, '*')):
for path in glob.glob(os.path.join(self.export_scratch_dir, '*.export.*.sqlite')):
self._remove_temp_file(path)
for path in glob.glob(os.path.join(self.data_dir, '????', '.*.sqlite.exporting.*')):
self._remove_temp_file(path)
Expand Down Expand Up @@ -140,7 +140,9 @@ def connect(self, ref_date: datetime | date | int) -> WritableDB:
if len(self._nursery) > self.nursery_size:
self.commit(force=True)
evict_date = next(iter(self._nursery))
self._promote((evict_date, self._nursery[evict_date]))
nursery = self._nursery[evict_date]
self._promote((evict_date, nursery))
nursery.close()
del self._nursery[evict_date]
return conn

Expand Down Expand Up @@ -210,8 +212,18 @@ def _delete_staging_files(self, ref_date: datetime | date | int) -> None:
pass

def _evict_staged_connections(self) -> None:
    """Close least-recently-used staged connections until the cache fits.

    Entries are popped from the oldest end of ``self._connections`` while
    it holds more than ``self.connection_cache_size`` items.  An entry
    whose day is still in ``self._touched`` is committed first so pending
    writes are not lost when its connection is closed.

    Raises:
        RuntimeError: if the day is still marked as touched after the
            forced commit.
    """
    while len(self._connections) > self.connection_cache_size:
        oldest_day, conn = self._connections.popitem(last=False)
        if oldest_day in self._touched:
            # Put the entry back for the duration of the commit
            # (presumably commit() needs it in the cache -- confirm).
            self._connections[oldest_day] = conn
            # Plain re-insertion appends at the most-recent end; restore
            # the oldest position so the next iteration evicts this entry
            # rather than a more recently used connection.
            self._connections.move_to_end(oldest_day, last=False)
            self.commit(force=True)
            if oldest_day in self._touched:
                raise RuntimeError(
                    f'failed to commit staged database {oldest_day.strftime("%Y-%j")} '
                    'before cache eviction'
                )
            continue

        conn.close()

def _promote(self, date_db: tuple[date, WritableDB]) -> None:
Expand Down Expand Up @@ -292,9 +304,10 @@ def _open_raw_staging_connection(self, staging_path: str) -> sqlite3.Connection:
uri = f'file:{staging_path}?mode=ro'
try:
return sqlite3.connect(uri, uri=True)
except sqlite3.Error:
logger.debug('failed to open staged database read-only; falling back to read-write', exc_info=True)
return sqlite3.connect(staging_path)
except sqlite3.Error as exc:
raise RuntimeError(
f'failed to open staged database in read-only mode for export: {staging_path}'
) from exc

def _finalize_idle(self, now: datetime) -> None:
for day, last_update in list(self._last_update.items()):
Expand Down
44 changes: 44 additions & 0 deletions tests/feder_ingest/test_db_cache_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,57 @@ def test_startup_temp_cleanup_removes_import_export_temps(tmp_path):
importing = staging_year / '.2025-142.sqlite.importing.123'
exporting = data_year / '.2025-142.sqlite.exporting.123'
scratch_tmp = export_scratch / '2025-142.export.123.sqlite'
non_temp_file = export_scratch / 'keep-me.txt'
for path in (importing, exporting, scratch_tmp):
path.write_text('temp')
non_temp_file.write_text('keep')

cache = DBCache(str(data), str(staging), str(scratch))
try:
assert not importing.exists()
assert not exporting.exists()
assert not scratch_tmp.exists()
assert non_temp_file.exists()
finally:
cache.close()


def test_evicted_touched_staged_connection_commits_pending_writes(tmp_path):
    """Pending writes on a touched staged connection survive a second connect.

    The cache is limited to a single connection; an uncommitted trajectory
    is added for the first day and the connection is marked as touched
    before a second day is connected.  The first day's staged file must
    then contain two rows (presumably the row seeded by ``_make_db`` plus
    the pending one -- confirm against the helper).
    """
    first_day, second_day = datetime(2025, 5, 22), datetime(2025, 5, 23)
    data = tmp_path / 'data'
    staging = tmp_path / 'staging'
    scratch = tmp_path / 'scratch'
    data.mkdir()
    for day in (first_day, second_day):
        _make_db(staging, day, 'staging')

    cache = DBCache(str(data), str(staging), str(scratch), connection_cache_size=1)
    try:
        pending_db = cache.connect(first_day)
        pending_db.add_trajectory(_trajectory_for(first_day, 'pending'), commit=False)
        cache._mark_touched(pending_db)

        # With room for only one connection, this connect is expected to
        # push the first day's connection out of the cache.
        cache.connect(second_day)

        first_day_file = staging / '2025' / '2025-142.sqlite'
        assert _count_rows(first_day_file) == 2
    finally:
        cache.close()


def test_open_raw_staging_connection_does_not_fallback_when_staging_missing(tmp_path):
    """Opening a missing staged database raises instead of creating it.

    ``_open_raw_staging_connection`` is called with a path that does not
    exist; it must raise ``RuntimeError`` mentioning read-only mode and
    must leave the path absent afterwards.
    """
    data = tmp_path / 'data'
    staging = tmp_path / 'staging'
    scratch = tmp_path / 'scratch'
    data.mkdir()
    staging_2025 = staging / '2025'
    staging_2025.mkdir(parents=True)

    cache = DBCache(str(data), str(staging), str(scratch))
    # Close the cache even when an assertion fails, as the other tests in
    # this file do, so a failing run does not leak open resources.
    try:
        missing = staging_2025 / '2025-142.sqlite'

        with pytest.raises(RuntimeError, match='read-only mode'):
            cache._open_raw_staging_connection(str(missing))

        # Guards against a read-write fallback creating the file.
        assert not missing.exists()
    finally:
        cache.close()
Loading