diff --git a/nerve/agent/engine.py b/nerve/agent/engine.py index dbc8149..307260c 100644 --- a/nerve/agent/engine.py +++ b/nerve/agent/engine.py @@ -8,6 +8,7 @@ from __future__ import annotations import asyncio +import contextlib import json import logging import os @@ -1537,6 +1538,60 @@ async def run( "is_running": False, }) + @staticmethod + async def _iter_response_with_timeout( + client: Any, + session_id: str, + idle_timeout: float, + ): + """Iterate ``client.receive_response()`` with a per-message idle timeout. + + The Claude Agent SDK's ``receive_response()`` async generator can + block indefinitely if the underlying CLI subprocess hangs (stuck + Anthropic API request, broken stdio pipe, etc.). Without a timeout + the engine has no way to notice — ``is_running`` stays True, the + per-session lock stays held, queued user messages back up forever. + + Wrapping each ``__anext__()`` await in ``asyncio.wait_for`` detects + a hung CLI when no SDK message of any kind (assistant chunk, tool + call, tool result, ResultMessage) arrives within ``idle_timeout`` + seconds. The iterator is closed and ``asyncio.TimeoutError`` is + raised so the existing CLI-crash retry path in ``_run_inner`` can + take over. + + The timeout is per-message, not per-turn, so legitimate long tool + calls (e.g. a Bash command with ``timeout=600000`` ms) don't trip + it as long as they emit ``tool_use``/``tool_result`` chunks + between waits. + + ``idle_timeout <= 0`` disables the timeout entirely (kept for + belt-and-suspenders ops who want the old behaviour back). + """ + response_iter = client.receive_response() + try: + while True: + try: + if idle_timeout and idle_timeout > 0: + message = await asyncio.wait_for( + response_iter.__anext__(), + timeout=idle_timeout, + ) + else: + message = await response_iter.__anext__() + except StopAsyncIteration: + return + except asyncio.TimeoutError: + logger.warning( + "CLI idle timeout (%ds) for session %s — no SDK " + "message received; treating CLI as hung", + idle_timeout, session_id, + ) + raise + yield message + finally: + with contextlib.suppress(Exception): + await response_iter.aclose() + async def _run_inner( self, session_id: str, @@ -1725,8 +1780,13 @@ async def _image_prompt(): continue # retry the query # Read response — may raise if CLI crashes mid-stream + # or hangs idle for longer than cli_idle_timeout_seconds + # (see _iter_response_with_timeout). try: - async for message in client.receive_response(): + async for message in AgentEngine._iter_response_with_timeout( + client, session_id, + self.config.agent.cli_idle_timeout_seconds, + ): # Early-capture sdk_session_id from first message that # carries it so it survives /stop cancellation (ResultMessage # — the normal source — never arrives when the turn is diff --git a/nerve/agent/sessions.py b/nerve/agent/sessions.py index aa6734f..4c9e51a 100644 --- a/nerve/agent/sessions.py +++ b/nerve/agent/sessions.py @@ -306,9 +306,29 @@ def register_task(self, session_id: str, task: asyncio.Task) -> None: Does NOT mark the session as running — that's done by mark_running() inside engine.run() to avoid a race between create_task() scheduling and the task's actual execution. + + If a task is already registered for ``session_id`` and is still + live, log a warning and replace it. The replaced task isn't + cancelled (callers may rely on it finishing), but it loses the + ability to be stopped via ``/stop``. The done-callback below is + identity-checked so the *old* task finishing later doesn't pop the + new entry from under us. """ + existing = self._running_tasks.get(session_id) + if existing is not None and not existing.done(): + logger.warning( + "register_task: replacing live task for session %s " + "(possible concurrent run)", session_id, + ) self._running_tasks[session_id] = task - task.add_done_callback(lambda _: self._running_tasks.pop(session_id, None)) + + def _on_done(t: asyncio.Task) -> None: + # Only pop if *this* task is still the registered one — a later + # register_task call may have replaced us, and clobbering its + # entry would leak the new task out of the stop registry. + if self._running_tasks.get(session_id) is t: + self._running_tasks.pop(session_id, None) + task.add_done_callback(_on_done) def mark_running(self, session_id: str) -> None: """Mark a session as currently running.""" diff --git a/nerve/config.py b/nerve/config.py index f970b74..7d46fb4 100644 --- a/nerve/config.py +++ b/nerve/config.py @@ -107,6 +107,12 @@ class AgentConfig: thinking: str = "max" # max, high, medium, low, disabled, adaptive, or number (budget_tokens) effort: str = "max" # max, xhigh, high, medium, low context_1m: bool = True # Enable 1M context window beta + # Hung-CLI detection: max idle time between SDK messages on a single + # turn before the engine treats the subprocess as dead and falls into + # the existing CLI-crash retry path. Set to 0 to disable (legacy + # behaviour: turns can hang forever). 900s comfortably covers a 10-min + # Bash tool call plus SDK round-trips while still catching real hangs. + cli_idle_timeout_seconds: int = 900 @classmethod def from_dict(cls, d: dict) -> AgentConfig: @@ -119,6 +125,7 @@ def from_dict(cls, d: dict) -> AgentConfig: thinking=str(d.get("thinking", "max")), effort=str(d.get("effort", "max")), context_1m=d.get("context_1m", True), + cli_idle_timeout_seconds=int(d.get("cli_idle_timeout_seconds", 900)), ) diff --git a/nerve/gateway/routes/plans.py b/nerve/gateway/routes/plans.py index 3fe6f6f..af9f93b 100644 --- a/nerve/gateway/routes/plans.py +++ b/nerve/gateway/routes/plans.py @@ -238,7 +238,11 @@ async def approve_plan( ) prompt += hoa_instructions - # Spawn implementation in background with error handling + # Spawn implementation in background with error handling. Register + # the task with the engine so a manual /stop can cancel a stuck impl + # session (without registration, the asyncio.Task is invisible to + # `engine.stop_session` and the only way to recover is a daemon + # restart). async def _run_impl(): try: await deps.engine.run( @@ -251,7 +255,8 @@ async def _run_impl(): except Exception: logger.exception("Failed to mark plan %s as failed", plan_id) - asyncio.create_task(_run_impl()) + impl_task = asyncio.create_task(_run_impl()) + deps.engine.register_task(impl_session_id, impl_task) return {"plan_id": plan_id, "impl_session_id": impl_session_id} diff --git a/tests/test_engine.py b/tests/test_engine.py index 92d992a..5dcea0f 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -1,5 +1,6 @@ """Tests for nerve.agent.engine — pure helpers (no SDK state).""" +import asyncio from pathlib import Path from types import SimpleNamespace from unittest.mock import patch @@ -48,6 +49,103 @@ def test_effective_effort_model_default_none(): # --------------------------------------------------------------------------- +# _iter_response_with_timeout — hung-CLI detection +# --------------------------------------------------------------------------- + + +class _StubClient: + """Minimal SDK-shaped client whose receive_response yields a fixed list. + + If ``hang`` is True, the generator sleeps after yielding all real + messages instead of exiting cleanly — simulating a CLI that streams + initial output then goes silent forever. + + Tracks whether ``aclose`` was called on the returned generator so the + timeout path can assert cleanup. + """ + + def __init__(self, messages, hang=False, hang_seconds=10.0): + self._messages = messages + self._hang = hang + self._hang_seconds = hang_seconds + self.aclose_calls = 0 + + def receive_response(self): + outer = self + + async def _gen(): + try: + for msg in outer._messages: + yield msg + if outer._hang: + await asyncio.sleep(outer._hang_seconds) + finally: + outer.aclose_calls += 1 + + return _gen() + + +@pytest.mark.asyncio +async def test_iter_response_yields_messages_normally(): + """Fast SDK stream completes without timing out.""" + client = _StubClient(["a", "b", "c"]) + seen = [] + async for msg in AgentEngine._iter_response_with_timeout( + client, "sess-1", idle_timeout=5.0, + ): + seen.append(msg) + assert seen == ["a", "b", "c"] + # Generator was closed cleanly when it ran to completion. + assert client.aclose_calls == 1 + + +@pytest.mark.asyncio +async def test_iter_response_raises_on_idle_timeout(): + """If the SDK goes silent past idle_timeout, raise TimeoutError.""" + # Yields one message, then hangs long enough to trip a 50ms timeout. + client = _StubClient(["a"], hang=True, hang_seconds=2.0) + seen = [] + with pytest.raises(asyncio.TimeoutError): + async for msg in AgentEngine._iter_response_with_timeout( + client, "sess-2", idle_timeout=0.05, + ): + seen.append(msg) + # The first message arrived before the hang. + assert seen == ["a"] + # The underlying iterator was closed before the exception propagated. + assert client.aclose_calls == 1 + + +@pytest.mark.asyncio +async def test_iter_response_disabled_when_timeout_zero(): + """idle_timeout <= 0 disables the timeout (legacy behaviour).""" + # Hangs forever after 1 message. Without a timeout we'd wait forever; + # to verify "disabled" we wrap the whole call in our own short outer + # timeout and assert that's what fired (not the inner one). + client = _StubClient(["a"], hang=True, hang_seconds=10.0) + seen = [] + with pytest.raises(asyncio.TimeoutError): + async with asyncio.timeout(0.1): + async for msg in AgentEngine._iter_response_with_timeout( + client, "sess-3", idle_timeout=0, + ): + seen.append(msg) + assert seen == ["a"] + # Outer-cancel still triggers the finally block → aclose() runs. + assert client.aclose_calls == 1 + + +@pytest.mark.asyncio +async def test_iter_response_handles_empty_stream(): + """Empty receive_response (e.g. CLI exits immediately) returns cleanly.""" + client = _StubClient([]) + seen = [] + async for msg in AgentEngine._iter_response_with_timeout( + client, "sess-4", idle_timeout=5.0, + ): + seen.append(msg) + assert seen == [] + assert client.aclose_calls == 1 # _sdk_resume_file_exists # --------------------------------------------------------------------------- diff --git a/tests/test_sessions.py b/tests/test_sessions.py index 48295a7..712ec28 100644 --- a/tests/test_sessions.py +++ b/tests/test_sessions.py @@ -1,6 +1,7 @@ """Tests for nerve.agent.sessions — SessionManager lifecycle, forking, cleanup.""" import asyncio +import contextlib import pytest import pytest_asyncio @@ -173,6 +174,37 @@ async def noop(): await asyncio.sleep(0.01) assert "task-cleanup" not in sm._running_tasks + async def test_register_task_replacement_does_not_clobber_new_entry( + self, sm: SessionManager, + ): + """An old task finishing must not pop the *new* task's registry entry. + + Regression: the old code used a closure-only ``pop(session_id, None)`` + which would clobber whatever was registered at the time, including a + newer task scheduled by a concurrent register_task call. The fix + identity-checks the task in the done-callback. + """ + async def quick(): + await asyncio.sleep(0.01) + + async def slow(): + await asyncio.sleep(1.0) + + old = asyncio.create_task(quick()) + sm.register_task("dup-1", old) + # Replace before old finishes. + new = asyncio.create_task(slow()) + sm.register_task("dup-1", new) + # Wait for the old task to finish + its done-callback to fire. + await old + await asyncio.sleep(0.05) + # New task must still be registered — its entry survived old's + # done-callback. + assert sm._running_tasks.get("dup-1") is new + new.cancel() + with contextlib.suppress(asyncio.CancelledError): + await new + async def test_stop_session_no_client(self, sm: SessionManager): """Stop when there's no client or task should return False.""" result = await sm.stop_session("nonexistent")