Skip to content

Commit 9b5e353

Browse files
author
Dylan Huang
committed
Update dependencies and refactor storage backend handling
- Replaced `peewee` with `tinydb` as the default storage backend in `pyproject.toml`.
- Added support for both `tinydb` and `sqlite` in the evaluation row store and event bus database.
- Refactored logger and database initialization to accommodate the new storage options.
- Updated the SQLite-related classes to raise informative errors if `peewee` is not installed.
- Ensured backward compatibility for existing SQLite logger and event bus implementations.
1 parent bf8f228 commit 9b5e353

28 files changed

+1108
-391
lines changed

eval_protocol/dataset_logger/__init__.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,45 @@
11
import os
22

33
from eval_protocol.dataset_logger.dataset_logger import DatasetLogger
4-
from eval_protocol.dataset_logger.sqlite_dataset_logger_adapter import SqliteDatasetLoggerAdapter
4+
from eval_protocol.dataset_logger.evaluation_row_store import EvaluationRowStore
55

66

7-
# Allow disabling sqlite logger to avoid environment-specific constraints in simple CLI runs.
8-
def _get_default_logger():
9-
if os.getenv("DISABLE_EP_SQLITE_LOG", "0").strip() != "1":
10-
return SqliteDatasetLoggerAdapter()
7+
def get_evaluation_row_store(db_path: str) -> EvaluationRowStore:
    """
    Factory for the configured evaluation-row storage backend.

    The backend is chosen via the EP_STORAGE environment variable:
    - "tinydb" (default): TinyDB with JSON file storage
    - "sqlite": SQLite via the peewee ORM

    Args:
        db_path: Path to the database file.

    Returns:
        A concrete EvaluationRowStore implementation.
    """
    backend = os.getenv("EP_STORAGE", "tinydb").lower()

    if backend != "sqlite":
        # Default path: TinyDB. Imported lazily so the optional backend's
        # dependencies are only loaded when actually selected.
        from eval_protocol.dataset_logger.tinydb_evaluation_row_store import TinyDBEvaluationRowStore

        return TinyDBEvaluationRowStore(db_path)

    from eval_protocol.dataset_logger.sqlite_evaluation_row_store import SqliteEvaluationRowStore

    return SqliteEvaluationRowStore(db_path)
31+
32+
33+
def _get_default_db_filename() -> str:
34+
"""Get the default database filename based on storage backend."""
35+
storage_type = os.getenv("EP_STORAGE", "tinydb").lower()
36+
return "logs.db" if storage_type == "sqlite" else "logs.json"
37+
38+
39+
def _get_default_logger():
40+
"""Get the default logger based on configuration."""
41+
# Allow disabling logger to avoid environment-specific constraints in simple CLI runs.
42+
if os.getenv("DISABLE_EP_SQLITE_LOG", "0").strip() == "1":
1243

1344
class _NoOpLogger(DatasetLogger):
1445
def log(self, row):
@@ -19,6 +50,11 @@ def read(self, rollout_id=None):
1950

2051
return _NoOpLogger()
2152

53+
# Import here to avoid circular imports
54+
from eval_protocol.dataset_logger.dataset_logger_adapter import DatasetLoggerAdapter
55+
56+
return DatasetLoggerAdapter()
57+
2258

2359
# Lazy property that creates the logger only when accessed
2460
class _LazyLogger(DatasetLogger):
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import os
2+
from typing import TYPE_CHECKING, List, Optional
3+
4+
from eval_protocol.dataset_logger.dataset_logger import LOG_EVENT_TYPE, DatasetLogger
5+
from eval_protocol.dataset_logger.evaluation_row_store import EvaluationRowStore
6+
from eval_protocol.directory_utils import find_eval_protocol_dir
7+
from eval_protocol.event_bus import event_bus
8+
from eval_protocol.event_bus.logger import logger
9+
10+
if TYPE_CHECKING:
11+
from eval_protocol.models import EvaluationRow
12+
13+
14+
class DatasetLoggerAdapter(DatasetLogger):
    """
    Dataset logger that uses the configured storage backend.

    The storage backend is selected based on the EP_STORAGE environment variable:
    - "tinydb" (default): Uses TinyDB with JSON file storage
    - "sqlite": Uses SQLite with peewee ORM
    """

    def __init__(self, db_path: Optional[str] = None, store: Optional[EvaluationRowStore] = None):
        """Create an adapter backed by either an explicit store or a db path.

        Args:
            db_path: Optional path to the database file. Mutually exclusive with ``store``.
            store: Optional pre-built EvaluationRowStore. Mutually exclusive with ``db_path``.

        Raises:
            ValueError: If both ``db_path`` and ``store`` are provided.
        """
        eval_protocol_dir = find_eval_protocol_dir()
        if db_path is not None and store is not None:
            raise ValueError("Provide only one of db_path or store, not both.")
        if store is not None:
            self.db_path = store.db_path
            self._store = store
        else:
            # Import here to avoid circular imports
            from eval_protocol.dataset_logger import _get_default_db_filename, get_evaluation_row_store

            default_db = _get_default_db_filename()
            self.db_path = db_path if db_path is not None else os.path.join(eval_protocol_dir, default_db)
            self._store = get_evaluation_row_store(self.db_path)

    def log(self, row: "EvaluationRow") -> None:
        """Persist ``row`` in the store, then emit a row-upserted event.

        Event emission is best-effort: failures are logged and never
        propagate, so storage always succeeds independently of the bus.
        """
        data = row.model_dump(exclude_none=True, mode="json")
        rollout_id = data.get("execution_metadata", {}).get("rollout_id", "unknown")
        logger.debug(f"[EVENT_BUS_EMIT] Starting to log row with rollout_id: {rollout_id}")

        self._store.upsert_row(data=data)
        logger.debug(f"[EVENT_BUS_EMIT] Successfully stored row in database for rollout_id: {rollout_id}")

        try:
            # Runtime import: the TYPE_CHECKING import above is not available here.
            from eval_protocol.models import EvaluationRow as EvalRow

            logger.debug(f"[EVENT_BUS_EMIT] Emitting event '{LOG_EVENT_TYPE}' for rollout_id: {rollout_id}")
            event_bus.emit(LOG_EVENT_TYPE, EvalRow(**data))
            logger.debug(f"[EVENT_BUS_EMIT] Successfully emitted event for rollout_id: {rollout_id}")
        except Exception as e:
            # Avoid breaking storage due to event emission issues.
            # (Removed a redundant `pass` that followed this log call.)
            logger.error(f"[EVENT_BUS_EMIT] Failed to emit row_upserted event for rollout_id {rollout_id}: {e}")

    def read(self, rollout_id: Optional[str] = None) -> List["EvaluationRow"]:
        """Read stored rows, optionally filtered by ``rollout_id``.

        Returns:
            A list of EvaluationRow models reconstructed from stored JSON.
        """
        from eval_protocol.models import EvaluationRow

        results = self._store.read_rows(rollout_id=rollout_id)
        return [EvaluationRow(**data) for data in results]
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from abc import ABC, abstractmethod
2+
from typing import List, Optional
3+
4+
5+
class EvaluationRowStore(ABC):
    """
    Abstract interface for evaluation row storage.

    A store keeps arbitrary row data as JSON, keyed by a unique string
    `rollout_id`. Concrete subclasses may back this with different
    engines (SQLite, TinyDB, etc.).
    """

    @property
    @abstractmethod
    def db_path(self) -> str:
        """Path to the underlying database file."""

    @abstractmethod
    def upsert_row(self, data: dict) -> None:
        """
        Insert or update a row by rollout_id.

        Args:
            data: Row data containing execution_metadata.rollout_id
        """

    @abstractmethod
    def read_rows(self, rollout_id: Optional[str] = None) -> List[dict]:
        """
        Read rows, optionally filtered by rollout_id.

        Args:
            rollout_id: If provided, filter to this specific rollout

        Returns:
            List of row data dictionaries
        """

    @abstractmethod
    def delete_row(self, rollout_id: str) -> int:
        """
        Delete a row by rollout_id.

        Args:
            rollout_id: The rollout_id to delete

        Returns:
            Number of rows deleted
        """

    @abstractmethod
    def delete_all_rows(self) -> int:
        """
        Delete all rows.

        Returns:
            Number of rows deleted
        """
Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,13 @@
1-
import os
2-
from typing import List, Optional
1+
"""
2+
Backwards-compatible alias for DatasetLoggerAdapter.
33
4-
from eval_protocol.dataset_logger.dataset_logger import LOG_EVENT_TYPE, DatasetLogger
5-
from eval_protocol.dataset_logger.sqlite_evaluation_row_store import SqliteEvaluationRowStore
6-
from eval_protocol.directory_utils import find_eval_protocol_dir
7-
from eval_protocol.event_bus import event_bus
8-
from eval_protocol.event_bus.logger import logger
9-
from eval_protocol.models import EvaluationRow
4+
This module is kept for backwards compatibility. New code should use
5+
DatasetLoggerAdapter from dataset_logger_adapter.py instead.
6+
"""
107

8+
from eval_protocol.dataset_logger.dataset_logger_adapter import DatasetLoggerAdapter
119

12-
class SqliteDatasetLoggerAdapter(DatasetLogger):
13-
def __init__(self, db_path: Optional[str] = None, store: Optional[SqliteEvaluationRowStore] = None):
14-
eval_protocol_dir = find_eval_protocol_dir()
15-
if db_path is not None and store is not None:
16-
raise ValueError("Provide only one of db_path or store, not both.")
17-
if store is not None:
18-
self.db_path = store.db_path
19-
self._store = store
20-
else:
21-
self.db_path = db_path if db_path is not None else os.path.join(eval_protocol_dir, "logs.db")
22-
self._store = SqliteEvaluationRowStore(self.db_path)
10+
# Backwards-compatible alias
11+
SqliteDatasetLoggerAdapter = DatasetLoggerAdapter
2312

24-
def log(self, row: "EvaluationRow") -> None:
25-
data = row.model_dump(exclude_none=True, mode="json")
26-
rollout_id = data.get("execution_metadata", {}).get("rollout_id", "unknown")
27-
logger.debug(f"[EVENT_BUS_EMIT] Starting to log row with rollout_id: {rollout_id}")
28-
29-
self._store.upsert_row(data=data)
30-
logger.debug(f"[EVENT_BUS_EMIT] Successfully stored row in database for rollout_id: {rollout_id}")
31-
32-
try:
33-
logger.debug(f"[EVENT_BUS_EMIT] Emitting event '{LOG_EVENT_TYPE}' for rollout_id: {rollout_id}")
34-
event_bus.emit(LOG_EVENT_TYPE, EvaluationRow(**data))
35-
logger.debug(f"[EVENT_BUS_EMIT] Successfully emitted event for rollout_id: {rollout_id}")
36-
except Exception as e:
37-
# Avoid breaking storage due to event emission issues
38-
logger.error(f"[EVENT_BUS_EMIT] Failed to emit row_upserted event for rollout_id {rollout_id}: {e}")
39-
pass
40-
41-
def read(self, rollout_id: Optional[str] = None) -> List["EvaluationRow"]:
42-
from eval_protocol.models import EvaluationRow
43-
44-
results = self._store.read_rows(rollout_id=rollout_id)
45-
return [EvaluationRow(**data) for data in results]
13+
__all__ = ["SqliteDatasetLoggerAdapter"]

eval_protocol/dataset_logger/sqlite_evaluation_row_store.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,29 @@
11
import os
22
from typing import List, Optional
33

4-
from peewee import CharField, Model, SqliteDatabase
5-
from playhouse.sqlite_ext import JSONField
4+
# Import the optional SQLite backend dependencies, converting a missing
# package into an actionable install hint. `from exc` preserves the
# original ImportError as __cause__ for debugging (PEP 3134).
try:
    from peewee import CharField, Model, SqliteDatabase
    from playhouse.sqlite_ext import JSONField
except ImportError as exc:
    raise ImportError(
        "SQLite storage backend requires 'peewee' package. Install it with: pip install eval-protocol[sqlite_storage]"
    ) from exc
611

7-
from eval_protocol.models import EvaluationRow
12+
from eval_protocol.dataset_logger.evaluation_row_store import EvaluationRowStore
813

914

10-
class SqliteEvaluationRowStore:
15+
class SqliteEvaluationRowStore(EvaluationRowStore):
1116
"""
1217
Lightweight reusable SQLite store for evaluation rows.
1318
1419
Stores arbitrary row data as JSON keyed by a unique string `rollout_id`.
1520
"""
1621

1722
def __init__(self, db_path: str):
18-
os.makedirs(os.path.dirname(db_path), exist_ok=True)
23+
# Handle case where db_path might be in the root directory
24+
db_dir = os.path.dirname(db_path)
25+
if db_dir:
26+
os.makedirs(db_dir, exist_ok=True)
1927
self._db_path = db_path
2028
self._db = SqliteDatabase(self._db_path, pragmas={"journal_mode": "wal"})
2129

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import os
2+
from typing import List, Optional
3+
4+
from tinydb import Query, TinyDB
5+
6+
from eval_protocol.dataset_logger.evaluation_row_store import EvaluationRowStore
7+
8+
9+
class TinyDBEvaluationRowStore(EvaluationRowStore):
    """
    TinyDB-based evaluation row store.

    Stores data as plain JSON files, which are human-readable and
    don't suffer from SQLite's binary format corruption issues.
    """

    def __init__(self, db_path: str):
        """Open (creating if needed) the TinyDB file at ``db_path``."""
        # Handle case where db_path might be in the root directory
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        self._db_path = db_path
        self._db = TinyDB(db_path)
        self._table = self._db.table("evaluation_rows")

    @property
    def db_path(self) -> str:
        """Path to the JSON database file."""
        return self._db_path

    def upsert_row(self, data: dict) -> None:
        """Insert or update the row keyed by execution_metadata.rollout_id.

        Raises:
            ValueError: If execution_metadata.rollout_id is missing or None.
        """
        # Use .get() chains so a missing "execution_metadata" (or missing
        # "rollout_id") raises the intended ValueError rather than a
        # KeyError before the guard is ever reached.
        rollout_id = data.get("execution_metadata", {}).get("rollout_id")
        if rollout_id is None:
            raise ValueError("execution_metadata.rollout_id is required to upsert a row")

        Row = Query()
        self._table.upsert(data, Row.execution_metadata.rollout_id == rollout_id)

    def read_rows(self, rollout_id: Optional[str] = None) -> List[dict]:
        """Return all rows, or only those matching ``rollout_id`` if given."""
        if rollout_id is not None:
            Row = Query()
            return list(self._table.search(Row.execution_metadata.rollout_id == rollout_id))
        return list(self._table.all())

    def delete_row(self, rollout_id: str) -> int:
        """Delete the row with ``rollout_id``; return the number removed."""
        Row = Query()
        removed = self._table.remove(Row.execution_metadata.rollout_id == rollout_id)
        return len(removed)

    def delete_all_rows(self) -> int:
        """Delete every row in the table; return how many were removed."""
        count = len(self._table)
        self._table.truncate()
        return count

eval_protocol/event_bus/__init__.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,47 @@
1-
# Global event bus instance - uses SqliteEventBus for cross-process functionality
1+
# Global event bus instance - uses configured storage backend for cross-process functionality
2+
import os
23
from typing import Any, Callable
4+
35
from eval_protocol.event_bus.event_bus import EventBus
6+
from eval_protocol.event_bus.event_bus_database import EventBusDatabase
7+
8+
9+
def get_event_bus_database(db_path: str) -> EventBusDatabase:
    """
    Factory for the configured event-bus database backend.

    The backend is chosen via the EP_STORAGE environment variable:
    - "tinydb" (default): TinyDB with JSON file storage
    - "sqlite": SQLite via the peewee ORM

    Args:
        db_path: Path to the database file.

    Returns:
        A concrete EventBusDatabase implementation.
    """
    backend = os.getenv("EP_STORAGE", "tinydb").lower()

    if backend != "sqlite":
        # Default path: TinyDB. Imported lazily so the optional backend's
        # dependencies are only loaded when actually selected.
        from eval_protocol.event_bus.tinydb_event_bus_database import TinyDBEventBusDatabase

        return TinyDBEventBusDatabase(db_path)

    from eval_protocol.event_bus.sqlite_event_bus_database import SqliteEventBusDatabase

    return SqliteEventBusDatabase(db_path)
33+
34+
35+
def _get_default_db_filename() -> str:
36+
"""Get the default database filename based on storage backend."""
37+
storage_type = os.getenv("EP_STORAGE", "tinydb").lower()
38+
return "events.db" if storage_type == "sqlite" else "events.json"
439

540

641
def _get_default_event_bus():
    """Construct the default cross-process event bus."""
    # Local import keeps the bus implementation from loading until first use.
    from eval_protocol.event_bus.cross_process_event_bus import CrossProcessEventBus

    bus = CrossProcessEventBus()
    return bus
1045

1146

1247
# Lazy property that creates the event bus only when accessed

0 commit comments

Comments (0)