Skip to content

Commit 4ae1242

Browse files
author
Dylan Huang
committed
Add cross-process cache invalidation tests for evaluation row store and event bus database
- Introduced `TestCrossProcessCacheInvalidation` to verify that query caches are properly invalidated when another process modifies the database.
- Added tests to ensure that both evaluation row stores and event bus databases can read fresh data and updates from other processes.
- Implemented scenarios for both reading and writing operations across multiple simulated processes to validate cache behavior.
1 parent fd113c4 commit 4ae1242

File tree

1 file changed

+164
-0
lines changed

1 file changed

+164
-0
lines changed

tests/test_storage_backends.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,170 @@ def test_get_event_bus_database_sqlite(self, temp_dir: str, monkeypatch):
282282
assert isinstance(db, SqliteEventBusDatabase)
283283

284284

285+
class TestCrossProcessCacheInvalidation:
    """
    Verify that query caches are invalidated when the database is modified externally.

    Cross-process behavior is simulated by opening independent store/database
    instances over one shared file on disk; each instance stands in for a
    separate OS process that may hold cached query results.
    """

    @pytest.mark.parametrize(
        "store_class,file_ext",
        [
            (TinyDBEvaluationRowStore, ".json"),
            (SqliteEvaluationRowStore, ".db"),
        ],
    )
    def test_evaluation_row_store_sees_writes_from_other_process(self, temp_dir: str, store_class, file_ext: str):
        """
        A store must surface rows inserted through a different instance.

        Guards against a stale cached "empty" result masking data written
        by another process.
        """
        path = os.path.join(temp_dir, f"test{file_ext}")

        # Two independent instances over the same file stand in for two processes.
        reader = store_class(path)
        writer = store_class(path)

        # The reading "process" polls first, which may populate its cache.
        assert len(reader.read_rows()) == 0

        # The writing "process" inserts a row.
        payload = {
            "execution_metadata": {"rollout_id": "cross-process-test"},
            "input_metadata": {"row_id": "row-from-process-2"},
            "messages": [{"role": "user", "content": "Hello from process 2"}],
        }
        writer.upsert_row(payload)

        # The reader must now observe the insert despite its earlier read.
        fetched = reader.read_rows()
        assert len(fetched) == 1
        assert fetched[0]["execution_metadata"]["rollout_id"] == "cross-process-test"
        assert fetched[0]["input_metadata"]["row_id"] == "row-from-process-2"

    @pytest.mark.parametrize(
        "store_class,file_ext",
        [
            (TinyDBEvaluationRowStore, ".json"),
            (SqliteEvaluationRowStore, ".db"),
        ],
    )
    def test_evaluation_row_store_sees_updates_from_other_process(self, temp_dir: str, store_class, file_ext: str):
        """
        A store must surface in-place updates made through a different instance.

        Guards against a cached query result hiding a row that another
        process has since overwritten.
        """
        path = os.path.join(temp_dir, f"test{file_ext}")

        # Seed the database through the first "process".
        first = store_class(path)
        first.upsert_row(
            {
                "execution_metadata": {"rollout_id": "shared-rollout"},
                "input_metadata": {"row_id": "initial-row"},
                "value": "initial",
            }
        )

        # A second "process" attaches to the same file.
        second = store_class(path)

        # The first process reads (and may cache) the seeded row.
        seeded = first.read_rows(rollout_id="shared-rollout")
        assert len(seeded) == 1
        assert seeded[0]["value"] == "initial"

        # The second process overwrites the row for the same rollout.
        second.upsert_row(
            {
                "execution_metadata": {"rollout_id": "shared-rollout"},
                "input_metadata": {"row_id": "updated-row"},
                "value": "updated-by-process-2",
            }
        )

        # The first process must observe the overwrite, not its cached copy.
        refreshed = first.read_rows(rollout_id="shared-rollout")
        assert len(refreshed) == 1
        assert refreshed[0]["value"] == "updated-by-process-2"
        assert refreshed[0]["input_metadata"]["row_id"] == "updated-row"

    @pytest.mark.parametrize(
        "db_class,file_ext",
        [
            (TinyDBEventBusDatabase, ".json"),
            (SqliteEventBusDatabase, ".db"),
        ],
    )
    def test_event_bus_database_sees_events_from_other_process(self, temp_dir: str, db_class, file_ext: str):
        """
        An event bus instance must see events published by a different instance.

        Guards against a stale cached "no events" result hiding events
        published by another process.
        """
        path = os.path.join(temp_dir, f"events{file_ext}")

        # Independent instances over the same file stand in for two processes.
        consumer = db_class(path)
        producer = db_class(path)

        # The consumer polls first; an empty result may be cached.
        assert len(consumer.get_unprocessed_events("process-1")) == 0

        # The producer publishes from the "other" process.
        producer.publish_event("test_event", {"key": "value"}, "process-2")

        # The consumer's next poll must return the fresh event.
        pending = consumer.get_unprocessed_events("process-1")
        assert len(pending) == 1
        assert pending[0]["event_type"] == "test_event"
        assert pending[0]["data"] == {"key": "value"}
        assert pending[0]["process_id"] == "process-2"

    @pytest.mark.parametrize(
        "db_class,file_ext",
        [
            (TinyDBEventBusDatabase, ".json"),
            (SqliteEventBusDatabase, ".db"),
        ],
    )
    def test_event_bus_database_sees_processed_status_from_other_process(self, temp_dir: str, db_class, file_ext: str):
        """
        Marking an event processed in one instance must be visible to others.

        Guards against a cached "unprocessed" result re-delivering an event
        that another process has already handled.
        """
        path = os.path.join(temp_dir, f"events{file_ext}")

        # Process 1 publishes an event.
        publisher = db_class(path)
        publisher.publish_event("test_event", {"key": "value"}, "process-1")

        # Process 2 attaches to the same database and observes the pending event.
        observer = db_class(path)
        assert len(observer.get_unprocessed_events("process-2")) == 1

        # Process 3 attaches, sees the event, and marks it processed.
        finisher = db_class(path)
        pending = finisher.get_unprocessed_events("process-3")
        assert len(pending) == 1
        finisher.mark_event_processed(pending[0]["event_id"])

        # Process 2 must no longer receive the already-processed event.
        assert len(observer.get_unprocessed_events("process-2")) == 0
447+
448+
285449
class TestBackwardsCompatibility:
286450
"""Tests for backwards compatibility aliases."""
287451

0 commit comments

Comments
 (0)