diff --git a/ovoscope/__init__.py b/ovoscope/__init__.py index fb2d48c..9dd6688 100644 --- a/ovoscope/__init__.py +++ b/ovoscope/__init__.py @@ -656,9 +656,16 @@ class CaptureSession: async_responses: List[Message] = dataclasses.field(default_factory=list) eof_msgs: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_EOF) + # end capture only after an eof message has been seen this many times. Use >1 + # when the scenario produces N concurrent lifecycles that each terminate on the + # same eof topic (e.g. two ovos.utterance.handled — one per utterance — when a + # stop interrupts a running skill), so capture spans all of them. + eof_count: int = 1 ignore_messages: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_IGNORED) async_messages: List[str] = dataclasses.field(default_factory=list) # these come from an external thread and might come in any order done: threading.Event = dataclasses.field(default_factory=lambda: threading.Event()) + _eof_lock: threading.Lock = dataclasses.field(default_factory=lambda: threading.Lock()) + _eof_seen: int = 0 def handle_message(self, msg: str): if self.done.is_set(): @@ -670,7 +677,10 @@ def handle_message(self, msg: str): self.responses.append(msg) def handle_end_of_test(self, msg: Message): - self.done.set() + with self._eof_lock: + self._eof_seen += 1 + if self._eof_seen >= self.eof_count: + self.done.set() def __post_init__(self): self.minicroft.bus.on("message", self.handle_message) @@ -680,6 +690,8 @@ def __post_init__(self): def capture(self, source_message: Message, timeout=20): test_message = deepcopy(source_message) # ensure object not mutated by ovos-core self.done.clear() + with self._eof_lock: + self._eof_seen = 0 self.minicroft.bus.emit(test_message) self.done.wait(timeout) @@ -709,7 +721,16 @@ class End2EndTest: # message type runtime modifiers ############################## eof_msgs: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_EOF) # if received, end message capture + eof_count: int = 1 # end capture only after an eof message has been seen this many times (one per concurrent lifecycle terminating on the same topic) ignore_messages: List[str] = dataclasses.field(default_factory=lambda: DEFAULT_IGNORED) # pretend any message in this list was not emitted for testing purposes + # Assert only the messages belonging to a single dispatch lifecycle, identified + # by message.context["skill_id"]. When set, captured messages whose skill_id does + # not match are dropped before assertion. This isolates one lifecycle's §8 trio + + # §9 terminals from a CONCURRENT lifecycle whose messages interleave + # non-deterministically (e.g. stopping a skill that is mid-dispatch: the stop + # dispatch and the interrupted skill's own completion race). Run the same + # scenario once per skill_id to assert each lifecycle deterministically. + skill_id: Optional[str] = None ignore_gui: bool = True # ignore the gui namespace bus messages, usually unwanted unless explicitly testing gui integration async_messages: List[str] = dataclasses.field(default_factory=list) # these come from an external thread and might come in any order, validate they are received outside the main test @@ -823,6 +844,7 @@ def execute(self, timeout: int = 30) -> List[Message]: # the capture session will store all messages until capture.finish() # even if multiple messages are emitted capture = CaptureSession(self.minicroft, eof_msgs=self.eof_msgs, + eof_count=self.eof_count, ignore_messages=self.ignore_messages, async_messages=self.async_messages) for idx, source_message in enumerate(self.source_message): @@ -834,6 +856,14 @@ def execute(self, timeout: int = 30) -> List[Message]: # final message list messages = capture.finish() + # isolate a single dispatch lifecycle by skill_id — drop messages from a + # concurrent (interleaving) lifecycle so the assertion is deterministic. + if self.skill_id is not None: + messages = [m for m in messages + if (m.context or {}).get("skill_id") == self.skill_id] + if self.verbose: + print(f"💡 filtered to skill_id='{self.skill_id}': {len(messages)} messages") + if _bus_tracker is not None: _bus_tracker.stop_tracking() all_responses = messages + list(getattr(capture, "async_responses", [])) @@ -907,7 +937,7 @@ def execute(self, timeout: int = 30) -> List[Message]: assert received.context[k] == v, f"❌ message context mismatch for key '{k}' - expected '{v}' | got '{received.context[k]}'" if self.verbose: print(f"✅ got expected message context '{k}: '{v}'") - if self.test_routing: + if self.test_routing and self.skill_id is None: r_src = received.context.get("source") r_dst = received.context.get("destination") if expected.msg_type in self.keep_original_src: diff --git a/test/unittests/test_capture_session.py b/test/unittests/test_capture_session.py index bad5abd..f2ed5c5 100644 --- a/test/unittests/test_capture_session.py +++ b/test/unittests/test_capture_session.py @@ -161,6 +161,50 @@ def test_multiple_eof_types(self): self.assertIn("test.x", types) self.assertNotIn("test.late", types) + def test_eof_count_waits_for_n_occurrences(self): + """With eof_count>1, capture continues until an eof topic is seen that + many times — for scenarios with N concurrent lifecycles each terminating + on the same eof topic.""" + cs = CaptureSession(self.mc, + eof_msgs=["test.eof"], + eof_count=2, + ignore_messages=[]) + self._emit_after(0.05, Message("test.eof")) # 1st eof — must NOT stop + self._emit_after(0.10, Message("test.between")) # captured (after 1st eof) + self._emit_after(0.15, Message("test.eof")) # 2nd eof — stops capture + self._emit_after(0.30, Message("test.after")) # must NOT appear + + cs.capture(Message("test.trigger"), timeout=3) + msgs = cs.finish() + types = [m.msg_type for m in msgs] + + self.assertIn("test.between", types, + "a message between the 1st and 2nd eof must be captured") + self.assertEqual(types.count("test.eof"), 2, + "both eof occurrences are captured") + self.assertNotIn("test.after", types, + "message after the Nth eof must not be captured") + + def test_eof_count_resets_between_captures(self): + """The eof counter resets per capture() call so eof_count applies fresh.""" + cs = CaptureSession(self.mc, + eof_msgs=["test.eof"], + eof_count=2, + ignore_messages=[]) + self._emit_after(0.05, Message("test.eof")) + self._emit_after(0.10, Message("test.eof")) + cs.capture(Message("test.trigger1"), timeout=3) + cs.finish() + # a second capture must again require 2 eofs, not be already-done + cs2 = CaptureSession(self.mc, eof_msgs=["test.eof"], eof_count=2, + ignore_messages=[]) + self._emit_after(0.05, Message("test.eof")) + self._emit_after(0.10, Message("test.mid")) + self._emit_after(0.15, Message("test.eof")) + cs2.capture(Message("test.trigger2"), timeout=3) + types = [m.msg_type for m in cs2.finish()] + self.assertIn("test.mid", types) + def test_capture_timeout_returns_partial_results(self): """If the EOF never fires, capture() must return after the timeout and finish() must still return whatever was captured.""" diff --git a/test/unittests/test_end2end_extended.py b/test/unittests/test_end2end_extended.py index 8462d81..7a26417 100644 --- a/test/unittests/test_end2end_extended.py +++ b/test/unittests/test_end2end_extended.py @@ -49,6 +49,22 @@ def handle_async(self, message: Message): self.bus.emit(Message("ovos.utterance.handled", context=message.context)) +class TwoLifecycleSkill(OVOSSkill): + """Emits two lifecycles tagged with distinct context skill_ids, to exercise + the End2EndTest ``skill_id`` filter. Each lifecycle ends on the shared + ``ovos.utterance.handled`` topic (so eof_count=2 spans both).""" + + def initialize(self): + self.add_event("unittest.two_lifecycles", self.handle_two) + + def handle_two(self, message: Message): + for sid in ("life.a", "life.b"): + ctx = dict(message.context) + ctx["skill_id"] = sid + self.bus.emit(Message(f"{sid}.step", context=ctx)) + self.bus.emit(Message("ovos.utterance.handled", context=ctx)) + + def _session(sid="ext-test", pipeline=None): s = Session(sid) s.lang = "en-US" @@ -983,5 +999,77 @@ def test_count_mismatch_prints_first_differing(self): test.execute(timeout=10) +class TestSkillIdFilter(unittest.TestCase): + """The skill_id filter isolates one dispatch lifecycle from concurrent ones.""" + + def setUp(self): + LOG.set_level("ERROR") + self.mc = get_minicroft([SKILL_ID], + extra_skills={SKILL_ID: TwoLifecycleSkill}) + + def tearDown(self): + self.mc.stop() + LOG.set_level("CRITICAL") + + def _common_kwargs(self): + return dict( + minicroft=self.mc, + skill_ids=[SKILL_ID], + eof_msgs=["ovos.utterance.handled"], + eof_count=2, # both lifecycles terminate on ovos.utterance.handled + test_routing=False, + test_active_skills=False, + test_final_session=False, + ignore_messages=DEFAULT_IGNORED + HANDLER_LIFECYCLE, + verbose=False, + ) + + def test_filter_isolates_one_lifecycle(self): + """Only messages whose context skill_id matches are asserted.""" + src = _make_custom("unittest.two_lifecycles") + test = End2EndTest( + source_message=src, + skill_id="life.a", + expected_messages=[ + Message("life.a.step", {}, {"skill_id": "life.a"}), + Message("ovos.utterance.handled", {}, {"skill_id": "life.a"}), + ], + **self._common_kwargs(), + ) + # passes only if life.b.* and the source (no skill_id) are filtered out + test.execute(timeout=10) + + def test_filter_the_other_lifecycle(self): + """The same scenario, filtered to the other skill_id.""" + src = _make_custom("unittest.two_lifecycles") + test = End2EndTest( + source_message=src, + skill_id="life.b", + expected_messages=[ + Message("life.b.step", {}, {"skill_id": "life.b"}), + Message("ovos.utterance.handled", {}, {"skill_id": "life.b"}), + ], + **self._common_kwargs(), + ) + test.execute(timeout=10) + + def test_unfiltered_sees_both_lifecycles(self): + """Without the filter, eof_count=2 captures both lifecycles' messages.""" + src = _make_custom("unittest.two_lifecycles") + test = End2EndTest( + source_message=src, + expected_messages=[src], + test_message_number=False, + test_msg_type=False, + test_msg_data=False, + test_msg_context=False, + **self._common_kwargs(), + ) + result = test.execute(timeout=10) + types = [m.msg_type for m in result] + self.assertIn("life.a.step", types) + self.assertIn("life.b.step", types) + + if __name__ == "__main__": unittest.main()