Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions ovoscope/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ovos_bus_client.session import SessionManager, Session
from ovos_config.config import Configuration
from ovos_config.models import LocalConf
from ovos_core.intent_services import IntentService

Check failure on line 12 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F401)

ovoscope/__init__.py:12:39: F401 `ovos_core.intent_services.IntentService` imported but unused help: Remove unused import: `ovos_core.intent_services.IntentService`
from ovos_core.skill_manager import SkillManager
from ovos_plugin_manager.skills import find_skill_plugins
from ovos_utils.fakebus import FakeBus
Expand Down Expand Up @@ -656,9 +656,16 @@
async_responses: List[Message] = dataclasses.field(default_factory=list)

eof_msgs: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_EOF)
# end capture only after an eof message has been seen this many times. Use >1
# when the scenario produces N concurrent lifecycles that each terminate on the
# same eof topic (e.g. two ovos.utterance.handled — one per utterance — when a
# stop interrupts a running skill), so capture spans all of them.
eof_count: int = 1
ignore_messages: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_IGNORED)
async_messages: List[str] = dataclasses.field(default_factory=list) # these come from an external thread and might come in any order
done: threading.Event = dataclasses.field(default_factory=lambda: threading.Event())
_eof_lock: threading.Lock = dataclasses.field(default_factory=lambda: threading.Lock())
_eof_seen: int = 0

def handle_message(self, msg: str):
if self.done.is_set():
Expand All @@ -670,7 +677,10 @@
self.responses.append(msg)

def handle_end_of_test(self, msg: Message):
self.done.set()
with self._eof_lock:
self._eof_seen += 1
if self._eof_seen >= self.eof_count:
self.done.set()

def __post_init__(self):
self.minicroft.bus.on("message", self.handle_message)
Expand All @@ -680,6 +690,8 @@
def capture(self, source_message: Message, timeout=20):
test_message = deepcopy(source_message) # ensure object not mutated by ovos-core
self.done.clear()
with self._eof_lock:
self._eof_seen = 0
self.minicroft.bus.emit(test_message)
self.done.wait(timeout)

Expand Down Expand Up @@ -709,7 +721,16 @@
# message type runtime modifiers
##############################
eof_msgs: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_EOF) # if received, end message capture
eof_count: int = 1 # end capture only after an eof message has been seen this many times (one per concurrent lifecycle terminating on the same topic)
ignore_messages: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_IGNORED) # pretend any message in this list was not emitted for testing purposes
# Assert only the messages belonging to a single dispatch lifecycle, identified
# by message.context["skill_id"]. When set, captured messages whose skill_id does
# not match are dropped before assertion. This isolates one lifecycle's §8 trio +
# §9 terminals from a CONCURRENT lifecycle whose messages interleave
# non-deterministically (e.g. stopping a skill that is mid-dispatch: the stop
# dispatch and the interrupted skill's own completion race). Run the same
# scenario once per skill_id to assert each lifecycle deterministically.
skill_id: Optional[str] = None
ignore_gui: bool = True # ignore the gui namespace bus messages, usually unwanted unless explicitly testing gui integration
async_messages: List[str] = dataclasses.field(default_factory=list) # these come from an external thread and might come in any order, validate they are received outside the main test

Expand Down Expand Up @@ -751,7 +772,7 @@
###########################
track_bus_coverage: bool = False # enable BusCoverageTracker for this test
print_bus_coverage: bool = False # print inline summary after execute()
bus_coverage_report: Optional["BusCoverageReport"] = dataclasses.field(default=None, init=False, repr=False)

Check failure on line 775 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F821)

ovoscope/__init__.py:775:36: F821 Undefined name `BusCoverageReport`

###########################
# test runner internals
Expand Down Expand Up @@ -823,6 +844,7 @@
# the capture session will store all messages until capture.finish()
# even if multiple messages are emitted
capture = CaptureSession(self.minicroft, eof_msgs=self.eof_msgs,
eof_count=self.eof_count,
ignore_messages=self.ignore_messages,
async_messages=self.async_messages)
for idx, source_message in enumerate(self.source_message):
Expand All @@ -834,6 +856,14 @@
# final message list
messages = capture.finish()

# isolate a single dispatch lifecycle by skill_id — drop messages from a
# concurrent (interleaving) lifecycle so the assertion is deterministic.
if self.skill_id is not None:
messages = [m for m in messages
if (m.context or {}).get("skill_id") == self.skill_id]
if self.verbose:
print(f"💡 filtered to skill_id='{self.skill_id}': {len(messages)} messages")

if _bus_tracker is not None:
_bus_tracker.stop_tracking()
all_responses = messages + list(getattr(capture, "async_responses", []))
Expand Down Expand Up @@ -907,7 +937,7 @@
assert received.context[k] == v, f"❌ message context mismatch for key '{k}' - expected '{v}' | got '{received.context[k]}'"
if self.verbose:
print(f"✅ got expected message context '{k}: '{v}'")
if self.test_routing:
if self.test_routing and self.skill_id is None:
r_src = received.context.get("source")
r_dst = received.context.get("destination")
if expected.msg_type in self.keep_original_src:
Expand Down Expand Up @@ -952,14 +982,14 @@
if self.verbose:
print(f"💡 final session: {last_sess.serialize()}")
print(f"> expected: {expected_sess.serialize()}")
assert {s[0] for s in last_sess.active_skills} == {s[0] for s in expected_sess.active_skills}, f"❌ final session active_skills doesn't match"

Check failure on line 985 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:985:108: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert sess.lang == expected_sess.lang, f"❌ final session lang doesn't match"

Check failure on line 986 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:986:53: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert sess.pipeline == expected_sess.pipeline, f"❌ final session pipeline doesn't match"

Check failure on line 987 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:987:61: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert sess.system_unit == expected_sess.system_unit, f"❌ final session system_unit doesn't match"

Check failure on line 988 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:988:67: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert sess.date_format == expected_sess.date_format, f"❌ final session date_format doesn't match"

Check failure on line 989 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:989:67: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert sess.time_format == expected_sess.time_format, f"❌ final session time_format doesn't match"

Check failure on line 990 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:990:67: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert sess.site_id == expected_sess.site_id, f"❌ final session site_id doesn't match"

Check failure on line 991 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:991:59: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert sess.session_id == expected_sess.session_id, f"❌ final session session_id doesn't match"

Check failure on line 992 in ovoscope/__init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F541)

ovoscope/__init__.py:992:65: F541 f-string without any placeholders help: Remove extraneous `f` prefix
assert set(sess.blacklisted_skills or []) == set(expected_sess.blacklisted_skills or []), f"❌ final session blacklisted_skills doesn't match"
assert set(sess.blacklisted_intents or []) == set(expected_sess.blacklisted_intents or []), f"❌ final session blacklisted_intents doesn't match"
if self.verbose:
Expand Down
44 changes: 44 additions & 0 deletions test/unittests/test_capture_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,50 @@ def test_multiple_eof_types(self):
self.assertIn("test.x", types)
self.assertNotIn("test.late", types)

def test_eof_count_waits_for_n_occurrences(self):
"""With eof_count>1, capture continues until an eof topic is seen that
many times — for scenarios with N concurrent lifecycles each terminating
on the same eof topic."""
cs = CaptureSession(self.mc,
eof_msgs=["test.eof"],
eof_count=2,
ignore_messages=[])
self._emit_after(0.05, Message("test.eof")) # 1st eof — must NOT stop
self._emit_after(0.10, Message("test.between")) # captured (after 1st eof)
self._emit_after(0.15, Message("test.eof")) # 2nd eof — stops capture
self._emit_after(0.30, Message("test.after")) # must NOT appear

cs.capture(Message("test.trigger"), timeout=3)
msgs = cs.finish()
types = [m.msg_type for m in msgs]

self.assertIn("test.between", types,
"a message between the 1st and 2nd eof must be captured")
self.assertEqual(types.count("test.eof"), 2,
"both eof occurrences are captured")
self.assertNotIn("test.after", types,
"message after the Nth eof must not be captured")

def test_eof_count_resets_between_captures(self):
"""The eof counter resets per capture() call so eof_count applies fresh."""
cs = CaptureSession(self.mc,
eof_msgs=["test.eof"],
eof_count=2,
ignore_messages=[])
self._emit_after(0.05, Message("test.eof"))
self._emit_after(0.10, Message("test.eof"))
cs.capture(Message("test.trigger1"), timeout=3)
cs.finish()
# a second capture must again require 2 eofs, not be already-done
cs2 = CaptureSession(self.mc, eof_msgs=["test.eof"], eof_count=2,
ignore_messages=[])
self._emit_after(0.05, Message("test.eof"))
self._emit_after(0.10, Message("test.mid"))
self._emit_after(0.15, Message("test.eof"))
cs2.capture(Message("test.trigger2"), timeout=3)
types = [m.msg_type for m in cs2.finish()]
self.assertIn("test.mid", types)

Comment on lines +188 to +207

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

Reuse the same CaptureSession for the second capture.

This only proves that a new instance starts clean. If CaptureSession.capture() stopped resetting _eof_seen, this test would still pass because cs2 gets a fresh counter and fresh handlers.

Suggested change
     def test_eof_count_resets_between_captures(self):
         """The eof counter resets per capture() call so eof_count applies fresh."""
         cs = CaptureSession(self.mc,
                             eof_msgs=["test.eof"],
                             eof_count=2,
                             ignore_messages=[])
         self._emit_after(0.05, Message("test.eof"))
         self._emit_after(0.10, Message("test.eof"))
         cs.capture(Message("test.trigger1"), timeout=3)
-        cs.finish()
-        # a second capture must again require 2 eofs, not be already-done
-        cs2 = CaptureSession(self.mc, eof_msgs=["test.eof"], eof_count=2,
-                             ignore_messages=[])
+        first_len = len(cs.responses)
+        # a second capture on the same session must again require 2 eofs
         self._emit_after(0.05, Message("test.eof"))
         self._emit_after(0.10, Message("test.mid"))
         self._emit_after(0.15, Message("test.eof"))
-        cs2.capture(Message("test.trigger2"), timeout=3)
-        types = [m.msg_type for m in cs2.finish()]
+        cs.capture(Message("test.trigger2"), timeout=3)
+        types = [m.msg_type for m in cs.finish()[first_len:]]
         self.assertIn("test.mid", types)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def test_eof_count_resets_between_captures(self):
"""The eof counter resets per capture() call so eof_count applies fresh."""
cs = CaptureSession(self.mc,
eof_msgs=["test.eof"],
eof_count=2,
ignore_messages=[])
self._emit_after(0.05, Message("test.eof"))
self._emit_after(0.10, Message("test.eof"))
cs.capture(Message("test.trigger1"), timeout=3)
cs.finish()
# a second capture must again require 2 eofs, not be already-done
cs2 = CaptureSession(self.mc, eof_msgs=["test.eof"], eof_count=2,
ignore_messages=[])
self._emit_after(0.05, Message("test.eof"))
self._emit_after(0.10, Message("test.mid"))
self._emit_after(0.15, Message("test.eof"))
cs2.capture(Message("test.trigger2"), timeout=3)
types = [m.msg_type for m in cs2.finish()]
self.assertIn("test.mid", types)
def test_eof_count_resets_between_captures(self):
"""The eof counter resets per capture() call so eof_count applies fresh."""
cs = CaptureSession(self.mc,
eof_msgs=["test.eof"],
eof_count=2,
ignore_messages=[])
self._emit_after(0.05, Message("test.eof"))
self._emit_after(0.10, Message("test.eof"))
cs.capture(Message("test.trigger1"), timeout=3)
first_len = len(cs.responses)
# a second capture on the same session must again require 2 eofs
self._emit_after(0.05, Message("test.eof"))
self._emit_after(0.10, Message("test.mid"))
self._emit_after(0.15, Message("test.eof"))
cs.capture(Message("test.trigger2"), timeout=3)
types = [m.msg_type for m in cs.finish()[first_len:]]
self.assertIn("test.mid", types)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@test/unittests/test_capture_session.py` around lines 188 - 207, The test
currently creates a new CaptureSession for the second capture, so it does not
verify that CaptureSession.capture() resets EOF state on reuse. Update the test
to use the same CaptureSession instance for both capture() calls, then confirm
the second run still waits for the full eof_count; reference
CaptureSession.capture() and the _eof_seen state it should reset between
captures.

def test_capture_timeout_returns_partial_results(self):
"""If the EOF never fires, capture() must return after the timeout
and finish() must still return whatever was captured."""
Expand Down
88 changes: 88 additions & 0 deletions test/unittests/test_end2end_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ def handle_async(self, message: Message):
self.bus.emit(Message("ovos.utterance.handled", context=message.context))


class TwoLifecycleSkill(OVOSSkill):
"""Emits two lifecycles tagged with distinct context skill_ids, to exercise
the End2EndTest ``skill_id`` filter. Each lifecycle ends on the shared
``ovos.utterance.handled`` topic (so eof_count=2 spans both)."""

def initialize(self):
self.add_event("unittest.two_lifecycles", self.handle_two)

def handle_two(self, message: Message):
for sid in ("life.a", "life.b"):
ctx = dict(message.context)
ctx["skill_id"] = sid
self.bus.emit(Message(f"{sid}.step", context=ctx))
self.bus.emit(Message("ovos.utterance.handled", context=ctx))


def _session(sid="ext-test", pipeline=None):
s = Session(sid)
s.lang = "en-US"
Expand Down Expand Up @@ -983,5 +999,77 @@ def test_count_mismatch_prints_first_differing(self):
test.execute(timeout=10)


class TestSkillIdFilter(unittest.TestCase):
"""The skill_id filter isolates one dispatch lifecycle from concurrent ones."""

def setUp(self):
LOG.set_level("ERROR")
self.mc = get_minicroft([SKILL_ID],
extra_skills={SKILL_ID: TwoLifecycleSkill})

def tearDown(self):
self.mc.stop()
LOG.set_level("CRITICAL")

def _common_kwargs(self):
return dict(
minicroft=self.mc,
skill_ids=[SKILL_ID],
eof_msgs=["ovos.utterance.handled"],
eof_count=2, # both lifecycles terminate on ovos.utterance.handled
test_routing=False,
test_active_skills=False,
test_final_session=False,
ignore_messages=DEFAULT_IGNORED + HANDLER_LIFECYCLE,
verbose=False,
)
Comment on lines +1014 to +1025

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

Keep routing enabled in the filtered cases.

The new contract is that execute() suppresses routing assertions when skill_id is set, but _common_kwargs() disables routing for every case up front. That means the filtered tests never exercise the new guard, so a regression there would still pass.

Suggested change
     def _common_kwargs(self):
         return dict(
             minicroft=self.mc,
             skill_ids=[SKILL_ID],
             eof_msgs=["ovos.utterance.handled"],
             eof_count=2,  # both lifecycles terminate on ovos.utterance.handled
-            test_routing=False,
             test_active_skills=False,
             test_final_session=False,
             ignore_messages=DEFAULT_IGNORED + HANDLER_LIFECYCLE,
             verbose=False,
         )
@@
         test = End2EndTest(
             source_message=src,
             expected_messages=[src],
             test_message_number=False,
             test_msg_type=False,
             test_msg_data=False,
             test_msg_context=False,
+            test_routing=False,
             **self._common_kwargs(),
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@test/unittests/test_end2end_extended.py` around lines 1014 - 1025, The
filtered end-to-end cases are disabling routing too early in _common_kwargs, so
they never cover the new execute() guard when skill_id is set. Update
_common_kwargs in test_end2end_extended.py to keep routing enabled by default
and only let execute() suppress routing assertions when skill_id is present, so
the filtered tests still exercise that branch.


def test_filter_isolates_one_lifecycle(self):
"""Only messages whose context skill_id matches are asserted."""
src = _make_custom("unittest.two_lifecycles")
test = End2EndTest(
source_message=src,
skill_id="life.a",
expected_messages=[
Message("life.a.step", {}, {"skill_id": "life.a"}),
Message("ovos.utterance.handled", {}, {"skill_id": "life.a"}),
],
**self._common_kwargs(),
)
# passes only if life.b.* and the source (no skill_id) are filtered out
test.execute(timeout=10)

def test_filter_the_other_lifecycle(self):
"""The same scenario, filtered to the other skill_id."""
src = _make_custom("unittest.two_lifecycles")
test = End2EndTest(
source_message=src,
skill_id="life.b",
expected_messages=[
Message("life.b.step", {}, {"skill_id": "life.b"}),
Message("ovos.utterance.handled", {}, {"skill_id": "life.b"}),
],
**self._common_kwargs(),
)
test.execute(timeout=10)

def test_unfiltered_sees_both_lifecycles(self):
"""Without the filter, eof_count=2 captures both lifecycles' messages."""
src = _make_custom("unittest.two_lifecycles")
test = End2EndTest(
source_message=src,
expected_messages=[src],
test_message_number=False,
test_msg_type=False,
test_msg_data=False,
test_msg_context=False,
**self._common_kwargs(),
)
result = test.execute(timeout=10)
types = [m.msg_type for m in result]
self.assertIn("life.a.step", types)
self.assertIn("life.b.step", types)


if __name__ == "__main__":
unittest.main()
Loading