diff --git a/dimos/memory/timeseries/test_base.py b/dimos/memory/timeseries/test_base.py index 9491d2c93c..61fbdefdfa 100644 --- a/dimos/memory/timeseries/test_base.py +++ b/dimos/memory/timeseries/test_base.py @@ -19,6 +19,7 @@ import uuid import pytest +from reactivex import operators as ops from dimos.memory.timeseries.base import TimeSeriesStore from dimos.memory.timeseries.inmemory import InMemoryStore @@ -363,21 +364,12 @@ def test_iterate_items(self, store_factory, store_name, temp_dir): assert len(items) == 2 assert items[0] == (2.0, SampleData("b", 2.0)) - def test_stream_basic(self, store_factory, store_name, temp_dir): + async def test_stream_basic(self, store_factory, store_name, temp_dir): store = store_factory(temp_dir) store.save(SampleData("a", 1.0), SampleData("b", 2.0), SampleData("c", 3.0)) # Stream at high speed (essentially instant) - results: list[SampleData] = [] - store.stream(speed=1000.0).subscribe( - on_next=results.append, - on_completed=lambda: None, - ) - - # Give it a moment to complete - import time - - time.sleep(0.1) + results = await store.stream(speed=1000.0).pipe(ops.to_list()) assert results == [ SampleData("a", 1.0),