Skip to content

Commit ebb85bf

Browse files
author
Dylan Huang
committed
Merge branch 'main' into dhuang/dxe-438-generate-cli-reference-for-evalprotocolio
2 parents ca23d4f + 6f6afa2 commit ebb85bf

File tree

3 files changed

+75
-14
lines changed

3 files changed

+75
-14
lines changed

eval_protocol/dataset_logger/sqlite_evaluation_row_store.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import os
22
from typing import List, Optional
33

4-
from peewee import CharField, DatabaseError, Model, SqliteDatabase
4+
from peewee import CharField, Model, SqliteDatabase
55
from playhouse.sqlite_ext import JSONField
66

77
from eval_protocol.event_bus.sqlite_event_bus_database import (
88
SQLITE_HARDENED_PRAGMAS,
9-
DatabaseCorruptedError,
109
check_and_repair_database,
10+
execute_with_sqlite_retry,
1111
)
1212
from eval_protocol.models import EvaluationRow
1313

@@ -55,7 +55,13 @@ def upsert_row(self, data: dict) -> None:
5555
if rollout_id is None:
5656
raise ValueError("execution_metadata.rollout_id is required to upsert a row")
5757

58-
with self._db.atomic("EXCLUSIVE"):
58+
execute_with_sqlite_retry(lambda: self._do_upsert(rollout_id, data))
59+
60+
def _do_upsert(self, rollout_id: str, data: dict) -> None:
61+
"""Internal method to perform the actual upsert within a transaction."""
62+
# Use IMMEDIATE instead of EXCLUSIVE for better concurrency
63+
# IMMEDIATE acquires a reserved lock immediately but allows concurrent reads
64+
with self._db.atomic("IMMEDIATE"):
5965
if self._EvaluationRow.select().where(self._EvaluationRow.rollout_id == rollout_id).exists():
6066
self._EvaluationRow.update(data=data).where(self._EvaluationRow.rollout_id == rollout_id).execute()
6167
else:

eval_protocol/event_bus/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from eval_protocol.event_bus.sqlite_event_bus_database import (
55
DatabaseCorruptedError,
66
check_and_repair_database,
7+
execute_with_sqlite_retry,
78
SQLITE_HARDENED_PRAGMAS,
89
)
910

eval_protocol/event_bus/sqlite_event_bus_database.py

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,60 @@
11
import os
22
import time
3-
from typing import Any, List
3+
from typing import Any, Callable, List, TypeVar
44
from uuid import uuid4
55

6-
from peewee import BooleanField, CharField, DatabaseError, DateTimeField, Model, SqliteDatabase
6+
import backoff
7+
from peewee import BooleanField, CharField, DatabaseError, DateTimeField, Model, OperationalError, SqliteDatabase
78
from playhouse.sqlite_ext import JSONField
89

910
from eval_protocol.event_bus.logger import logger
1011

1112

13+
# Retry configuration for database operations
14+
SQLITE_RETRY_MAX_TRIES = 5
15+
SQLITE_RETRY_MAX_TIME = 30 # seconds
16+
17+
18+
def _is_database_locked_error(e: Exception) -> bool:
19+
"""Check if an exception is a database locked error."""
20+
error_str = str(e).lower()
21+
return "database is locked" in error_str or "locked" in error_str
22+
23+
24+
T = TypeVar("T")


# Retry policy applied ONCE at import time.  The original implementation
# rebuilt the @backoff.on_exception decorator (and a fresh closure) on every
# call to execute_with_sqlite_retry; the policy is invariant, so decorating a
# module-level helper avoids that per-call overhead.
@backoff.on_exception(
    backoff.expo,
    OperationalError,
    max_tries=SQLITE_RETRY_MAX_TRIES,
    max_time=SQLITE_RETRY_MAX_TIME,
    # Give up immediately on OperationalErrors that are NOT lock contention.
    giveup=lambda e: not _is_database_locked_error(e),
    jitter=backoff.full_jitter,
)
def _retrying_call(operation: Callable[[], T]) -> T:
    """Invoke *operation* once under the module-level retry policy."""
    return operation()


def execute_with_sqlite_retry(operation: Callable[[], T]) -> T:
    """
    Execute a database operation with exponential backoff retry on lock errors.

    Uses the backoff library for consistent retry behavior across the codebase.
    Retries only on OperationalError whose message indicates lock contention
    ("database is locked" / "database table is locked"); any other
    OperationalError is re-raised immediately via the ``giveup`` predicate.

    Args:
        operation: A zero-argument callable that performs the database operation

    Returns:
        The result of the operation

    Raises:
        OperationalError: If the operation still fails after
            SQLITE_RETRY_MAX_TRIES attempts or SQLITE_RETRY_MAX_TIME seconds.
    """
    return _retrying_call(operation)
56+
57+
1258
# SQLite pragmas for hardened concurrency safety
1359
SQLITE_HARDENED_PRAGMAS = {
1460
"journal_mode": "wal", # Write-Ahead Logging for concurrent reads/writes
@@ -148,13 +194,15 @@ def publish_event(self, event_type: str, data: Any, process_id: str) -> None:
148194
else:
149195
serialized_data = data
150196

151-
self._Event.create(
152-
event_id=str(uuid4()),
153-
event_type=event_type,
154-
data=serialized_data,
155-
timestamp=time.time(),
156-
process_id=process_id,
157-
processed=False,
197+
execute_with_sqlite_retry(
198+
lambda: self._Event.create(
199+
event_id=str(uuid4()),
200+
event_type=event_type,
201+
data=serialized_data,
202+
timestamp=time.time(),
203+
process_id=process_id,
204+
processed=False,
205+
)
158206
)
159207
except Exception as e:
160208
logger.warning(f"Failed to publish event to database: {e}")
@@ -188,14 +236,20 @@ def get_unprocessed_events(self, process_id: str) -> List[dict]:
188236
def mark_event_processed(self, event_id: str) -> None:
    """Mark an event as processed."""
    try:
        # Named inner function instead of a lambda; built inside the try so
        # query-construction errors are handled the same way as execution ones.
        def _flag_done():
            query = self._Event.update(processed=True).where(
                self._Event.event_id == event_id
            )
            return query.execute()

        execute_with_sqlite_retry(_flag_done)
    except Exception as e:
        # Best-effort: a missed flag is tolerable, so log at debug and move on.
        logger.debug(f"Failed to mark event as processed: {e}")
194244

195245
def cleanup_old_events(self, max_age_hours: int = 24) -> None:
196246
"""Clean up old processed events."""
197247
try:
198248
cutoff_time = time.time() - (max_age_hours * 3600)
199-
self._Event.delete().where((self._Event.processed) & (self._Event.timestamp < cutoff_time)).execute()
249+
execute_with_sqlite_retry(
250+
lambda: self._Event.delete()
251+
.where((self._Event.processed) & (self._Event.timestamp < cutoff_time))
252+
.execute()
253+
)
200254
except Exception as e:
201255
logger.debug(f"Failed to cleanup old events: {e}")

0 commit comments

Comments
 (0)