Skip to content

Commit 0843129

Browse files
author
Dylan Huang
committed
Implement robust database connection with retry logic to handle pragma execution failures
1 parent d4a445b commit 0843129

File tree

2 files changed

+48
-4
lines changed

2 files changed

+48
-4
lines changed

eval_protocol/dataset_logger/sqlite_evaluation_row_store.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from eval_protocol.event_bus.sqlite_event_bus_database import (
88
SQLITE_HARDENED_PRAGMAS,
99
check_and_repair_database,
10+
connect_with_retry,
1011
execute_with_sqlite_retry,
1112
)
1213
from eval_protocol.models import EvaluationRow
@@ -42,8 +43,8 @@ class EvaluationRow(BaseModel): # type: ignore
4243

4344
self._EvaluationRow = EvaluationRow
4445

45-
# Wrap connect() in retry logic since setting pragmas can fail with "database is locked"
46-
execute_with_sqlite_retry(lambda: self._db.connect(reuse_if_open=True))
46+
# Connect with retry logic that properly handles pragma execution failures
47+
connect_with_retry(self._db)
4748
# Use safe=True to avoid errors when tables/indexes already exist
4849
execute_with_sqlite_retry(lambda: self._db.create_tables([EvaluationRow], safe=True))
4950

eval_protocol/event_bus/sqlite_event_bus_database.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,49 @@ def _execute() -> T:
5555
return _execute()
5656

5757

58+
def connect_with_retry(db: SqliteDatabase) -> None:
59+
"""
60+
Connect to the database with retry logic, ensuring pragmas are always applied.
61+
62+
Peewee's connect() method sets the connection state *before* executing pragmas
63+
(in _initialize_connection). If pragma execution fails with "database is locked",
64+
the connection is marked as open but pragmas are not applied. Subsequent calls
65+
to connect(reuse_if_open=True) would see the connection as already open and
66+
skip pragma execution entirely.
67+
68+
This function handles this edge case by:
69+
1. Closing the connection if a lock error occurs during connect
70+
2. Retrying with exponential backoff until pragmas are successfully applied
71+
72+
Args:
73+
db: The SqliteDatabase instance to connect
74+
"""
75+
76+
@backoff.on_exception(
77+
backoff.expo,
78+
OperationalError,
79+
max_tries=SQLITE_RETRY_MAX_TRIES,
80+
max_time=SQLITE_RETRY_MAX_TIME,
81+
giveup=lambda e: not _is_database_locked_error(e),
82+
jitter=backoff.full_jitter,
83+
)
84+
def _connect() -> None:
85+
try:
86+
# Close any partially-open connection before retrying to ensure
87+
# a fresh connection is opened and pragmas are executed
88+
if not db.is_closed():
89+
db.close()
90+
db.connect()
91+
except OperationalError:
92+
# If connect fails (e.g., during pragma execution), ensure the
93+
# connection is closed so the next retry starts fresh
94+
if not db.is_closed():
95+
db.close()
96+
raise
97+
98+
_connect()
99+
100+
58101
# SQLite pragmas for hardened concurrency safety
59102
SQLITE_HARDENED_PRAGMAS = {
60103
"journal_mode": "wal", # Write-Ahead Logging for concurrent reads/writes
@@ -181,8 +224,8 @@ class Event(BaseModel): # type: ignore
181224
processed = BooleanField(default=False) # Track if event has been processed
182225

183226
self._Event = Event
184-
# Wrap connect() in retry logic since setting pragmas can fail with "database is locked"
185-
execute_with_sqlite_retry(lambda: self._db.connect(reuse_if_open=True))
227+
# Connect with retry logic that properly handles pragma execution failures
228+
connect_with_retry(self._db)
186229
# Use safe=True to avoid errors when tables already exist
187230
execute_with_sqlite_retry(lambda: self._db.create_tables([Event], safe=True))
188231

0 commit comments

Comments
 (0)