From ee3ee4747c35f60ba5f3e04726224a269caef36e Mon Sep 17 00:00:00 2001 From: pufit Date: Fri, 27 Mar 2026 15:35:49 -0400 Subject: [PATCH] Fix background task auto-resume invisible in UI The StreamingMessage component only renders when isStreaming is true. During auto-resume after background tasks complete, session_running was the only signal to enter streaming mode, but its !isStreaming guard caused race conditions where tokens accumulated invisibly. - Add auto_resume_start event broadcast before auto-resume run - Remove !isStreaming guard from session_running handler (idempotent) - Track background watchers for cancellation on stop/shutdown - Touch session during polling to prevent idle sweep killing client - Await auto-resume instead of fire-and-forget create_task --- nerve/agent/engine.py | 50 ++++++++++++++++++++++++++++++++----- nerve/agent/streaming.py | 12 +++++++++ web/src/api/websocket.ts | 1 + web/src/stores/chatStore.ts | 27 +++++++++++++++----- 4 files changed, 78 insertions(+), 12 deletions(-) diff --git a/nerve/agent/engine.py b/nerve/agent/engine.py index 08ec60a..1d344c8 100644 --- a/nerve/agent/engine.py +++ b/nerve/agent/engine.py @@ -108,6 +108,8 @@ def __init__(self, config: NerveConfig, db: Database): self._router = None # ChannelRouter — lazy-initialized via .router property self._mcp_servers_cache = list(config.mcp_servers) # hot-reloadable self._claude_code_plugins: list[dict[str, str]] = [] # plugin dirs + # Background task watcher tracking — one watcher per session max. + self._background_watchers: dict[str, asyncio.Task] = {} async def initialize(self) -> None: """Initialize the agent engine — set up tools and main session.""" @@ -353,6 +355,12 @@ async def shutdown(self) -> None: No memorization here — the periodic sweep handles that. Sessions are marked idle so they can be resumed on next startup. """ + # Cancel all background task watchers first + for watcher in self._background_watchers.values(): + if not watcher.done(): + watcher.cancel() + self._background_watchers.clear() + for sid, client in list(self.sessions._clients.items()): try: await self._safe_disconnect(client) @@ -879,6 +887,10 @@ def register_task(self, session_id: str, task: asyncio.Task) -> None: async def stop_session(self, session_id: str) -> bool: """Stop a running session.""" + # Cancel the background task watcher if one is running + watcher = self._background_watchers.pop(session_id, None) + if watcher and not watcher.done(): + watcher.cancel() # Cancel any pending interactive tool prompts so the handler unblocks handler = get_handler(session_id) if handler: @@ -1490,11 +1502,16 @@ async def _image_prompt(): for t in bg_tasks ], }) - asyncio.create_task( + # Cancel any existing watcher for this session before spawning + old_watcher = self._background_watchers.pop(session_id, None) + if old_watcher and not old_watcher.done(): + old_watcher.cancel() + watcher = asyncio.create_task( self._watch_background_tasks( session_id, bg_tasks, source, channel, ) ) + self._background_watchers[session_id] = watcher return full_response_text @@ -1523,6 +1540,10 @@ async def _watch_background_tasks( await asyncio.sleep(poll_interval) elapsed += poll_interval + # Keep session alive — prevent idle sweep from killing the + # client while we wait for background tasks to finish. + self.sessions.touch(session_id) + all_done = True newly_completed = False for task in bg_tasks: @@ -1602,10 +1623,16 @@ async def _watch_background_tasks( ) return + # Tell the frontend to enter streaming mode BEFORE the run starts. + # This ensures the thinking cursor is visible and input is disabled + # so the user can't type during the auto-resume. + await broadcaster.broadcast_auto_resume_start(session_id) + # Trigger a new engine.run() so the model picks up the - # background task notifications from the SDK - task = asyncio.create_task( - self.run( + # background task results. We await instead of create_task + # so errors propagate and the lifecycle is controlled. + try: + await self.run( session_id=session_id, user_message=( "[Background tasks completed. " @@ -1615,14 +1642,25 @@ async def _watch_background_tasks( channel=channel, internal=True, ) - ) - self.register_task(session_id, task) + except Exception as run_err: + logger.error( + "Auto-resume failed for session %s: %s", + session_id, run_err, + ) + except asyncio.CancelledError: + logger.info( + "Background task watcher cancelled for session %s", + session_id, + ) except Exception as e: logger.error( "Background task watcher failed for session %s: %s", session_id, e, ) + finally: + # Clean up watcher reference + self._background_watchers.pop(session_id, None) # ------------------------------------------------------------------ # # Cron / Hook runs # diff --git a/nerve/agent/streaming.py b/nerve/agent/streaming.py index 0ebf84a..295951e 100644 --- a/nerve/agent/streaming.py +++ b/nerve/agent/streaming.py @@ -220,6 +220,18 @@ async def broadcast_file_changed( "tool_use_id": tool_use_id, }) + async def broadcast_auto_resume_start(self, session_id: str) -> None: + """Notify the frontend that a background-task auto-resume is starting. + + Broadcast on the *session* channel (not __global__) so it reaches + the connected WebSocket listener and is buffered for replay. + The frontend uses this to enter streaming mode before tokens arrive. + """ + await self.broadcast(session_id, { + "type": "auto_resume_start", + "session_id": session_id, + }) + # Global broadcaster instance broadcaster = StreamBroadcaster() diff --git a/web/src/api/websocket.ts b/web/src/api/websocket.ts index cb2dbc0..2c7bb26 100644 --- a/web/src/api/websocket.ts +++ b/web/src/api/websocket.ts @@ -23,6 +23,7 @@ export type WSMessage = | { type: 'notification_answered'; notification_id: string; session_id: string; answer: string; answered_by: string } | { type: 'answer_injected'; session_id: string; notification_id: string; title: string; answer: string; answered_by: string; content: string } | { type: 'session_running'; session_id: string; is_running: boolean } + | { type: 'auto_resume_start'; session_id: string } | { type: 'background_tasks_update'; session_id: string; tasks: { task_id: string; label: string; tool: string; status: 'running' | 'done' | 'timeout' }[] } | { type: 'hoa_progress'; session_id: string; event: Record } | { type: 'pong' }; diff --git a/web/src/stores/chatStore.ts b/web/src/stores/chatStore.ts index ab785da..acd138e 100644 --- a/web/src/stores/chatStore.ts +++ b/web/src/stores/chatStore.ts @@ -911,12 +911,12 @@ export const useChatStore = create((set, get) => ({ : sess, ) ?? null, }; - // Active session started running from a background trigger (e.g., - // background task completion, answer injection) — enter streaming mode - // so the response is visible and input is disabled. - // Guard: sendMessage() already sets isStreaming before the WS message, - // so this only fires for server-initiated runs. - if (runMsg.session_id === s.activeSession && runMsg.is_running && !s.isStreaming) { + // Active session started running — enter streaming mode so the + // response is visible and input is disabled. No isStreaming guard: + // sendMessage() sets identical values (idempotent) and removing the + // guard ensures server-initiated runs (auto-resume, answer injection) + // always transition the UI into streaming mode reliably. + if (runMsg.session_id === s.activeSession && runMsg.is_running) { updates.isStreaming = true; updates.streamingBlocks = []; updates.agentStatus = { state: 'thinking' }; @@ -1103,6 +1103,21 @@ export const useChatStore = create((set, get) => ({ break; } + case 'auto_resume_start': { + // Background tasks completed — the backend is about to auto-resume. + // Enter streaming mode immediately so the thinking cursor is visible + // and chat input is disabled (prevents user from typing mid-resume). + const arMsg = msg as Extract; + if (arMsg.session_id === state.activeSession) { + set({ + isStreaming: true, + streamingBlocks: [], + agentStatus: { state: 'thinking' }, + }); + } + break; + } + case 'background_tasks_update': { const bt = msg as Extract; if (bt.session_id === state.activeSession) {