From dbfb7048259d7e216811ae2da977a77a9898c44c Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Wed, 13 May 2026 14:59:32 +0100 Subject: [PATCH] Fix a flaky timeseries test --- dimos/memory/timeseries/test_base.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/dimos/memory/timeseries/test_base.py b/dimos/memory/timeseries/test_base.py index 9491d2c93c..61fbdefdfa 100644 --- a/dimos/memory/timeseries/test_base.py +++ b/dimos/memory/timeseries/test_base.py @@ -19,6 +19,7 @@ import uuid import pytest +from reactivex import operators as ops from dimos.memory.timeseries.base import TimeSeriesStore from dimos.memory.timeseries.inmemory import InMemoryStore @@ -363,21 +364,12 @@ def test_iterate_items(self, store_factory, store_name, temp_dir): assert len(items) == 2 assert items[0] == (2.0, SampleData("b", 2.0)) - def test_stream_basic(self, store_factory, store_name, temp_dir): + async def test_stream_basic(self, store_factory, store_name, temp_dir): store = store_factory(temp_dir) store.save(SampleData("a", 1.0), SampleData("b", 2.0), SampleData("c", 3.0)) # Stream at high speed (essentially instant) - results: list[SampleData] = [] - store.stream(speed=1000.0).subscribe( - on_next=results.append, - on_completed=lambda: None, - ) - - # Give it a moment to complete - import time - - time.sleep(0.1) + results = await store.stream(speed=1000.0).pipe(ops.to_list()) assert results == [ SampleData("a", 1.0),