Add Race, All, Take, Debounce saga effects and gateway tests#29
Conversation
…eway tests (#29) Expand saga effects from 8 to 12 with Race (first-wins), All (wait-all), Take (action-waiting), and Debounce (timer-restart). Fix Python 2 exception syntax in pipeline.py:401. Add 28 gateway tests (previously 0 coverage). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Coverage Report✅ 87.0% overall coverage
|
…ngelog fragment Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR expands Milo’s saga system with new orchestration effects (Race, All, Take, Debounce), fixes a Python exception-syntax issue in the pipeline module, and adds a new gateway test suite to establish baseline coverage for the MCP gateway behavior.
Changes:
- Added new saga effects (Race/All/Take/Debounce) and corresponding effect tests.
- Implemented gateway tests covering discovery namespacing, proxy routing, idle reaping, and error handling.
- Fixed invalid multi-exception
exceptsyntax inpipeline.py.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_gateway.py | New test suite validating gateway discovery/proxying/idle reaping and error cases. |
| tests/test_effects.py | Adds tests for the new saga effects and their expected runtime behavior. |
| src/milo/state.py | Implements runtime support for Race/All/Take/Debounce in saga execution. |
| src/milo/pipeline.py | Fixes invalid exception tuple syntax in _handler_wants_context. |
| src/milo/_types.py | Introduces new effect dataclasses: Race, All, Take, Debounce. |
| src/milo/init.py | Exports the new effects via the public API. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/milo/state.py
Outdated
| # Block outside the lock; cancel can interrupt | ||
| waiter_event.wait(timeout=timeout) |
There was a problem hiding this comment.
Take currently blocks on waiter_event.wait(timeout=timeout), so saga cancellation cannot interrupt the wait (especially when timeout is None, but also for long timeouts). This can cause cancelled sagas to hang indefinitely while waiting for an action. Consider waiting in a loop with a short interval (or computing a deadline) so you can periodically check cancel.is_set() and exit promptly, and update the comment that says cancellation can interrupt.
| # Block outside the lock; cancel can interrupt | |
| waiter_event.wait(timeout=timeout) | |
| # Wait outside the lock in short intervals so cancellation | |
| # can be checked promptly while still honoring timeout. | |
| wait_interval = 0.1 | |
| deadline = None if timeout is None else time.monotonic() + timeout | |
| while not waiter_event.is_set(): | |
| if cancel.is_set(): | |
| break | |
| if deadline is None: | |
| current_timeout = wait_interval | |
| else: | |
| remaining = deadline - time.monotonic() | |
| if remaining <= 0: | |
| break | |
| current_timeout = min(wait_interval, remaining) | |
| waiter_event.wait(timeout=current_timeout) |
| entries = self._action_waiters.get(action_type, []) | ||
| for i, (ev, _) in enumerate(entries): | ||
| if ev is waiter_event: | ||
| entries.pop(i) | ||
| break | ||
| try: | ||
| effect = saga.throw( |
There was a problem hiding this comment.
Take timeout/cancel cleanup removes the waiter entry from the per-action list, but it never deletes the dict key when the list becomes empty. Over time, distinct action types that time out can accumulate empty lists in _action_waiters. Consider deleting self._action_waiters[action_type] when entries becomes empty after a removal.
src/milo/state.py
Outdated
| try: | ||
| effect = next(saga) | ||
| while True: | ||
| if cancel.is_set(): | ||
| try: | ||
| self.dispatch(Action("@@SAGA_CANCELLED")) | ||
| except Exception: | ||
| _logger.debug("Failed to dispatch @@SAGA_CANCELLED", exc_info=True) | ||
| return | ||
| match effect: | ||
| case Call(fn, args, kwargs): | ||
| try: | ||
| result = fn(*args, **kwargs) | ||
| except Exception as call_err: | ||
| effect = saga.throw(call_err) | ||
| else: | ||
| effect = saga.send(result) | ||
| case Put(action): | ||
| self.dispatch(action) | ||
| effect = next(saga) | ||
| case Select(selector): | ||
| state = self._state | ||
| if selector: | ||
| state = selector(state) | ||
| effect = saga.send(state) | ||
| case Fork(child_saga): | ||
| child_cancel = threading.Event() | ||
| self._executor.submit(self._run_saga, child_saga, child_cancel) | ||
| effect = saga.send(child_cancel) | ||
| case Delay(seconds): | ||
| cancel.wait(timeout=seconds) | ||
| if cancel.is_set(): | ||
| continue | ||
| effect = next(saga) | ||
| case Retry(fn, r_args, r_kwargs, max_attempts, backoff, base_delay, max_delay): | ||
| result = _execute_retry( | ||
| fn, r_args, r_kwargs, max_attempts, backoff, base_delay, max_delay | ||
| ) | ||
| effect = saga.send(result) | ||
| case Timeout(inner_effect, seconds): | ||
| try: | ||
| result = self._execute_timeout(inner_effect, seconds) | ||
| effect = saga.send(result) | ||
| except TimeoutError as te: | ||
| effect = saga.throw(te) | ||
| case TryCall(fn, args, kwargs): | ||
| try: | ||
| result = fn(*args, **kwargs) | ||
| effect = saga.send((result, None)) | ||
| except Exception as call_err: | ||
| effect = saga.send((None, call_err)) | ||
| case _: | ||
| raise StateError( | ||
| ErrorCode.STA_SAGA, | ||
| f"Unknown effect type: {type(effect).__name__}", | ||
| ) | ||
| except StopIteration as si: | ||
| result_box.append(si.value) | ||
| done.set() |
There was a problem hiding this comment.
_run_saga_capturing (used by Race/All) only implements a subset of effects. Child sagas that yield Take, Debounce, Race, All, etc. will currently raise Unknown effect type, which makes the new concurrency effects non-composable with the rest of the saga system. Consider refactoring to share the same effect-stepping logic as _run_saga, or add handling for all supported effects in _run_saga_capturing.
| try: | |
| effect = next(saga) | |
| while True: | |
| if cancel.is_set(): | |
| try: | |
| self.dispatch(Action("@@SAGA_CANCELLED")) | |
| except Exception: | |
| _logger.debug("Failed to dispatch @@SAGA_CANCELLED", exc_info=True) | |
| return | |
| match effect: | |
| case Call(fn, args, kwargs): | |
| try: | |
| result = fn(*args, **kwargs) | |
| except Exception as call_err: | |
| effect = saga.throw(call_err) | |
| else: | |
| effect = saga.send(result) | |
| case Put(action): | |
| self.dispatch(action) | |
| effect = next(saga) | |
| case Select(selector): | |
| state = self._state | |
| if selector: | |
| state = selector(state) | |
| effect = saga.send(state) | |
| case Fork(child_saga): | |
| child_cancel = threading.Event() | |
| self._executor.submit(self._run_saga, child_saga, child_cancel) | |
| effect = saga.send(child_cancel) | |
| case Delay(seconds): | |
| cancel.wait(timeout=seconds) | |
| if cancel.is_set(): | |
| continue | |
| effect = next(saga) | |
| case Retry(fn, r_args, r_kwargs, max_attempts, backoff, base_delay, max_delay): | |
| result = _execute_retry( | |
| fn, r_args, r_kwargs, max_attempts, backoff, base_delay, max_delay | |
| ) | |
| effect = saga.send(result) | |
| case Timeout(inner_effect, seconds): | |
| try: | |
| result = self._execute_timeout(inner_effect, seconds) | |
| effect = saga.send(result) | |
| except TimeoutError as te: | |
| effect = saga.throw(te) | |
| case TryCall(fn, args, kwargs): | |
| try: | |
| result = fn(*args, **kwargs) | |
| effect = saga.send((result, None)) | |
| except Exception as call_err: | |
| effect = saga.send((None, call_err)) | |
| case _: | |
| raise StateError( | |
| ErrorCode.STA_SAGA, | |
| f"Unknown effect type: {type(effect).__name__}", | |
| ) | |
| except StopIteration as si: | |
| result_box.append(si.value) | |
| done.set() | |
| def _capturing_wrapper() -> Any: | |
| try: | |
| result = yield from saga | |
| except Exception as e: | |
| error_box.append(e) | |
| else: | |
| result_box.append(result) | |
| finally: | |
| done.set() | |
| try: | |
| self._run_saga(_capturing_wrapper(), cancel) |
On Python 3.14t (no GIL), a child saga could finish between the per-child done.is_set() for-loop and the all-done check, causing _execute_race to hit the fallback `return None` path instead of returning the child's result. Re-check results in the all-done branch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…child sagas - Take effect now polls in short intervals (0.1s) so saga cancellation can interrupt even indefinite waits, instead of blocking on a single waiter_event.wait() call. - Take waiter cleanup now deletes the dict key when the per-action list becomes empty, preventing unbounded growth of _action_waiters. - Replaced duplicated effect-stepping logic in _run_saga_capturing with a yield-from wrapper that delegates to _run_saga, making child sagas in Race/All fully composable with all effect types (Take, Debounce, nested Race/All, etc.). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
pipeline.py:401(except ValueError, TypeError:→except (ValueError, TypeError):)milo.__init__public APITest plan
🤖 Generated with Claude Code