Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion nerve/agent/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from __future__ import annotations

import asyncio
import contextlib
import json
import logging
import os
Expand Down Expand Up @@ -1537,6 +1538,60 @@ async def run(
"is_running": False,
})

@staticmethod
async def _iter_response_with_timeout(
    client: Any,
    session_id: str,
    idle_timeout: float,
):
    """Iterate ``client.receive_response()`` with a per-message idle timeout.

    The Claude Agent SDK's ``receive_response()`` async generator can
    block indefinitely if the underlying CLI subprocess hangs (stuck
    Anthropic API request, broken stdio pipe, etc.). Without a timeout
    the engine has no way to notice — ``is_running`` stays True, the
    per-session lock stays held, queued user messages back up forever.

    Each ``__anext__()`` await is wrapped in ``asyncio.wait_for`` so a
    hung CLI is detected when no SDK message of any kind arrives within
    ``idle_timeout`` seconds. The timeout is per-message, not per-turn,
    so a long tool call does not trip it as long as messages keep
    arriving between waits.

    Args:
        client: Object exposing ``receive_response()``, which must return
            an async iterator (only ``__anext__`` and, optionally,
            ``aclose`` are used).
        session_id: Session identifier; used only in the warning log.
        idle_timeout: Maximum seconds to wait for each message. A falsy
            or non-positive value disables the timeout entirely.

    Yields:
        Each message produced by ``client.receive_response()``, in order.

    Raises:
        asyncio.TimeoutError: No message arrived within ``idle_timeout``.
            The caller is expected to treat this like a crashed CLI.
        Exception: Anything raised by the underlying iterator propagates
            unchanged (including a ``TimeoutError`` of its own).
    """
    # Decide once whether the watchdog is armed; 0, None and negative
    # values all mean "wait forever" (legacy behaviour).
    timeout_enabled = bool(idle_timeout) and idle_timeout > 0
    response_iter = client.receive_response()
    try:
        while True:
            try:
                if timeout_enabled:
                    message = await asyncio.wait_for(
                        response_iter.__anext__(),
                        timeout=idle_timeout,
                    )
                else:
                    message = await response_iter.__anext__()
            except StopAsyncIteration:
                # Normal end of the response stream.
                return
            except asyncio.TimeoutError:
                # Only blame the idle watchdog when it is actually armed;
                # with the timeout disabled a TimeoutError can only have
                # come from the SDK itself and must not be mislabelled.
                if timeout_enabled:
                    # %g keeps fractional timeouts readable (0.05 -> "0.05")
                    # where %d would truncate them to "0".
                    logger.warning(
                        "CLI idle timeout (%gs) for session %s — no SDK "
                        "message received; treating CLI as hung",
                        idle_timeout, session_id,
                    )
                raise
            yield message
    finally:
        # Best-effort cleanup: always try to close the SDK iterator, but
        # never let a failure here mask the exception already in flight.
        with contextlib.suppress(Exception):
            await response_iter.aclose()

async def _run_inner(
self,
session_id: str,
Expand Down Expand Up @@ -1725,8 +1780,13 @@ async def _image_prompt():
continue # retry the query

# Read response — may raise if CLI crashes mid-stream
# or hangs idle for longer than cli_idle_timeout_seconds
# (see _iter_response_with_timeout).
try:
async for message in client.receive_response():
async for message in AgentEngine._iter_response_with_timeout(
client, session_id,
self.config.agent.cli_idle_timeout_seconds,
):
# Early-capture sdk_session_id from first message that
# carries it so it survives /stop cancellation (ResultMessage
# — the normal source — never arrives when the turn is
Expand Down
22 changes: 21 additions & 1 deletion nerve/agent/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,29 @@ def register_task(self, session_id: str, task: asyncio.Task) -> None:
Does NOT mark the session as running — that's done by mark_running()
inside engine.run() to avoid a race between create_task() scheduling
and the task's actual execution.

If a task is already registered for ``session_id`` and is still
live, log a warning and replace it. The replaced task isn't
cancelled (callers may rely on it finishing), but it loses the
ability to be stopped via ``/stop``. The done-callback below is
identity-checked so the *old* task finishing later doesn't pop the
new entry from under us.
"""
existing = self._running_tasks.get(session_id)
if existing is not None and not existing.done():
logger.warning(
"register_task: replacing live task for session %s "
"(possible concurrent run)", session_id,
)
self._running_tasks[session_id] = task
task.add_done_callback(lambda _: self._running_tasks.pop(session_id, None))

def _on_done(t: asyncio.Task) -> None:
    # Done-callback for the task registered under ``session_id``.
    # Remove the registry entry only when it still refers to this very
    # task object; if a later registration replaced it, leave the newer
    # entry alone so that task stays reachable through the registry.
    currently_registered = self._running_tasks.get(session_id)
    if currently_registered is not t:
        return
    self._running_tasks.pop(session_id, None)
task.add_done_callback(_on_done)

def mark_running(self, session_id: str) -> None:
"""Mark a session as currently running."""
Expand Down
7 changes: 7 additions & 0 deletions nerve/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ class AgentConfig:
thinking: str = "max" # max, high, medium, low, disabled, adaptive, or number (budget_tokens)
effort: str = "max" # max, xhigh, high, medium, low
context_1m: bool = True # Enable 1M context window beta
# Hung-CLI detection: max idle time between SDK messages on a single
# turn before the engine treats the subprocess as dead and falls into
# the existing CLI-crash retry path. Set to 0 to disable (legacy
# behaviour: turns can hang forever). 900s comfortably covers a 10-min
# Bash tool call plus SDK round-trips while still catching real hangs.
cli_idle_timeout_seconds: int = 900

@classmethod
def from_dict(cls, d: dict) -> AgentConfig:
Expand All @@ -119,6 +125,7 @@ def from_dict(cls, d: dict) -> AgentConfig:
thinking=str(d.get("thinking", "max")),
effort=str(d.get("effort", "max")),
context_1m=d.get("context_1m", True),
cli_idle_timeout_seconds=int(d.get("cli_idle_timeout_seconds", 900)),
)


Expand Down
9 changes: 7 additions & 2 deletions nerve/gateway/routes/plans.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ async def approve_plan(
)
prompt += hoa_instructions

# Spawn implementation in background with error handling
# Spawn implementation in background with error handling. Register
# the task with the engine so a manual /stop can cancel a stuck impl
# session (without registration, the asyncio.Task is invisible to
# `engine.stop_session` and the only way to recover is a daemon
# restart).
async def _run_impl():
try:
await deps.engine.run(
Expand All @@ -251,7 +255,8 @@ async def _run_impl():
except Exception:
logger.exception("Failed to mark plan %s as failed", plan_id)

asyncio.create_task(_run_impl())
impl_task = asyncio.create_task(_run_impl())
deps.engine.register_task(impl_session_id, impl_task)

return {"plan_id": plan_id, "impl_session_id": impl_session_id}

Expand Down
98 changes: 98 additions & 0 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for nerve.agent.engine — pure helpers (no SDK state)."""

import asyncio
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
Expand Down Expand Up @@ -48,6 +49,103 @@ def test_effective_effort_model_default_none():


# ---------------------------------------------------------------------------
# _iter_response_with_timeout — hung-CLI detection
# ---------------------------------------------------------------------------


class _StubClient:
    """Minimal SDK-shaped client whose receive_response yields a fixed list.

    If ``hang`` is True, the stream sleeps for ``hang_seconds`` after
    yielding all real messages instead of ending immediately — simulating
    a CLI that streams initial output then goes silent.

    ``aclose_calls`` counts explicit ``aclose()`` calls made on the object
    returned by ``receive_response()``. It is deliberately NOT bumped when
    the stream merely runs to completion or is cancelled, so asserting on
    it proves the consumer really performed cleanup.
    """

    def __init__(self, messages, hang=False, hang_seconds=10.0):
        self._messages = messages
        self._hang = hang
        self._hang_seconds = hang_seconds
        self.aclose_calls = 0

    def receive_response(self):
        """Return a fresh async iterator over the configured messages."""
        outer = self

        async def _gen():
            for msg in outer._messages:
                yield msg
            if outer._hang:
                await asyncio.sleep(outer._hang_seconds)

        class _Response:
            """Async-iterator wrapper that records explicit aclose() calls."""

            def __init__(self):
                self._inner = _gen()

            def __aiter__(self):
                return self

            async def __anext__(self):
                return await self._inner.__anext__()

            async def aclose(self):
                # Count first so the call is recorded even if closing the
                # inner generator were to raise.
                outer.aclose_calls += 1
                await self._inner.aclose()

        return _Response()


@pytest.mark.asyncio
async def test_iter_response_yields_messages_normally():
    """A stream that finishes promptly is passed through without timing out."""
    stub = _StubClient(["a", "b", "c"])
    stream = AgentEngine._iter_response_with_timeout(
        stub, "sess-1", idle_timeout=5.0,
    )
    received = [item async for item in stream]
    assert received == ["a", "b", "c"]
    # The stub's cleanup counter was bumped exactly once for this stream.
    assert stub.aclose_calls == 1


@pytest.mark.asyncio
async def test_iter_response_raises_on_idle_timeout():
    """A stream that goes silent past idle_timeout surfaces TimeoutError."""
    # One real message, then a 2s stall — far longer than the 50ms budget.
    stub = _StubClient(["a"], hang=True, hang_seconds=2.0)
    received = []

    async def _consume():
        stream = AgentEngine._iter_response_with_timeout(
            stub, "sess-2", idle_timeout=0.05,
        )
        async for item in stream:
            received.append(item)

    with pytest.raises(asyncio.TimeoutError):
        await _consume()
    # Only the message sent before the stall was delivered.
    assert received == ["a"]
    # The stub's cleanup counter was bumped exactly once.
    assert stub.aclose_calls == 1


@pytest.mark.asyncio
async def test_iter_response_disabled_when_timeout_zero():
    """idle_timeout <= 0 switches the idle timeout off (legacy behaviour)."""
    # The stub stalls for 10s after one message. With the inner timeout
    # off, nothing inside the helper can fire, so the short outer
    # deadline below is the only thing that can end the wait.
    stub = _StubClient(["a"], hang=True, hang_seconds=10.0)
    received = []

    async def _consume_with_outer_deadline():
        async with asyncio.timeout(0.1):
            stream = AgentEngine._iter_response_with_timeout(
                stub, "sess-3", idle_timeout=0,
            )
            async for item in stream:
                received.append(item)

    with pytest.raises(asyncio.TimeoutError):
        await _consume_with_outer_deadline()
    assert received == ["a"]
    # Cancellation from the outer deadline still reaches the cleanup path.
    assert stub.aclose_calls == 1


@pytest.mark.asyncio
async def test_iter_response_handles_empty_stream():
    """A stream with no messages at all ends cleanly and yields nothing."""
    stub = _StubClient([])
    stream = AgentEngine._iter_response_with_timeout(
        stub, "sess-4", idle_timeout=5.0,
    )
    received = [item async for item in stream]
    assert received == []
    assert stub.aclose_calls == 1
# _sdk_resume_file_exists
# ---------------------------------------------------------------------------

Expand Down
32 changes: 32 additions & 0 deletions tests/test_sessions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for nerve.agent.sessions — SessionManager lifecycle, forking, cleanup."""

import asyncio
import contextlib

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -173,6 +174,37 @@ async def noop():
await asyncio.sleep(0.01)
assert "task-cleanup" not in sm._running_tasks

async def test_register_task_replacement_does_not_clobber_new_entry(
    self, sm: SessionManager,
):
    """A replaced task finishing must leave the replacement registered.

    Regression guard: a done-callback that pops by session id alone
    removes whichever task is registered when it fires, including a
    newer one installed by a later ``register_task`` call. The callback
    is expected to compare task identity before popping.
    """
    async def _nap(seconds):
        await asyncio.sleep(seconds)

    first = asyncio.create_task(_nap(0.01))
    sm.register_task("dup-1", first)
    # Install the replacement while the first task is still pending.
    second = asyncio.create_task(_nap(1.0))
    sm.register_task("dup-1", second)

    # Let the first task complete and give its done-callback time to run.
    await first
    await asyncio.sleep(0.05)

    # The registry must still point at the replacement task.
    assert sm._running_tasks.get("dup-1") is second

    # Tidy up the long-running replacement.
    second.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await second

async def test_stop_session_no_client(self, sm: SessionManager):
"""Stop when there's no client or task should return False."""
result = await sm.stop_session("nonexistent")
Expand Down