Skip to content

Commit 3ffc83a

Browse files
author
Dylan Huang
committed
Update default database filenames and enhance TinyDB event handling
- Changed default database filenames from "events.db" to "logs.db" and "events.json" to "logs.json" based on storage backend. - Added cache clearing and table reloading in TinyDBEventBusDatabase to ensure fresh reads of unprocessed events and improve cleanup accuracy.
1 parent 9b5e353 commit 3ffc83a

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

eval_protocol/event_bus/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def get_event_bus_database(db_path: str) -> EventBusDatabase:
3535
def _get_default_db_filename() -> str:
3636
"""Get the default database filename based on storage backend."""
3737
storage_type = os.getenv("EP_STORAGE", "tinydb").lower()
38-
return "events.db" if storage_type == "sqlite" else "events.json"
38+
return "logs.db" if storage_type == "sqlite" else "logs.json"
3939

4040

4141
def _get_default_event_bus():

eval_protocol/event_bus/tinydb_event_bus_database.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,19 @@ def publish_event(self, event_type: str, data: Any, process_id: str) -> None:
5151
def get_unprocessed_events(self, process_id: str) -> List[dict]:
5252
"""Get unprocessed events from other processes."""
5353
try:
54+
# Clear query cache to force fresh read from disk
55+
# TinyDB caches query results, so we need to clear cache to see
56+
# events written by other processes. The search() method will
57+
# automatically call _read_table() on a cache miss.
58+
self._table.clear_cache()
59+
5460
Event = Query()
5561
results = self._table.search((Event.process_id != process_id) & (Event.processed == False)) # noqa: E712
5662

63+
logger.debug(
64+
f"TinyDBEventBusDatabase: Found {len(results)} unprocessed events for process_id: {process_id} in database: {self._db_path}"
65+
)
66+
5767
events = []
5868
# Sort by timestamp
5969
for event in sorted(results, key=lambda x: x.get("timestamp", 0)):
@@ -83,6 +93,9 @@ def mark_event_processed(self, event_id: str) -> None:
8393
def cleanup_old_events(self, max_age_hours: int = 24) -> None:
8494
"""Clean up old processed events."""
8595
try:
96+
# Reload table from disk to see latest data before cleanup
97+
self._table._read_table()
98+
8699
cutoff_time = time.time() - (max_age_hours * 3600)
87100
Event = Query()
88101
self._table.remove((Event.processed == True) & (Event.timestamp < cutoff_time)) # noqa: E712

0 commit comments

Comments
 (0)