fix(chat): survive socket reconnects — thread-key session/cancel + thread-room stream#2493
Conversation
📝 WalkthroughWalkthroughFrontend socket service re-subscribes to active threads on reconnect. Backend adds a ChangesThread-stable session persistence across socket reconnects
Sequence Diagram(s)sequenceDiagram
participant Client as Client Socket
participant SocketIO as Socket.IO Server
participant SessionCache as Session & Task Cache
Note over Client,SocketIO: Initial connection
Client->>SocketIO: connect (client_id_1)
Client->>SocketIO: emit 'thread:subscribe' {thread_id: 'ABC'}
SocketIO->>SessionCache: lookup/ensure key_for('ABC')
SessionCache-->>SocketIO: cached session/task handles
SocketIO->>SocketIO: join room 'thread:ABC'
Note over Client,SocketIO: Disconnect and reconnect
Client->>SocketIO: connect (client_id_2)
Client->>SocketIO: emit 'thread:subscribe' {thread_id: 'ABC'}
SocketIO->>SessionCache: lookup key_for('ABC')
SessionCache-->>SocketIO: same cached session/task handles
SocketIO->>SocketIO: join room 'thread:ABC'
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/channels/providers/web.rs`:
- Around line 134-145: invalidate_thread_sessions is still deleting entries
using the old "::<thread_id>" suffix while key_for(thread_id) now returns just
thread_id, so removals no longer match; update invalidate_thread_sessions to
remove entries from THREAD_SESSIONS and IN_FLIGHT using key_for(thread_id)
(i.e., the plain thread_id) instead of the legacy suffix, or delete both
patterns for backward compatibility (call key_for(thread_id) and also attempt
the old format) so thread session and in-flight handles are actually cleared.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e1771923-32e8-41a6-8020-374429ab4382
📒 Files selected for processing (4)
app/src/services/socketService.tssrc/core/socketio.rssrc/openhuman/channels/providers/web.rssrc/openhuman/channels/providers/web_tests.rs
There was a problem hiding this comment.
🧹 Nitpick comments (2)
app/src/services/__tests__/socketService.events.test.ts (2)
125-145: ⚡ Quick winAdd test coverage for the
activeThreadIdfallback path.The production code falls back to
activeThreadIdwhenselectedThreadIdis absent:const activeThreadId = threadState?.selectedThreadId ?? threadState?.activeThreadId;This test only verifies the
selectedThreadIdpath. Add a companion test case whereselectedThreadIdisnullbutactiveThreadIdis set to ensure the fallback logic works correctly.🧪 Suggested test case for activeThreadId fallback
it('re-subscribes using activeThreadId when selectedThreadId is null', async () => { const { handlers, mockSocket } = buildMockSocket(); vi.doMock('socket.io-client', () => ({ io: vi.fn(() => mockSocket) })); getCoreRpcUrlMock.mockResolvedValue('http://127.0.0.1:7788/rpc'); storeMock.getState.mockReturnValue({ thread: { selectedThreadId: null, activeThreadId: 'thread-abc' }, }); const { socketService } = await import('../socketService'); socketService.connect('jwt-test-thread-sub-fallback'); await pollUntil(() => expect(handlers['connect']).toBeDefined()); handlers['connect']!(); expect((mockSocket as { emit: ReturnType<typeof vi.fn> }).emit).toHaveBeenCalledWith( 'thread:subscribe', { thread_id: 'thread-abc' } ); });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/src/services/__tests__/socketService.events.test.ts` around lines 125 - 145, The existing test only covers the selectedThreadId path; add a new test that mocks storeMock.getState to return thread: { selectedThreadId: null, activeThreadId: 'thread-abc' } and otherwise mirrors the original test flow so that after importing socketService and calling socketService.connect(...) the connect handler triggers and the mock socket's emit is asserted to have been called with 'thread:subscribe' and { thread_id: 'thread-abc' } (use same helpers: buildMockSocket, getCoreRpcUrlMock, vi.doMock, pollUntil and inspect handlers/connect and mockSocket.emit).
147-164: ⚡ Quick winAdd explicit test coverage for a completely absent thread slice.
The PR objectives mention "avoid errors when the thread slice is absent," and the production code uses optional chaining (
threadState?.) to handle this. However, this test uses a present thread slice with null IDs rather than testing a completely absent slice.Consider adding a test case where
getState()returns{}or{ thread: undefined }to explicitly verify the guard handles a missing thread slice without errors.🧪 Suggested test case for absent thread slice
it('does not emit thread:subscribe when thread slice is absent from state', async () => { const { handlers, mockSocket } = buildMockSocket(); vi.doMock('socket.io-client', () => ({ io: vi.fn(() => mockSocket) })); getCoreRpcUrlMock.mockResolvedValue('http://127.0.0.1:7788/rpc'); storeMock.getState.mockReturnValue({}); // No thread slice at all const { socketService } = await import('../socketService'); socketService.connect('jwt-test-no-thread-slice'); await pollUntil(() => expect(handlers['connect']).toBeDefined()); handlers['connect']!(); const emitMock = (mockSocket as { emit: ReturnType<typeof vi.fn> }).emit; const threadSub = emitMock.mock.calls.find(([ev]) => ev === 'thread:subscribe'); expect(threadSub).toBeUndefined(); });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/src/services/__tests__/socketService.events.test.ts` around lines 147 - 164, Add a new test that simulates a completely absent thread slice by making storeMock.getState return {} (or { thread: undefined }) before importing and calling socketService.connect; reuse buildMockSocket, mockSocket and the existing connect handler check (handlers['connect']) and assert that mockSocket.emit was never called with 'thread:subscribe' to verify the optional-chaining guard handles a missing thread slice without throwing. Ensure the test name and setup mirror the existing test ('does not emit thread:subscribe when thread slice is absent from state') and keep getCoreRpcUrlMock and vi.doMock('socket.io-client') setup identical to the current tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@app/src/services/__tests__/socketService.events.test.ts`:
- Around line 125-145: The existing test only covers the selectedThreadId path;
add a new test that mocks storeMock.getState to return thread: {
selectedThreadId: null, activeThreadId: 'thread-abc' } and otherwise mirrors the
original test flow so that after importing socketService and calling
socketService.connect(...) the connect handler triggers and the mock socket's
emit is asserted to have been called with 'thread:subscribe' and { thread_id:
'thread-abc' } (use same helpers: buildMockSocket, getCoreRpcUrlMock, vi.doMock,
pollUntil and inspect handlers/connect and mockSocket.emit).
- Around line 147-164: Add a new test that simulates a completely absent thread
slice by making storeMock.getState return {} (or { thread: undefined }) before
importing and calling socketService.connect; reuse buildMockSocket, mockSocket
and the existing connect handler check (handlers['connect']) and assert that
mockSocket.emit was never called with 'thread:subscribe' to verify the
optional-chaining guard handles a missing thread slice without throwing. Ensure
the test name and setup mirror the existing test ('does not emit
thread:subscribe when thread slice is absent from state') and keep
getCoreRpcUrlMock and vi.doMock('socket.io-client') setup identical to the
current tests.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 51c93e42-b983-47f3-a3ec-182369663d73
📒 Files selected for processing (2)
app/src/services/__tests__/socketService.events.test.tsapp/src/services/socketService.ts
THREAD_SESSIONS (conversation history cache) and IN_FLIGHT (cancel registry) were keyed by key_for(client_id, thread_id), where client_id is the ephemeral Socket.IO connection id that is regenerated on every reconnect. So when the webview's socket reconnected with a new client_id, the same thread mapped to a fresh empty session (conversation amnesia) and cancel_chat could no longer find the running turn (Cancel became a silent no-op). The persistence layer already keys by thread_id (turn-state snapshots, transcripts); this realigns the in-memory runtime with that stable identity. Event delivery still routes by client_id (the live socket) — only the thread-owned runtime state now keys off thread_id. NOTE: the live-stream Socket.IO room is still client_id-scoped (room = event.client_id); re-attaching an in-flight turn's stream to a reconnected socket needs a paired thread-room + client re-subscribe change, tracked separately. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… reconnects Chat-stream events were emitted only to the initiating socket's client_id room. Since client_id is regenerated on every Socket.IO reconnect, a webview that reconnected mid-turn stopped receiving the in-flight turn's stream (it went to the dead old-client_id room). Now emit to BOTH the client_id room and a per-thread room (thread:<id>); a socket re-subscribes to its active thread room on (re)connect via a new thread:subscribe event, so the stream re-attaches to the new connection. socket.io de-duplicates a socket across multiple target rooms, so no double-render. Pairs with the thread-keyed session/cancel fix (63ded2f); together they make a mid-turn reconnect fully transparent. Frontend: socketService re-emits thread:subscribe for the active thread on connect. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
5e2bb0d to
4dcf15f
Compare
…test thread:subscribe The thread:subscribe re-subscribe added to the connect handler did store.getState().thread.selectedThreadId, which threw under the existing socketService.events.test.ts mock (storeMock had no getState) and would throw in any state where the thread slice is absent. Read the thread slice once and optional-chain it; add getState to the test mock; add two cases covering connect→thread:subscribe (active thread emits, no active thread does not). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
4dcf15f to
1194afe
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (1)
app/src/services/__tests__/socketService.events.test.ts (1)
130-169: ⚡ Quick winAdd a fallback-branch test for
activeThreadIdwhenselectedThreadIdis null.Line 250 in
app/src/services/socketService.tsusesselectedThreadId ?? activeThreadId, but current tests only coverselectedThreadIdpresent and both missing. Add one case withselectedThreadId: nullandactiveThreadId: 'thread-abc'to lock the fallback behavior.Proposed test addition
+ it('re-subscribes using activeThreadId when selectedThreadId is null', async () => { + const { handlers, mockSocket } = buildMockSocket(); + + vi.doMock('socket.io-client', () => ({ io: vi.fn(() => mockSocket) })); + getCoreRpcUrlMock.mockResolvedValue('http://127.0.0.1:7788/rpc'); + storeMock.getState.mockReturnValue({ + thread: { selectedThreadId: null, activeThreadId: 'thread-abc' }, + }); + + const { socketService } = await import('../socketService'); + socketService.connect('jwt-test-active-thread-fallback'); + + await pollUntil(() => expect(handlers['connect']).toBeDefined()); + handlers['connect']!(); + + expect((mockSocket as { emit: ReturnType<typeof vi.fn> }).emit).toHaveBeenCalledWith( + 'thread:subscribe', + { thread_id: 'thread-abc' } + ); + });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/src/services/__tests__/socketService.events.test.ts` around lines 130 - 169, Add a new test that verifies the fallback to activeThreadId when selectedThreadId is null: use buildMockSocket() and vi.doMock('socket.io-client') like the other tests, set storeMock.getState to return { thread: { selectedThreadId: null, activeThreadId: 'thread-abc' } }, import socketService and call socketService.connect(...), wait for handlers['connect'] and invoke it, then assert the mock socket emit was called with 'thread:subscribe' and payload { thread_id: 'thread-abc' } to exercise the selectedThreadId ?? activeThreadId branch in socketService.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@app/src/services/__tests__/socketService.events.test.ts`:
- Around line 130-169: Add a new test that verifies the fallback to
activeThreadId when selectedThreadId is null: use buildMockSocket() and
vi.doMock('socket.io-client') like the other tests, set storeMock.getState to
return { thread: { selectedThreadId: null, activeThreadId: 'thread-abc' } },
import socketService and call socketService.connect(...), wait for
handlers['connect'] and invoke it, then assert the mock socket emit was called
with 'thread:subscribe' and payload { thread_id: 'thread-abc' } to exercise the
selectedThreadId ?? activeThreadId branch in socketService.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c44c42fc-fd65-4379-b0a8-7b6119aa5caa
📒 Files selected for processing (2)
app/src/services/__tests__/socketService.events.test.tsapp/src/services/socketService.ts
…read-room stream (tinyhumansai#2493) Co-authored-by: sanil-23 <sanil@alphahuman.xyz> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
client_idinstead of the stablethread_id. Sinceclient_idis regenerated on every Socket.IO reconnect, a reconnect silently orphaned that state.thread_id, and mirror the chat stream to a per-thread room (with athread:subscribere-subscribe on (re)connect). Event delivery still usesclient_id.Problem
key_for(client_id, thread_id)produced"{client_id}::{thread_id}", and that was the key for bothTHREAD_SESSIONS(the in-memory conversation/transcript cache) andIN_FLIGHT(the cancel registry). The chat stream was likewise emitted only toroom = client_id(emit_web_channel_event).client_idis the per-connection engine.io id — a new one on every reconnect. So when a webview's socket dropped and reconnected mid-session:THREAD_SESSIONS.get(newClientId::thread)missed → the turn started withhistory_len=0(the on-disk transcript is keyed bythread_id, so the durable record was intact; only the in-memory lookup failed).cancel_chatlooked upIN_FLIGHTunder the newclient_id, found nothing, returnedOk(logged-> ok) and aborted nothing; the turn ran on to the iteration cap.client_idroom, which the reconnected socket was no longer in.The persistence layer already keys by
thread_id(turn-state snapshots, transcripts), so the runtime was simply inconsistent with its own durable identity.Solution
key_for(thread_id)—THREAD_SESSIONSandIN_FLIGHTnow key bythread_idalone. A reconnect (newclient_id) resolves to the same session (cache hit, history retained) and the same in-flight handle (Cancel works). Event delivery still routes byclient_id.emit_web_channel_eventnow emits to both theclient_idroom andthread:<id>. A newthread:subscribesocket event joinsthread:<id>; the frontend (socketService) re-emits it for the active thread on everyconnect. So a reconnected socket re-joins the thread room and the in-flight stream re-attaches.socket.iode-duplicates a socket present in multiple target rooms, so no double-render.Verified live (with socketio debug logging on): after a mid-turn reload, the new socket logged
joined room 'thread:<id>'and the turn's stream events emitted torooms=["<newClientId>", "thread:<id>"]and were received; cache hits retainedhistory_len; and Cancel aborted the in-flight turn across a reconnect (vs. the prior no-op).Submission Checklist
key_for_is_thread_scoped_not_client_scoped(regression guard, replaceskey_for_combines_client_id_and_thread_id) and twosocketService.events.test.tscases coveringconnect → thread:subscribe(active thread emits; no active thread does not). Existingcancel_chattests still pass.Coverage Gate (diff-cover ≥ 80%)check passes on this PR.N/A: behaviour-only bug fix.N/A: none.N/A: no release-cut surface beyond chat reconnect (covered above).N/A: no tracking issue filed.Impact
socketService.tsre-subscribe) requires an app bundle rebuild to take effect; the backend halves are independent.Related
N/A(no issue filed)reconnectionAttemptsis low, so a long drop needs a manual reload to reconnect (not addressed here).AI Authored PR Metadata
Linear Issue
N/AN/ACommit & Branch
fix/thread-keyed-session-and-streamce0f1c8f(session/cancel keying),95c314da(thread-room stream)Validation Run
pnpm --filter openhuman-app format:check— frontend change is a smallsocketService.tsaddition; typechecked on the equivalent source.pnpm typecheck— passed on the equivalent source (socketService.ts).key_for_is_thread_scoped_not_client_scoped,cancel_chat_validates_required_fieldspass.cargo fmtclean; the identical code builds in release (cargo build --release --bin openhuman-core) and was run live, confirming the fixes.N/A: nosrc-taurichanges.Behavior Changes
thread_id, surviving socket reconnects.Parity Contract
client_id; the per-thread room is additive (dual-emit), so non-reconnect streaming is unchanged.thread_id-keyed) is now matched by the in-memory runtime.Duplicate / Superseded PR Handling
N/A🤖 Generated with Claude Code