From 872b921e4c9dee0d077531cffdb52492e1e78799 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 4 Feb 2026 16:10:38 +0100 Subject: [PATCH 1/3] feat(remote-comms): handle reconnection to restarted peers with incarnation ID detection - Add `clearRemoteSeqState` method to kernel store for clearing sequence state without removing the remote relationship - Add `handlePeerRestart` method to RemoteHandle that resets all state when a peer restarts with a new incarnation ID - Add `OnIncarnationChange` callback type and wire it through the transport layer, platform services, and RemoteManager - Clear permanent failure status on user-initiated sends to allow reconnection to previously-failed peers - Add RPC handler for browser runtime to notify kernel of incarnation changes - Update all test files to account for new callback parameters This enables proper handling when a peer restarts: the incarnation ID handshake detects the restart, and the callback resets RemoteHandle state (sequence numbers, pending messages) for a fresh start. 
Co-Authored-By: Claude Opus 4.5 --- .gitignore | 1 + .../src/PlatformServicesClient.ts | 20 ++++++ .../src/PlatformServicesServer.test.ts | 2 + .../src/PlatformServicesServer.ts | 18 +++++ .../src/kernel/PlatformServices.test.ts | 33 +++++++++ .../nodejs/src/kernel/PlatformServices.ts | 4 ++ packages/ocap-kernel/src/index.ts | 1 + .../src/remotes/kernel/RemoteHandle.test.ts | 67 +++++++++++++++++++ .../src/remotes/kernel/RemoteHandle.ts | 36 ++++++++++ .../src/remotes/kernel/RemoteManager.test.ts | 3 + .../src/remotes/kernel/RemoteManager.ts | 23 +++++++ .../src/remotes/kernel/remote-comms.test.ts | 27 ++++++++ .../src/remotes/kernel/remote-comms.ts | 4 ++ .../src/remotes/platform/transport.test.ts | 15 +++-- .../src/remotes/platform/transport.ts | 26 +++++-- packages/ocap-kernel/src/remotes/types.ts | 7 ++ .../src/rpc/kernel-remote/index.ts | 12 ++++ .../kernel-remote/remoteIncarnationChange.ts | 43 ++++++++++++ packages/ocap-kernel/src/store/index.test.ts | 1 + .../src/store/methods/remote.test.ts | 42 ++++++++++++ .../ocap-kernel/src/store/methods/remote.ts | 22 ++++++ packages/ocap-kernel/src/types.ts | 3 + 22 files changed, 397 insertions(+), 13 deletions(-) create mode 100644 packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts diff --git a/.gitignore b/.gitignore index 1279a097f..6171d4a15 100644 --- a/.gitignore +++ b/.gitignore @@ -92,3 +92,4 @@ test-results # Claude .claude/settings.local.json +.playwright-mcp/ \ No newline at end of file diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts index 165d93a62..28a75a2d6 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts @@ -8,6 +8,7 @@ import type { VatId, VatConfig, RemoteCommsOptions, + OnIncarnationChange, } from '@metamask/ocap-kernel'; import { platformServicesMethodSpecs, @@ -62,6 +63,8 @@ export class 
PlatformServicesClient implements PlatformServices { #remoteGiveUpHandler: ((peerId: string) => void) | undefined = undefined; + #remoteIncarnationChangeHandler: OnIncarnationChange | undefined = undefined; + /** * **ATTN:** Prefer {@link PlatformServicesClient.make} over constructing * this class directly. @@ -96,6 +99,7 @@ export class PlatformServicesClient implements PlatformServices { this.#rpcServer = new RpcService(kernelRemoteHandlers, { remoteDeliver: this.#remoteDeliver.bind(this), remoteGiveUp: this.#remoteGiveUp.bind(this), + remoteIncarnationChange: this.#remoteIncarnationChange.bind(this), }); // Start draining messages immediately after construction @@ -195,6 +199,7 @@ export class PlatformServicesClient implements PlatformServices { * @param remoteMessageHandler - A handler function to receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - Unique identifier for this kernel instance. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -204,9 +209,11 @@ export class PlatformServicesClient implements PlatformServices { remoteMessageHandler: (from: string, message: string) => Promise, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise { this.#remoteMessageHandler = remoteMessageHandler; this.#remoteGiveUpHandler = onRemoteGiveUp; + this.#remoteIncarnationChangeHandler = onIncarnationChange; await this.#rpcClient.call('initializeRemoteComms', { keySeed, ...Object.fromEntries( @@ -297,6 +304,19 @@ export class PlatformServicesClient implements PlatformServices { return null; } + /** + * Handle a remote incarnation change notification from the server. + * + * @param peerId - The peer ID of the remote that restarted. 
+ * @returns A promise that resolves when handling is complete. + */ + async #remoteIncarnationChange(peerId: string): Promise { + if (this.#remoteIncarnationChangeHandler) { + this.#remoteIncarnationChangeHandler(peerId); + } + return null; + } + /** * Send a message to the server. * diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index 8336102ce..d15109e41 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -407,6 +407,7 @@ describe('PlatformServicesServer', () => { expect.any(Function), expect.any(Function), undefined, + expect.any(Function), ); }); @@ -430,6 +431,7 @@ describe('PlatformServicesServer', () => { expect.any(Function), expect.any(Function), undefined, + expect.any(Function), ); }); diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts index cf0bdea7f..e6d12bfef 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts @@ -293,6 +293,7 @@ export class PlatformServicesServer { this.#handleRemoteMessage.bind(this), this.#handleRemoteGiveUp.bind(this), incarnationId, + this.#handleRemoteIncarnationChange.bind(this), ); this.#sendRemoteMessageFunc = sendRemoteMessage; this.#stopRemoteCommsFunc = stop; @@ -404,5 +405,22 @@ export class PlatformServicesServer { this.#logger.error('Error notifying kernel of remote give up:', error); }); } + + /** + * Handle when a remote peer's incarnation changes (peer restarted). + * Notifies the kernel worker via RPC to reset the RemoteHandle state. + * + * @param peerId - The peer ID of the remote that restarted. 
+ */ + #handleRemoteIncarnationChange(peerId: string): void { + this.#rpcClient + .call('remoteIncarnationChange', { peerId }) + .catch((error) => { + this.#logger.error( + 'Error notifying kernel of remote incarnation change:', + error, + ); + }); + } } harden(PlatformServicesServer); diff --git a/packages/nodejs/src/kernel/PlatformServices.test.ts b/packages/nodejs/src/kernel/PlatformServices.test.ts index e0c6d0e24..a1cc433c1 100644 --- a/packages/nodejs/src/kernel/PlatformServices.test.ts +++ b/packages/nodejs/src/kernel/PlatformServices.test.ts @@ -250,6 +250,7 @@ describe('NodejsPlatformServices', () => { expect.any(Function), undefined, undefined, + undefined, ); }); @@ -272,6 +273,7 @@ describe('NodejsPlatformServices', () => { expect.any(Function), undefined, undefined, + undefined, ); }); @@ -296,6 +298,7 @@ describe('NodejsPlatformServices', () => { expect.any(Function), giveUpHandler, undefined, + undefined, ); }); @@ -322,6 +325,36 @@ describe('NodejsPlatformServices', () => { expect.any(Function), giveUpHandler, incarnationId, + undefined, + ); + }); + + it('initializes remote comms with onIncarnationChange callback', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const keySeed = '0x1234567890abcdef'; + const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer']; + const remoteHandler = vi.fn(async () => 'response'); + const giveUpHandler = vi.fn(); + const incarnationId = 'test-incarnation-id'; + const incarnationChangeHandler = vi.fn(); + + await service.initializeRemoteComms( + keySeed, + { relays }, + remoteHandler, + giveUpHandler, + incarnationId, + incarnationChangeHandler, + ); + + const { initTransport } = await import('@metamask/ocap-kernel'); + expect(initTransport).toHaveBeenCalledWith( + keySeed, + { relays }, + expect.any(Function), + giveUpHandler, + incarnationId, + incarnationChangeHandler, ); }); diff --git a/packages/nodejs/src/kernel/PlatformServices.ts 
b/packages/nodejs/src/kernel/PlatformServices.ts index 160dea58c..3c7202295 100644 --- a/packages/nodejs/src/kernel/PlatformServices.ts +++ b/packages/nodejs/src/kernel/PlatformServices.ts @@ -9,6 +9,7 @@ import type { SendRemoteMessage, StopRemoteComms, RemoteCommsOptions, + OnIncarnationChange, } from '@metamask/ocap-kernel'; import { initTransport } from '@metamask/ocap-kernel'; import { NodeWorkerDuplexStream } from '@metamask/streams'; @@ -228,6 +229,7 @@ export class NodejsPlatformServices implements PlatformServices { * @param remoteMessageHandler - A handler function to receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - This kernel's incarnation ID for handshake protocol. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -237,6 +239,7 @@ export class NodejsPlatformServices implements PlatformServices { remoteMessageHandler: (from: string, message: string) => Promise, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise { if (this.#sendRemoteMessageFunc) { throw Error('remote comms already initialized'); @@ -254,6 +257,7 @@ export class NodejsPlatformServices implements PlatformServices { this.#handleRemoteMessage.bind(this), onRemoteGiveUp, incarnationId, + onIncarnationChange, ); this.#sendRemoteMessageFunc = sendRemoteMessage; this.#stopRemoteCommsFunc = stop; diff --git a/packages/ocap-kernel/src/index.ts b/packages/ocap-kernel/src/index.ts index 2e4aa3532..13e0eca34 100644 --- a/packages/ocap-kernel/src/index.ts +++ b/packages/ocap-kernel/src/index.ts @@ -19,6 +19,7 @@ export type { SendRemoteMessage, StopRemoteComms, RemoteCommsOptions, + OnIncarnationChange, } from './remotes/types.ts'; export type { RemoteMessageBase } from 
'./remotes/kernel/RemoteHandle.ts'; export { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 59896146e..d3e4dea59 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -1166,4 +1166,71 @@ describe('RemoteHandle', () => { expect(parsed.ack).toBeUndefined(); // No highestReceivedSeq }); }); + + describe('handlePeerRestart', () => { + it('resets sequence numbers for fresh start', async () => { + const remote = makeRemote(); + + // Build up some state by sending and receiving messages + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 5, + method: 'deliver', + params: ['notify', resolutions], + }), + ); + + // Call handlePeerRestart + remote.handlePeerRestart(); + + // Send a new message - should start from seq=1 + vi.mocked(mockRemoteComms.sendRemoteMessage).mockClear(); + await remote.deliverNotify(resolutions); + + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]![1]; + const parsed = JSON.parse(sentString); + expect(parsed.seq).toBe(1); + // ack should not be included since highestReceivedSeq was reset to 0 + expect(parsed.ack).toBeUndefined(); + }); + + it('clears persisted sequence state', async () => { + const remote = makeRemote(); + + // Build up state + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + + // Verify state exists before restart + expect(mockKernelStore.getRemoteSeqState(mockRemoteId)).toBeDefined(); + + // Call handlePeerRestart + remote.handlePeerRestart(); + + // Verify state was cleared + 
expect(mockKernelStore.getRemoteSeqState(mockRemoteId)).toBeUndefined(); + }); + + it('rejects pending URL redemptions', async () => { + const remote = makeRemote(); + + // Start a redemption but don't resolve it + const redeemPromise = remote.redeemOcapURL('ocap:test@peer,relay'); + + // Call handlePeerRestart + remote.handlePeerRestart(); + + // The pending redemption should be rejected + await expect(redeemPromise).rejects.toThrow('Remote peer restarted'); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index a8aa71a10..e1e47688b 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -931,4 +931,40 @@ export class RemoteHandle implements EndpointHandle { this.#clearDelayedAck(); this.rejectPendingRedemptions('Remote connection cleanup'); } + + /** + * Handle a peer restart (incarnation change). + * Resets all state for a fresh start: clears timers, rejects pending messages + * and redemptions, resets sequence numbers, and clears persisted seq state. + * Called when the handshake detects that the remote peer has restarted. 
+ */ + handlePeerRestart(): void { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: handling peer restart, resetting state`, + ); + + // Clear timers + this.#clearAckTimeout(); + this.#clearDelayedAck(); + + // Reject all pending messages - they will never be ACKed by the restarted peer + if (this.#hasPendingMessages()) { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: rejecting ${this.#getPendingCount()} pending messages due to peer restart`, + ); + this.#rejectAllPending('Remote peer restarted'); + } + + // Reject pending URL redemptions - the remote won't have context for them + this.rejectPendingRedemptions('Remote peer restarted'); + + // Reset sequence numbers for fresh start + this.#nextSendSeq = 0; + this.#highestReceivedSeq = 0; + this.#startSeq = 0; + this.#retryCount = 0; + + // Clear persisted sequence state + this.#kernelStore.clearRemoteSeqState(this.remoteId); + } } diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index 232f758e1..a30a9912f 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -81,6 +81,7 @@ describe('RemoteManager', () => { undefined, expect.any(Function), kernelStore.provideIncarnationId(), + expect.any(Function), // onIncarnationChange ); }); @@ -109,6 +110,7 @@ describe('RemoteManager', () => { undefined, expect.any(Function), kernelStore.provideIncarnationId(), + expect.any(Function), // onIncarnationChange ); }); @@ -137,6 +139,7 @@ describe('RemoteManager', () => { keySeed, expect.any(Function), kernelStore.provideIncarnationId(), + expect.any(Function), // onIncarnationChange ); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index f2fefef97..718035742 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ 
b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -134,6 +134,28 @@ export class RemoteManager { } } + /** + * Handle when a remote peer's incarnation changes (peer restarted). + * Resets the RemoteHandle state for a fresh start. + * + * @param peerId - The peer ID of the remote that restarted. + */ + #handleIncarnationChange(peerId: string): void { + const remote = this.#remotesByPeer.get(peerId); + if (!remote) { + // Remote not found - might not have been established yet + this.#logger?.log( + `Incarnation change for unknown peer ${peerId.slice(0, 8)}, ignoring`, + ); + return; + } + + this.#logger?.log( + `Handling incarnation change for peer ${peerId.slice(0, 8)}`, + ); + remote.handlePeerRestart(); + } + /** * Initialize the remote comms object at kernel startup. * @@ -162,6 +184,7 @@ export class RemoteManager { this.#keySeed, this.#handleRemoteGiveUp.bind(this), this.#incarnationId, + this.#handleIncarnationChange.bind(this), ); // Restore all remotes that were previously established diff --git a/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts b/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts index 724e36748..486aa6a2d 100644 --- a/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts @@ -144,6 +144,7 @@ describe('remote-comms', () => { mockRemoteMessageHandler, undefined, // onRemoteGiveUp undefined, // incarnationId + undefined, // onIncarnationChange ); }); @@ -167,6 +168,7 @@ describe('remote-comms', () => { mockRemoteMessageHandler, undefined, undefined, // incarnationId + undefined, // onIncarnationChange ); }); @@ -187,6 +189,7 @@ describe('remote-comms', () => { mockRemoteMessageHandler, onRemoteGiveUp, undefined, // incarnationId + undefined, // onIncarnationChange ); }); @@ -208,6 +211,30 @@ describe('remote-comms', () => { mockRemoteMessageHandler, undefined, incarnationId, + undefined, // onIncarnationChange + ); + }); + + it('passes 
onIncarnationChange callback to platformServices', async () => { + const onIncarnationChange = vi.fn(); + await initRemoteComms( + mockKernelStore, + mockPlatformServices, + mockRemoteMessageHandler, + {}, + undefined, // logger + undefined, // keySeed + undefined, // onRemoteGiveUp + undefined, // incarnationId + onIncarnationChange, + ); + expect(mockPlatformServices.initializeRemoteComms).toHaveBeenCalledWith( + expect.any(String), + expect.any(Object), + mockRemoteMessageHandler, + undefined, + undefined, // incarnationId + onIncarnationChange, ); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts b/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts index fe952c195..00aa32614 100644 --- a/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts +++ b/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts @@ -13,6 +13,7 @@ import type { RemoteComms, RemoteMessageHandler, OnRemoteGiveUp, + OnIncarnationChange, RemoteCommsOptions, } from '../types.ts'; @@ -105,6 +106,7 @@ export function getKnownRelays(kv: KVStore): string[] { * @param keySeed - Optional seed for libp2p key generation. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - Unique identifier for this kernel instance. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * * @returns the initialized remote comms object. 
*/ @@ -117,6 +119,7 @@ export async function initRemoteComms( keySeed?: string, onRemoteGiveUp?: OnRemoteGiveUp, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise { let peerId: string; let ocapURLKey: Uint8Array; @@ -170,6 +173,7 @@ export async function initRemoteComms( remoteMessageHandler, onRemoteGiveUp, incarnationId, + onIncarnationChange, ); /** diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index e52cf10ed..83c0e4bbf 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -2582,7 +2582,7 @@ describe('transport.initTransport', () => { ); }); - it('calls onRemoteGiveUp when incarnation changes', async () => { + it('calls onIncarnationChange when incarnation changes', async () => { let inboundHandler: ((channel: MockChannel) => void) | undefined; mockConnectionFactory.onInboundConnection.mockImplementation( (handler: (channel: MockChannel) => void) => { @@ -2590,14 +2590,15 @@ describe('transport.initTransport', () => { }, ); - const onRemoteGiveUp = vi.fn(); + const onIncarnationChange = vi.fn(); const localIncarnationId = 'local-incarnation'; await initTransport( '0x1234', {}, vi.fn().mockResolvedValue(''), - onRemoteGiveUp, + undefined, // onRemoteGiveUp localIncarnationId, + onIncarnationChange, ); // First handshake from remote peer @@ -2623,8 +2624,8 @@ describe('transport.initTransport', () => { ); }); - // First incarnation should not trigger onRemoteGiveUp - expect(onRemoteGiveUp).not.toHaveBeenCalled(); + // First incarnation should not trigger onIncarnationChange + expect(onIncarnationChange).not.toHaveBeenCalled(); // Second handshake with different incarnation (simulating peer restart) const mockInboundChannel2 = createMockChannel('remote-peer'); @@ -2649,8 +2650,8 @@ describe('transport.initTransport', () => { ); }); - // Changed incarnation 
should trigger onRemoteGiveUp - expect(onRemoteGiveUp).toHaveBeenCalledWith('remote-peer'); + // Changed incarnation should trigger onIncarnationChange + expect(onIncarnationChange).toHaveBeenCalledWith('remote-peer'); }); it('passes regular messages to remoteMessageHandler after handshake', async () => { diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 52d11a178..acd17e721 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -37,6 +37,7 @@ import type { StopRemoteComms, Channel, OnRemoteGiveUp, + OnIncarnationChange, RemoteCommsOptions, } from '../types.ts'; @@ -57,6 +58,7 @@ import type { * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). * @param localIncarnationId - This kernel's incarnation ID for handshake protocol. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes (peer restarted). * * @returns a function to send messages **and** a `stop()` to cancel/release everything. 
*/ @@ -66,6 +68,7 @@ export async function initTransport( remoteMessageHandler: RemoteMessageHandler, onRemoteGiveUp?: OnRemoteGiveUp, localIncarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise<{ sendRemoteMessage: SendRemoteMessage; stop: StopRemoteComms; @@ -151,11 +154,12 @@ export async function initTransport( } // Handle incarnation change outside try-catch so callback errors // don't incorrectly mark the handshake as failed - if (result.incarnationChanged && onRemoteGiveUp) { + if (result.incarnationChanged) { logger.log( - `${channel.peerId.slice(0, 8)}:: incarnation changed during outbound handshake, triggering promise rejection`, + `${channel.peerId.slice(0, 8)}:: incarnation changed during outbound handshake, resetting remote state`, ); - onRemoteGiveUp(channel.peerId); + // Call incarnation change callback first to reset RemoteHandle state + onIncarnationChange?.(channel.peerId); } return true; } @@ -180,11 +184,12 @@ export async function initTransport( } // Handle incarnation change outside try-catch so callback errors // don't incorrectly mark the handshake as failed - if (result.incarnationChanged && onRemoteGiveUp) { + if (result.incarnationChanged) { logger.log( - `${channel.peerId.slice(0, 8)}:: incarnation changed during inbound handshake, triggering promise rejection`, + `${channel.peerId.slice(0, 8)}:: incarnation changed during inbound handshake, resetting remote state`, ); - onRemoteGiveUp(channel.peerId); + // Call incarnation change callback first to reset RemoteHandle state + onIncarnationChange?.(channel.peerId); } return true; } @@ -439,6 +444,15 @@ export async function initTransport( // Get or establish channel let { channel } = state; if (!channel) { + // Clear permanent failure status - user is explicitly trying to communicate + // This allows user-initiated messages to "resurrect" a permanently-failed peer + if (reconnectionManager.isPermanentlyFailed(targetPeerId)) { + logger.log( + 
`${targetPeerId.slice(0, 8)}:: clearing permanent failure status on user-initiated send`, + ); + reconnectionManager.clearPermanentFailure(targetPeerId); + } + // Check connection limit before attempting to dial checkConnectionLimit(); diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index f278ff93a..586eb1d15 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -26,6 +26,13 @@ export type RemoteComms = { export type OnRemoteGiveUp = (peerId: string) => void; +/** + * Callback invoked when a remote peer's incarnation ID changes (peer restarted). + * + * @param peerId - The peer ID whose incarnation changed. + */ +export type OnIncarnationChange = (peerId: string) => void; + /** * Options for initializing remote communications. */ diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/index.ts b/packages/ocap-kernel/src/rpc/kernel-remote/index.ts index 1221bafa1..1042cac17 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/index.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/index.ts @@ -5,21 +5,33 @@ import type { } from './remoteDeliver.ts'; import { remoteGiveUpSpec, remoteGiveUpHandler } from './remoteGiveUp.ts'; import type { RemoteGiveUpSpec, RemoteGiveUpHandler } from './remoteGiveUp.ts'; +import { + remoteIncarnationChangeSpec, + remoteIncarnationChangeHandler, +} from './remoteIncarnationChange.ts'; +import type { + RemoteIncarnationChangeSpec, + RemoteIncarnationChangeHandler, +} from './remoteIncarnationChange.ts'; export const kernelRemoteHandlers = { remoteDeliver: remoteDeliverHandler, remoteGiveUp: remoteGiveUpHandler, + remoteIncarnationChange: remoteIncarnationChangeHandler, } as { remoteDeliver: RemoteDeliverHandler; remoteGiveUp: RemoteGiveUpHandler; + remoteIncarnationChange: RemoteIncarnationChangeHandler; }; export const kernelRemoteMethodSpecs = { remoteDeliver: remoteDeliverSpec, remoteGiveUp: remoteGiveUpSpec, + 
remoteIncarnationChange: remoteIncarnationChangeSpec, } as { remoteDeliver: RemoteDeliverSpec; remoteGiveUp: RemoteGiveUpSpec; + remoteIncarnationChange: RemoteIncarnationChangeSpec; }; type Handlers = diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts new file mode 100644 index 000000000..f2655c556 --- /dev/null +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts @@ -0,0 +1,43 @@ +import type { MethodSpec, Handler } from '@metamask/kernel-rpc-methods'; +import { object, string, literal } from '@metamask/superstruct'; +import type { Infer } from '@metamask/superstruct'; + +const paramsStruct = object({ + peerId: string(), +}); + +type Params = Infer; + +export type RemoteIncarnationChangeSpec = MethodSpec< + 'remoteIncarnationChange', + { peerId: string }, + null +>; + +export const remoteIncarnationChangeSpec: RemoteIncarnationChangeSpec = { + method: 'remoteIncarnationChange', + params: paramsStruct, + result: literal(null), +}; + +export type HandleRemoteIncarnationChange = (peerId: string) => Promise; + +type RemoteIncarnationChangeHooks = { + remoteIncarnationChange: HandleRemoteIncarnationChange; +}; + +export type RemoteIncarnationChangeHandler = Handler< + 'remoteIncarnationChange', + Params, + Promise, + RemoteIncarnationChangeHooks +>; + +export const remoteIncarnationChangeHandler: RemoteIncarnationChangeHandler = { + ...remoteIncarnationChangeSpec, + hooks: { remoteIncarnationChange: true }, + implementation: async ({ remoteIncarnationChange }, params) => { + await remoteIncarnationChange(params.peerId); + return null; + }, +}; diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index af0cb55a1..fef585f41 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -52,6 +52,7 @@ describe('kernel store', () => { 'clear', 
'clearEmptySubclusters', 'clearReachableFlag', + 'clearRemoteSeqState', 'collectGarbage', 'createCrankSavepoint', 'decRefCount', diff --git a/packages/ocap-kernel/src/store/methods/remote.test.ts b/packages/ocap-kernel/src/store/methods/remote.test.ts index 82abbeda0..86660cf79 100644 --- a/packages/ocap-kernel/src/store/methods/remote.test.ts +++ b/packages/ocap-kernel/src/store/methods/remote.test.ts @@ -341,4 +341,46 @@ describe('remote store methods', () => { expect(deleted).toBe(0); }); }); + + describe('clearRemoteSeqState', () => { + it('deletes all seq state and pending messages', () => { + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '10'); + mockKV.set(`remoteSeq.${remoteId1}.highestReceivedSeq`, '5'); + mockKV.set(`remoteSeq.${remoteId1}.startSeq`, '2'); + mockKV.set(`remotePending.${remoteId1}.2`, '{"seq":2}'); + mockKV.set(`remotePending.${remoteId1}.3`, '{"seq":3}'); + mockGetPrefixedKeys.mockReturnValue([ + `remotePending.${remoteId1}.2`, + `remotePending.${remoteId1}.3`, + ]); + + remoteMethods.clearRemoteSeqState(remoteId1); + + expect(mockKV.has(`remoteSeq.${remoteId1}.nextSendSeq`)).toBe(false); + expect(mockKV.has(`remoteSeq.${remoteId1}.highestReceivedSeq`)).toBe( + false, + ); + expect(mockKV.has(`remoteSeq.${remoteId1}.startSeq`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.2`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(false); + }); + + it('does not affect remote relationship info', () => { + mockKV.set(`remote.${remoteId1}`, JSON.stringify(remoteInfo1)); + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '10'); + mockGetPrefixedKeys.mockReturnValue([]); + + remoteMethods.clearRemoteSeqState(remoteId1); + + // Remote info should still exist + expect(mockKV.has(`remote.${remoteId1}`)).toBe(true); + // Seq state should be cleared + expect(mockKV.has(`remoteSeq.${remoteId1}.nextSendSeq`)).toBe(false); + }); + + it('does nothing when no state exists', () => { + 
mockGetPrefixedKeys.mockReturnValue([]); + expect(() => remoteMethods.clearRemoteSeqState(remoteId1)).not.toThrow(); + }); + }); }); diff --git a/packages/ocap-kernel/src/store/methods/remote.ts b/packages/ocap-kernel/src/store/methods/remote.ts index 14c6e1059..f968c9387 100644 --- a/packages/ocap-kernel/src/store/methods/remote.ts +++ b/packages/ocap-kernel/src/store/methods/remote.ts @@ -230,6 +230,27 @@ export function getRemoteMethods(ctx: StoreContext) { return deletedCount; } + /** + * Clear all sequence state for a remote (seq counters + all pending messages). + * Called when a remote peer restarts (incarnation changes) to reset for fresh communication. + * Unlike deleteRemotePendingState, this does NOT delete the remote relationship itself. + * + * @param remoteId - The remote whose sequence state is to be cleared. + */ + function clearRemoteSeqState(remoteId: RemoteId): void { + // Delete seq state + const seqPrefix = `${REMOTE_SEQ_BASE}${remoteId}.`; + kv.delete(`${seqPrefix}nextSendSeq`); + kv.delete(`${seqPrefix}highestReceivedSeq`); + kv.delete(`${seqPrefix}startSeq`); + + // Delete all pending messages + const pendingPrefix = `${REMOTE_PENDING_BASE}${remoteId}.`; + for (const key of getPrefixedKeys(pendingPrefix)) { + kv.delete(key); + } + } + return { getAllRemoteRecords, getRemoteInfo, @@ -245,5 +266,6 @@ export function getRemoteMethods(ctx: StoreContext) { deletePendingMessage, deleteRemotePendingState, cleanupOrphanMessages, + clearRemoteSeqState, }; } diff --git a/packages/ocap-kernel/src/types.ts b/packages/ocap-kernel/src/types.ts index e23f14421..5ed09797b 100644 --- a/packages/ocap-kernel/src/types.ts +++ b/packages/ocap-kernel/src/types.ts @@ -34,6 +34,7 @@ import type { SendRemoteMessage, StopRemoteComms, OnRemoteGiveUp, + OnIncarnationChange, RemoteCommsOptions, } from './remotes/types.ts'; import { Fail } from './utils/assert.ts'; @@ -325,6 +326,7 @@ export type PlatformServices = { * @param remoteMessageHandler - A handler function to 
receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - Unique identifier for this kernel instance. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -334,6 +336,7 @@ export type PlatformServices = { remoteMessageHandler: RemoteMessageHandler, onRemoteGiveUp?: OnRemoteGiveUp, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ) => Promise<void>; /** * Stop network communications. From 94e456999379b0d56ba2214cc64ecdb7f5fc9e11 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 4 Feb 2026 19:39:14 +0100 Subject: [PATCH 2/3] fix(remote-comms): prevent stale message delivery after incarnation change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a message triggers dial→handshake and incarnation change is detected, the message was still being sent with stale content (old seq number, old object/promise references). This could cause errors on the remote peer. 
Changes: - Update doOutboundHandshake to return { success, incarnationChanged } - Throw error when incarnation changes during sendRemoteMessage to prevent stale message delivery - Update reconnection-lifecycle to handle new handshake return type - Fix outdated comment in e2e test (onRemoteGiveUp → onIncarnationChange) Co-Authored-By: Claude Opus 4.5 --- packages/nodejs/test/e2e/remote-comms.test.ts | 4 +-- .../platform/reconnection-lifecycle.test.ts | 11 +++++--- .../platform/reconnection-lifecycle.ts | 14 ++++++---- .../src/remotes/platform/transport.ts | 26 ++++++++++++------- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 772905244..104a05e99 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -917,8 +917,8 @@ describe.sequential('Remote Communications E2E', () => { const response = kunser(result); // The message should fail because incarnation changed. - // The handshake detects the new incarnation and triggers onRemoteGiveUp, - // which rejects pending promises with a "Remote connection lost" error. + // The handshake detects the new incarnation and triggers onIncarnationChange, + // which resets RemoteHandle state and rejects pending work. 
expect(response).toBeInstanceOf(Error); expect((response as Error).message).toMatch(/Remote connection lost/u); }, diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index a9a923e4f..8ae97d042 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -90,7 +90,9 @@ describe('reconnection-lifecycle', () => { checkConnectionRateLimit: vi.fn(), closeChannel: vi.fn().mockResolvedValue(undefined), registerChannel: vi.fn(), - doOutboundHandshake: vi.fn().mockResolvedValue(true), + doOutboundHandshake: vi + .fn() + .mockResolvedValue({ success: true, incarnationChanged: false }), } as unknown as ReconnectionLifecycleDeps; }); @@ -238,9 +240,10 @@ describe('reconnection-lifecycle', () => { (deps.reconnectionManager.isReconnecting as ReturnType<typeof vi.fn>) .mockReturnValueOnce(true) .mockReturnValueOnce(false); - (deps.doOutboundHandshake as ReturnType<typeof vi.fn>).mockResolvedValue( - false, - ); + (deps.doOutboundHandshake as ReturnType<typeof vi.fn>).mockResolvedValue({ + success: false, + incarnationChanged: false, + }); const lifecycle = makeReconnectionLifecycle(deps); diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 5de8187b9..34ba7206b 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -38,8 +38,10 @@ export type ReconnectionLifecycleDeps = { channel: Channel, errorContext?: string, ) => void; - /** Perform outbound handshake. Returns true if successful. */ - doOutboundHandshake: (channel: Channel) => Promise<boolean>; + /** Perform outbound handshake. Returns success status and whether incarnation changed. 
*/ + doOutboundHandshake: ( + channel: Channel, + ) => Promise<{ success: boolean; incarnationChanged: boolean }>; }; /** @@ -236,9 +238,9 @@ export function makeReconnectionLifecycle( throw error; } // Perform handshake before registering the channel - let handshakeOk; + let handshakeResult; try { - handshakeOk = await doOutboundHandshake(channel); + handshakeResult = await doOutboundHandshake(channel); } catch (handshakeError) { // Handshake threw (e.g., onRemoteGiveUp callback failed) - close channel to prevent leak try { @@ -248,7 +250,7 @@ export function makeReconnectionLifecycle( } throw handshakeError; } - if (!handshakeOk) { + if (!handshakeResult.success) { // Handshake failures are retryable (could be transient network issues) // Return null to signal retry instead of throwing non-retryable error logger.log( @@ -257,6 +259,8 @@ export function makeReconnectionLifecycle( await closeChannel(channel, peerId); return null; } + // Note: incarnationChanged is handled by the callback in doOutboundHandshake + // For reconnection, we still register the channel - messages will be fresh registerChannel(peerId, channel, 'reading channel to'); } diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index acd17e721..9315f2a02 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -136,21 +136,22 @@ export async function initTransport( /** * Perform outbound handshake and handle incarnation changes. - * Returns true if handshake succeeded (or was skipped), false if it failed. * * @param channel - The channel to perform handshake on. - * @returns True if handshake succeeded or was skipped. + * @returns Object with success status and whether incarnation changed. 
*/ - async function doOutboundHandshake(channel: Channel): Promise<boolean> { + async function doOutboundHandshake( + channel: Channel, + ): Promise<{ success: boolean; incarnationChanged: boolean }> { if (!handshakeDeps) { - return true; // No handshake configured, skip + return { success: true, incarnationChanged: false }; // No handshake configured, skip } let result; try { result = await performOutboundHandshake(channel, handshakeDeps); } catch (problem) { outputError(channel.peerId, 'outbound handshake', problem); - return false; + return { success: false, incarnationChanged: false }; } // Handle incarnation change outside try-catch so callback errors // don't incorrectly mark the handshake as failed @@ -161,7 +162,7 @@ export async function initTransport( // Call incarnation change callback first to reset RemoteHandle state onIncarnationChange?.(channel.peerId); } - return true; + return { success: true, incarnationChanged: result.incarnationChanged }; } /** @@ -493,9 +494,9 @@ export async function initTransport( throw error; } // Perform handshake before registering the channel - let handshakeOk; + let handshakeResult; try { - handshakeOk = await doOutboundHandshake(channel); + handshakeResult = await doOutboundHandshake(channel); } catch (handshakeError) { // Handshake threw (e.g., onRemoteGiveUp callback failed) - close channel to prevent leak try { @@ -505,10 +506,17 @@ export async function initTransport( } throw handshakeError; } - if (!handshakeOk) { + if (!handshakeResult.success) { await connectionFactory.closeChannel(channel, targetPeerId); throw Error('Handshake failed'); } + if (handshakeResult.incarnationChanged) { + // Peer restarted - don't send stale message, let caller retry with fresh state + registerChannel(targetPeerId, channel, 'reading channel to'); + throw Error( + 'Remote peer restarted: message not sent to avoid stale delivery', + ); + } registerChannel(targetPeerId, channel, 'reading channel to'); } } catch (problem) { From 
f473710078d8d1658d293da55f0066d2fe94e2c2 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 4 Feb 2026 20:39:43 +0100 Subject: [PATCH 3/3] fix(remote-comms): reject kernel promises when peer incarnation changes The handleIncarnationChange callback was missing kernel promise rejection. When a remote peer restarts, any pending kernel promises that were waiting on that peer need to be rejected since the peer has lost its state and won't be able to resolve them. This mirrors the behavior in handleRemoteGiveUp and ensures consistent promise handling for all remote failure modes. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/kernel/RemoteManager.test.ts | 65 +++++++++++++++++++ .../src/remotes/kernel/RemoteManager.ts | 14 +++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index a30a9912f..4d7b1f67b 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -22,6 +22,7 @@ describe('RemoteManager', () => { let remoteManager: RemoteManager; let mockPlatformServices: PlatformServices; let kernelStore: ReturnType<typeof makeKernelStore>; + let kernelKVStore: ReturnType<typeof makeMapKernelDatabase>['kernelKVStore']; let mockKernelQueue: KernelQueue; let logger: Logger; let mockRemoteComms: RemoteComms; @@ -29,6 +30,7 @@ describe('RemoteManager', () => { beforeEach(() => { const kernelDatabase = makeMapKernelDatabase(); + kernelKVStore = kernelDatabase.kernelKVStore; kernelStore = makeKernelStore(kernelDatabase); logger = new Logger('test'); @@ -567,4 +569,67 @@ describe('RemoteManager', () => { expect(resolvePromisesSpy).not.toHaveBeenCalled(); }); }); + + describe('handleIncarnationChange', () => { + beforeEach(async () => { + const messageHandler = vi.fn(); + vi.mocked(remoteComms.initRemoteComms).mockResolvedValue(mockRemoteComms); + remoteManager.setMessageHandler(messageHandler); 
+ await remoteManager.initRemoteComms(); + }); + + it('calls handlePeerRestart on remote when incarnation changes', () => { + const peerId = 'peer-that-restarted'; + const remote = remoteManager.establishRemote(peerId); + const handlePeerRestartSpy = vi.spyOn(remote, 'handlePeerRestart'); + // Get the onIncarnationChange callback (9th argument, index 8) + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + onIncarnationChange(peerId); + expect(handlePeerRestartSpy).toHaveBeenCalled(); + }); + + it('rejects kernel promises where remote is decider', () => { + const peerId = 'peer-with-promises'; + const remote = remoteManager.establishRemote(peerId); + const { remoteId } = remote; + + // Set up a promise where the remote is the decider + const [kpid] = kernelStore.initKernelPromise(); + kernelStore.setPromiseDecider(kpid, remoteId); + + // Set up the cle. key that getPromisesByDecider looks for + // The key format is cle.{decider}.{eref} = kpid + kernelKVStore.set(`cle.${remoteId}.p+1`, kpid); + + const resolvePromisesSpy = vi.spyOn(mockKernelQueue, 'resolvePromises'); + + // Trigger incarnation change + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + onIncarnationChange(peerId); + + // Should reject the promise with incarnation change error + expect(resolvePromisesSpy).toHaveBeenCalledWith(remoteId, [ + [kpid, true, expect.objectContaining({ body: expect.any(String) })], + ]); + }); + + it('does nothing when remote does not exist', () => { + const peerId = 'non-existent-peer'; + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + expect(() => onIncarnationChange(peerId)).not.toThrow(); + }); + + it('does not reject promises when there are none', () => { + const peerId = 
'peer-without-promises'; + remoteManager.establishRemote(peerId); + const resolvePromisesSpy = vi.spyOn(mockKernelQueue, 'resolvePromises'); + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + onIncarnationChange(peerId); + expect(resolvePromisesSpy).not.toHaveBeenCalled(); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index 718035742..ac87f3cae 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -136,7 +136,7 @@ export class RemoteManager { /** * Handle when a remote peer's incarnation changes (peer restarted). - * Resets the RemoteHandle state for a fresh start. + * Resets the RemoteHandle state and rejects kernel promises for a fresh start. * * @param peerId - The peer ID of the remote that restarted. */ @@ -153,7 +153,19 @@ export class RemoteManager { this.#logger?.log( `Handling incarnation change for peer ${peerId.slice(0, 8)}`, ); + + // Reset RemoteHandle state (pending messages, redemptions, seq numbers) remote.handlePeerRestart(); + + // Reject all kernel promises where this remote is the decider + // The restarted peer has lost its state and won't resolve these promises + const { remoteId } = remote; + const failure = kser( + Error(`Remote peer restarted: ${peerId} (incarnation changed)`), + ); + for (const kpid of this.#kernelStore.getPromisesByDecider(remoteId)) { + this.#kernelQueue.resolvePromises(remoteId, [[kpid, true, failure]]); + } } /**