From b4dfd4c77a768034f366327613c6a9b8388702e4 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Wed, 4 Feb 2026 17:29:11 -0800 Subject: [PATCH 01/12] docs: Update Ken protocol assessment with receive-side findings Identified bugs in handleRemoteMessage: no dedup check and wrong persistence order. Recommended fix: wrap in database transaction. Refs #808 Co-Authored-By: Claude Opus 4.5 --- docs/ken-protocol-assessment.md | 142 +++++++++++++++++--------------- 1 file changed, 75 insertions(+), 67 deletions(-) diff --git a/docs/ken-protocol-assessment.md b/docs/ken-protocol-assessment.md index 5b5c72e54..c756a710a 100644 --- a/docs/ken-protocol-assessment.md +++ b/docs/ken-protocol-assessment.md @@ -46,10 +46,10 @@ Key aspects: | Retransmission | ✓ | Timeout-based retransmit until ACK or max retries | | Crash-safe persistence | ✓ | Write message first, then update nextSendSeq | | Local recovery | ✓ | Restore seq state, restart ACK timeout | -| **Transactional turns** | ✓ | Crank buffering defers outputs until crank commit | -| **Deferred transmission** | ✓ | Outputs reach RemoteHandle only after originating crank commits | -| **Output validity** | ✓ | Crank buffering ensures outputs escape only after commit | -| **Atomic checkpoint** | ✓ | Database savepoints make crank state changes atomic | +| Transactional turns | ✓ | Crank buffering defers outputs until crank commit | +| Deferred transmission | ✓ | Outputs reach RemoteHandle only after originating crank commits | +| Output validity | ✓ | Crank buffering ensures outputs escape only after commit | +| Atomic checkpoint | ✓ | Database savepoints make crank state changes atomic | ### Crank Buffering (Issue #786) @@ -80,45 +80,48 @@ This achieves Ken's property that **outputs are only externalized after successf ### Remaining Gaps (Receive Side) -The remaining gaps are on the **receive side** of remote messaging: +The remaining gaps are on the **receive side** of remote messaging. 
Code review of `RemoteHandle.handleRemoteMessage()` revealed specific bugs: -#### 1. Done Table / Duplicate Detection (Gap) +#### 1. No Duplicate Detection (Bug) -Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. The `Done` table is updated atomically with the application state at crank commit. +Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. -We track `highestReceivedSeq` per remote, but there's a gap in how it interacts with delivery: +**Current code behavior** (`RemoteHandle.ts` lines 830-845): +```typescript +// Track received sequence number for piggyback ACK and persist +if (seq > this.#highestReceivedSeq) { + this.#highestReceivedSeq = seq; + this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); +} +// ... then UNCONDITIONALLY: +switch (method) { + case 'deliver': + this.#handleRemoteDeliver(params); // Always runs, even for duplicates! +``` -**Scenario A - Update on receive, before delivery:** -1. Receive message seq=5 from remote R -2. Update `highestReceivedSeq` to 5 -3. Add message to run queue for delivery to local vat -4. Crash before delivery crank commits -5. On recovery: `highestReceivedSeq=5` suggests we processed it -6. Remote retransmits seq=5, we ignore it -7. **Message lost** - vat never received it +**Problem**: There is no deduplication check. Even when `seq <= highestReceivedSeq`, the message is processed. After a crash and retransmit, duplicate messages will be delivered to the vat. + +#### 2. Wrong Persistence Order (Bug) -**Scenario B - Update after delivery:** -1. Receive message seq=5, add to run queue -2. Delivery crank runs, vat processes message, crank commits -3. Crash before `highestReceivedSeq` is updated -4. On recovery: `highestReceivedSeq < 5` -5. Remote retransmits seq=5, we deliver again -6. 
**Duplicate delivery** - vat receives it twice +**Current behavior**: `highestReceivedSeq` is persisted BEFORE the message is processed and added to the run queue. -**What's needed**: A `Done` table (or equivalent) that is updated atomically with the delivery crank commit, and checked before delivering incoming messages. +**Crash scenario**: +1. Receive message seq=5 from remote R +2. Update and persist `highestReceivedSeq` to 5 +3. Crash before message is added to run queue +4. On recovery: `highestReceivedSeq=5` suggests we received it +5. Remote retransmits seq=5, we (correctly) ignore it due to dedup check (once fixed) +6. **Message lost** - never reached the run queue -#### 2. FIFO Enforcement on Receive (Gap) +**What's needed**: Process the message first (add to run queue), then persist `highestReceivedSeq`. Ideally these should be atomic. -Ken enforces per-sender FIFO ordering via `next_ready()` which only delivers the next expected sequence number. +#### 3. FIFO Enforcement on Receive (Not a Gap) -If messages arrive out of order from the network (e.g., seq 1, 3, 2): -- We should deliver seq=1 -- Buffer seq=3 until seq=2 arrives -- Deliver seq=2, then seq=3 +Ken enforces per-sender FIFO ordering via `next_ready()` which only delivers the next expected sequence number. -We don't currently enforce this ordering on the receive side. Out-of-order network delivery could result in out-of-order application delivery. +**Our situation**: We use TCP-based transports (libp2p streams) which guarantee in-order delivery during normal operation. Out-of-order arrival only occurs after a crash when the sender retransmits. With proper deduplication (fix #1 above), retransmitted messages for already-processed sequence numbers will be dropped, maintaining FIFO semantics. -**What's needed**: Track expected next seq per remote sender, buffer out-of-order messages, deliver in sequence order only. 
+Therefore, explicit receive-side reordering is not required given our transport guarantees. ### Summary Table @@ -131,38 +134,41 @@ We don't currently enforce this ordering on the receive side. Out-of-order netwo | Consistent frontier | **Yes** | Each kernel's checkpoint is independent | | Local recovery | **Yes** | Crashes don't affect other processes | | Sender-based logging | **Yes** | Messages persisted in remotePending until ACKed | -| Exactly-once delivery | **Partial** | At-least-once; missing Done table for deduplication | -| FIFO ordering | **Partial** | Per-sender seq on send; no enforcement on receive | +| Exactly-once delivery | **Bug** | Needs transactional receive with dedup check | +| FIFO ordering | **Yes** | TCP guarantees in-order; dedup handles retransmits | -## What Would Be Needed for Full Ken Properties +## Required Fix -### 1. Add Done Table for Exactly-Once Delivery +Wrap `handleRemoteMessage()` in a database transaction with dedup check: -Track processed messages and deduplicate on receive: +```typescript +handleRemoteMessage(seq, method, params) { + // Begin transaction -**Option A: Per-remote processed sequence tracking** -- Store `highestProcessedSeq.${remoteId}` updated atomically with delivery crank -- On receive: if `seq <= highestProcessedSeq`, ACK but don't deliver -- Simple but requires in-order processing + // Dedup check - must be inside transaction to read committed state + if (seq <= this.#highestReceivedSeq) { + // Already received, ACK but don't process + return; + } -**Option B: Explicit Done table** -- Store `done.${remoteId}.${seq} = true` for each processed message -- On receive: check Done table before delivering -- Supports out-of-order processing -- Requires garbage collection of old entries (after ACK confirms sender discarded) + // Process message (translate refs, add to run queue, etc.) + switch (method) { + case 'deliver': ... + case 'resolve': ... + case 'gc': ... 
+ } -Either approach requires the processed-message record to be updated **atomically with the delivery crank commit**, so that crash recovery sees consistent state. + // Update sequence tracking + this.#highestReceivedSeq = seq; + this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); -### 2. FIFO Enforcement on Receive - -Buffer and reorder incoming messages: + // Commit transaction +} +``` -- Track `expectedNextSeq.${remoteId}` per sender -- On receive: if `seq > expectedNextSeq`, buffer the message -- When `expectedNextSeq` message arrives, deliver it and any buffered successors -- Update `expectedNextSeq` as messages are delivered +This achieves atomicity without restructuring the existing message handling code. If a crash occurs before commit, both the run queue entry and the sequence update roll back together - the remote retransmits, and we process it correctly. -This interacts with the Done table: we need to handle the case where we've already processed some messages (from Done table) when determining what's "expected next." +The transaction approach is simpler than reordering because `handleRemoteMessage` handles multiple message types (`deliver`, `resolve`, `gc`) with different processing paths, and reference slots require translation before persistence. ## Architectural Summary @@ -180,19 +186,21 @@ Later (separate operation): The key insight: by the time RemoteHandle sees a message, the originating crank has already committed. Output validity is achieved. 
-**Receive side (gaps remain):** +**Receive side (bugs to fix):** ``` -Current: - receive from network → add to run queue → deliver to vat - (no deduplication, no ordering enforcement) - -Needed: +Current (buggy): receive from network - → check Done table (skip if already processed) - → check sequence (buffer if out of order) + → persist highestReceivedSeq (WRONG: too early) + → process message unconditionally (WRONG: no dedup) → add to run queue - → deliver to vat - → atomically update Done table with delivery crank commit + +Fixed (wrap in transaction): + receive from network + → begin transaction + → check seq <= highestReceivedSeq (skip if duplicate) + → process message, add to run queue + → persist highestReceivedSeq + → commit transaction ``` ## Progress Summary @@ -204,8 +212,8 @@ Needed: | Atomic kernel state + output queue | **Achieved** | | Output validity (send side) | **Achieved** | | Deferred transmission (send side) | **Achieved** | -| Done table for deduplication (receive side) | Gap | -| FIFO enforcement (receive side) | Gap | +| FIFO ordering | **Achieved** (TCP transport) | +| Exactly-once receive (dedup + atomicity) | **Bug** - needs transactional fix | ## References From 4b4080d305e159b9275c2e0f855be10a19f4a7fd Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 12:54:11 -0800 Subject: [PATCH 02/12] feat: Add receive-side dedup and transactional message processing Implements issue #808: wrap handleRemoteMessage in a database transaction and add duplicate detection to ensure exactly-once delivery semantics. 
Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/kernel/RemoteHandle.test.ts | 94 +++++++++++++++++++ .../src/remotes/kernel/RemoteHandle.ts | 67 ++++++++----- packages/ocap-kernel/src/store/index.test.ts | 3 + packages/ocap-kernel/src/store/index.ts | 30 ++++++ 4 files changed, 173 insertions(+), 21 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 59896146e..18b5760d1 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -1165,5 +1165,99 @@ describe('RemoteHandle', () => { expect(parsed.seq).toBe(1); // Fresh start expect(parsed.ack).toBeUndefined(); // No highestReceivedSeq }); + + it('ignores duplicate messages (seq <= highestReceivedSeq)', async () => { + const remote = makeRemote(); + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + // First message with seq=1 - should process + const message1 = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', resolutions], + }); + await remote.handleRemoteMessage(message1); + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(1); + + // Duplicate message with seq=1 - should ignore + const message2 = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', resolutions], + }); + await remote.handleRemoteMessage(message2); + // Should still be 1 call, not 2 + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(1); + + // Message with seq=2 - should process + const message3 = JSON.stringify({ + seq: 2, + method: 'deliver', + params: ['notify', resolutions], + }); + await remote.handleRemoteMessage(message3); + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(2); + }); + + it('persists highestReceivedSeq atomically with message processing', async () => { + const remote = 
makeRemote(); + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + const message = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', resolutions], + }); + + await remote.handleRemoteMessage(message); + + // Verify highestReceivedSeq was persisted + expect( + mockKernelStore.getRemoteSeqState(mockRemoteId)?.highestReceivedSeq, + ).toBe(1); + }); + + it('restores highestReceivedSeq on processing error', async () => { + const remote = makeRemote(); + + // First, process a valid message to set highestReceivedSeq to 1 + const validMessage = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', [['rp+3', false, { body: '"value"', slots: [] }]]], + }); + await remote.handleRemoteMessage(validMessage); + expect( + mockKernelStore.getRemoteSeqState(mockRemoteId)?.highestReceivedSeq, + ).toBe(1); + + // Now send a message that will cause an error + const badMessage = JSON.stringify({ + seq: 2, + method: 'deliver', + params: ['bogus'], // Unknown delivery method + }); + + await expect(remote.handleRemoteMessage(badMessage)).rejects.toThrow( + 'unknown remote delivery method bogus', + ); + + // highestReceivedSeq should still be 1 (restored after rollback) + // Send another message with seq=2 to verify it's not considered a duplicate + vi.mocked(mockKernelQueue.resolvePromises).mockClear(); + const retryMessage = JSON.stringify({ + seq: 2, + method: 'deliver', + params: ['notify', [['rp+4', false, { body: '"value2"', slots: [] }]]], + }); + await remote.handleRemoteMessage(retryMessage); + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(1); + }); }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index a8aa71a10..904606fa2 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ 
-826,34 +826,59 @@ export class RemoteHandle implements EndpointHandle { const remoteCommand = parsed as RemoteCommand; const { seq, ack, method, params } = remoteCommand; - // Track received sequence number for piggyback ACK and persist - if (seq > this.#highestReceivedSeq) { - this.#highestReceivedSeq = seq; - this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); + // Handle piggyback ACK if present (outside transaction - ACK processing is idempotent) + if (ack !== undefined) { + this.#handleAck(ack); } // Start delayed ACK timer - will send standalone ACK if no outgoing traffic this.#startDelayedAck(); - // Handle piggyback ACK if present - if (ack !== undefined) { - this.#handleAck(ack); + // Duplicate detection: skip if we've already processed this sequence number + if (seq <= this.#highestReceivedSeq) { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: ignoring duplicate message seq=${seq} (highestReceived=${this.#highestReceivedSeq})`, + ); + return ''; } - switch (method) { - case 'deliver': - this.#handleRemoteDeliver(params); - break; - case 'redeemURL': - // Reply is sent via #sendRemoteCommand for proper seq/ack tracking - await this.#handleRedeemURLRequest(...params); - break; - case 'redeemURLReply': - await this.#handleRedeemURLReply(...params); - break; - default: - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - throw Error(`unknown remote message type ${method}`); + // Capture previous seq high watermark for rollback, then update in-memory + // seq tracking early so any reply piggybacking includes correct ACK. + const previousHighestReceivedSeq = this.#highestReceivedSeq; + this.#highestReceivedSeq = seq; + + // Wrap message processing in a transaction for atomicity: Either both (1) + // message processing and (2) seq update succeed together, or neither + // happens. This ensures crash-safe exactly-once delivery. 
+ const savepointName = `receive_${this.remoteId}_${seq}`; + this.#kernelStore.createSavepoint(savepointName); + try { + switch (method) { + case 'deliver': + this.#handleRemoteDeliver(params); + break; + case 'redeemURL': + // Reply is sent via #sendRemoteCommand for proper seq/ack tracking + await this.#handleRedeemURLRequest(...params); + break; + case 'redeemURLReply': + await this.#handleRedeemURLReply(...params); + break; + default: + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw Error(`unknown remote message type ${method}`); + } + + // Persist sequence tracking at the end, within the transaction + this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); + + // Commit the transaction + this.#kernelStore.releaseSavepoint(savepointName); + } catch (error) { + // Rollback on any error - also revert in-memory state + this.#highestReceivedSeq = previousHighestReceivedSeq; + this.#kernelStore.rollbackSavepoint(savepointName); + throw error; } return ''; } diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index af0cb55a1..65d21779c 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -54,6 +54,7 @@ describe('kernel store', () => { 'clearReachableFlag', 'collectGarbage', 'createCrankSavepoint', + 'createSavepoint', 'decRefCount', 'decrementRefCount', 'deleteCListEntry', @@ -131,12 +132,14 @@ describe('kernel store', () => { 'pinObject', 'provideIncarnationId', 'releaseAllSavepoints', + 'releaseSavepoint', 'removeVatFromSubcluster', 'reset', 'resolveKernelPromise', 'retireKernelObjects', 'revoke', 'rollbackCrank', + 'rollbackSavepoint', 'runQueueLength', 'scheduleReap', 'setGCActions', diff --git a/packages/ocap-kernel/src/store/index.ts b/packages/ocap-kernel/src/store/index.ts index fa8cc596a..02ec9de7d 100644 --- a/packages/ocap-kernel/src/store/index.ts +++ b/packages/ocap-kernel/src/store/index.ts @@ 
-252,6 +252,33 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { return newId; } + /** + * Create a savepoint for atomic operations on persistent storage. + * + * @param name - The savepoint name. + */ + function createSavepoint(name: string): void { + kdb.createSavepoint(name); + } + + /** + * Release (commit) a savepoint. + * + * @param name - The savepoint name. + */ + function releaseSavepoint(name: string): void { + kdb.releaseSavepoint(name); + } + + /** + * Rollback to a savepoint. + * + * @param name - The savepoint name. + */ + function rollbackSavepoint(name: string): void { + kdb.rollbackSavepoint(name); + } + return harden({ ...id, ...queue, @@ -273,6 +300,9 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { clear, reset, provideIncarnationId, + createSavepoint, + releaseSavepoint, + rollbackSavepoint, kv, }); } From 60bdc242c9c083c9b332b4dd8a82c44b54d00b21 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 13:29:18 -0800 Subject: [PATCH 03/12] refactor: Change handleRemoteMessage "no response" convention from empty string to null Co-Authored-By: Claude Opus 4.5 --- .../src/PlatformServicesClient.ts | 9 +++++--- .../src/PlatformServicesServer.test.ts | 2 +- .../src/PlatformServicesServer.ts | 7 ++++-- .../nodejs/src/kernel/PlatformServices.ts | 12 +++++++--- .../src/remotes/kernel/RemoteHandle.test.ts | 22 +++++++++---------- .../src/remotes/kernel/RemoteHandle.ts | 10 ++++----- .../src/remotes/kernel/RemoteManager.ts | 7 ++++-- packages/ocap-kernel/src/remotes/types.ts | 2 +- .../rpc/kernel-remote/remoteDeliver.test.ts | 4 ++-- .../src/rpc/kernel-remote/remoteDeliver.ts | 16 ++++++++------ 10 files changed, 54 insertions(+), 37 deletions(-) diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts index 165d93a62..eafee559d 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts +++ 
b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts @@ -201,7 +201,10 @@ export class PlatformServicesClient implements PlatformServices { async initializeRemoteComms( keySeed: string, options: RemoteCommsOptions, - remoteMessageHandler: (from: string, message: string) => Promise<string>, + remoteMessageHandler: ( + from: string, + message: string, + ) => Promise<string | null>, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, ): Promise { @@ -275,9 +278,9 @@ export class PlatformServicesClient implements PlatformServices { * * @param from - The peer ID that sent the message. * @param message - The message received. - * @returns A promise that resolves with the reply message, or an empty string if no reply is needed. + * @returns A promise that resolves with the reply message, or null if no reply is needed. */ - async #remoteDeliver(from: string, message: string): Promise<string> { + async #remoteDeliver(from: string, message: string): Promise<string | null> { if (this.#remoteMessageHandler) { return await this.#remoteMessageHandler(from, message); } diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index 8336102ce..48f2fc5c7 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -521,7 +521,7 @@ describe('PlatformServicesServer', () => { new MessageEvent('message', { data: { id: 'vws:1', - result: '', + result: null, jsonrpc: '2.0', }, }), diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts index cf0bdea7f..2e1af2b5d 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts @@ -384,9 +384,12 @@ export class PlatformServicesServer { * * @param from - The peer ID that sent the message.
* @param message - The message received. - * @returns A promise that resolves with the reply message, or an empty string if no reply is needed. + * @returns A promise that resolves with the reply message, or null if no reply is needed. */ - async #handleRemoteMessage(from: string, message: string): Promise<string> { + async #handleRemoteMessage( + from: string, + message: string, + ): Promise<string | null> { return this.#rpcClient.call('remoteDeliver', { from, message, diff --git a/packages/nodejs/src/kernel/PlatformServices.ts b/packages/nodejs/src/kernel/PlatformServices.ts index 160dea58c..d33565a3f 100644 --- a/packages/nodejs/src/kernel/PlatformServices.ts +++ b/packages/nodejs/src/kernel/PlatformServices.ts @@ -205,9 +205,12 @@ export class NodejsPlatformServices implements PlatformServices { * * @param from - The peer ID that sent the message. * @param message - The message received. - * @returns A promise that resolves with the reply message, or an empty string if no reply is needed. + * @returns A promise that resolves with the reply message, or null if no reply is needed.
*/ - async #handleRemoteMessage(from: string, message: string): Promise<string> { + async #handleRemoteMessage( + from: string, + message: string, + ): Promise<string | null> { if (!this.#remoteMessageHandler) { // This can't actually happen, but TypeScript can't infer it throw Error('remote comms not initialized'); @@ -234,7 +237,10 @@ export class NodejsPlatformServices implements PlatformServices { async initializeRemoteComms( keySeed: string, options: RemoteCommsOptions, - remoteMessageHandler: (from: string, message: string) => Promise<string>, + remoteMessageHandler: ( + from: string, + message: string, + ) => Promise<string | null>, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, ): Promise { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 18b5760d1..cf247fc2f 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -262,7 +262,7 @@ describe('RemoteHandle', () => { params: ['message', targetRRef, message], }); const reply = await remote.handleRemoteMessage(delivery); - expect(reply).toBe(''); + expect(reply).toBeNull(); expect(mockKernelQueue.enqueueSend).toHaveBeenCalledWith(targetKRef, { methargs: message.methargs, result: resultKRef, @@ -289,7 +289,7 @@ describe('RemoteHandle', () => { params: ['notify', resolutions], }); const reply = await remote.handleRemoteMessage(notify); - expect(reply).toBe(''); + expect(reply).toBeNull(); expect(mockKernelQueue.resolvePromises).toHaveBeenCalledWith( remote.remoteId, [[promiseKRef, false, { body: '"resolved value"', slots: [] }]], @@ -350,7 +350,7 @@ describe('RemoteHandle', () => { }); const reply = await remote.handleRemoteMessage(dropExports); - expect(reply).toBe(''); + expect(reply).toBeNull(); for (const kref of krefs) { const { isPromise } = parseRef(kref); if (isPromise) { @@ -404,7 +404,7 @@ describe('RemoteHandle', () => { }); const reply = await
remote.handleRemoteMessage(retireExports); - expect(reply).toBe(''); + expect(reply).toBeNull(); expect(mockKernelStore.getObjectRefCount(kref)).toStrictEqual({ reachable: 0, recognizable: 0, @@ -431,7 +431,7 @@ describe('RemoteHandle', () => { }); const reply = await remote.handleRemoteMessage(retireImports); - expect(reply).toBe(''); + expect(reply).toBeNull(); // Object should have disappeared from the clists expect(() => @@ -471,7 +471,7 @@ describe('RemoteHandle', () => { mockOcapURL, ); // Reply is now sent via sendRemoteCommand, not returned - expect(reply).toBe(''); + expect(reply).toBeNull(); // Verify reply was sent with seq/ack via sendRemoteMessage expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalled(); const sentMessage = JSON.parse( @@ -510,7 +510,7 @@ describe('RemoteHandle', () => { mockOcapURL, ); // Reply is now sent via sendRemoteCommand, not returned - expect(reply).toBe(''); + expect(reply).toBeNull(); // Verify error reply was sent with seq/ack via sendRemoteMessage expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalled(); const sentMessage = JSON.parse( @@ -890,8 +890,8 @@ describe('RemoteHandle', () => { JSON.stringify(deliveryMessage), ); - // Verify message was processed (handleRemoteMessage returns empty string on success) - expect(result).toBe(''); + // Verify message was processed (handleRemoteMessage returns null on success) + expect(result).toBeNull(); // Verify kernel queue was called expect(mockKernelQueue.resolvePromises).toHaveBeenCalled(); @@ -904,9 +904,9 @@ describe('RemoteHandle', () => { // but wants to acknowledge our messages const standaloneAck = JSON.stringify({ ack: 5 }); - // This should not throw and should return empty string + // This should not throw and should return null const result = await remote.handleRemoteMessage(standaloneAck); - expect(result).toBe(''); + expect(result).toBeNull(); }); it('assigns sequential sequence numbers to outgoing messages', async () => { diff --git 
a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 904606fa2..6bc3cddb3 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -812,15 +812,15 @@ export class RemoteHandle implements EndpointHandle { * @param message - The message that was received. * * @returns a string containing a message to send back to the original message - * sender as a response. An empty string means no such message is to be sent. + * sender as a response, or null if no response is to be sent. */ - async handleRemoteMessage(message: string): Promise<string> { + async handleRemoteMessage(message: string): Promise<string | null> { const parsed = JSON.parse(message); // Handle standalone ACK message (no seq, no method - just ack) if (parsed.ack !== undefined && parsed.seq === undefined) { this.#handleAck(parsed.ack); - return ''; + return null; } const remoteCommand = parsed as RemoteCommand; @@ -839,7 +839,7 @@ export class RemoteHandle implements EndpointHandle { this.#logger.log( `${this.#peerId.slice(0, 8)}:: ignoring duplicate message seq=${seq} (highestReceived=${this.#highestReceivedSeq})`, ); - return ''; + return null; } // Capture previous seq high watermark for rollback, then update in-memory @@ -880,7 +880,7 @@ export class RemoteHandle implements EndpointHandle { this.#kernelStore.rollbackSavepoint(savepointName); throw error; } - return ''; + return null; } /** diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index f2fefef97..b230a43a7 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -295,9 +295,12 @@ export class RemoteManager { * * @param from - The peer ID of the sender. * @param message - The message content. - * @returns a promise for the response message.
+ * @returns a promise for the response message, or null if no response is needed. */ - async handleRemoteMessage(from: string, message: string): Promise<string> { + async handleRemoteMessage( + from: string, + message: string, + ): Promise<string | null> { const remote = this.remoteFor(from); return await remote.handleRemoteMessage(message); } diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index f278ff93a..1ee0ce875 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -10,7 +10,7 @@ export type Channel = { export type RemoteMessageHandler = ( from: string, message: string, -) => Promise<string>; +) => Promise<string | null>; export type SendRemoteMessage = (to: string, message: string) => Promise; diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts index f3110639d..e31c67f9a 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts @@ -11,10 +11,10 @@ describe('remoteDeliver', () => { }); it('should have correct result type', () => { - // Test that result validator accepts strings + // Test that result validator accepts strings and null expect(is('test-result', remoteDeliverSpec.result)).toBe(true); + expect(is(null, remoteDeliverSpec.result)).toBe(true); expect(is(123, remoteDeliverSpec.result)).toBe(false); - expect(is(null, remoteDeliverSpec.result)).toBe(false); expect(is(undefined, remoteDeliverSpec.result)).toBe(false); }); diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts index ad11e6172..9c8c08216 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts @@ -1,6 +1,6 @@ import type { MethodSpec, Handler } from '@metamask/kernel-rpc-methods'; -import {
object, string } from '@metamask/superstruct'; -import type { Infer } from '@metamask/superstruct'; +import { object, string, union, literal } from '@metamask/superstruct'; +import type { Infer, Struct } from '@metamask/superstruct'; const paramsStruct = object({ from: string(), @@ -12,19 +12,19 @@ type Params = Infer<typeof paramsStruct>; export type RemoteDeliverSpec = MethodSpec< 'remoteDeliver', { from: string; message: string }, - string + Promise<string | null> >; export const remoteDeliverSpec: RemoteDeliverSpec = { method: 'remoteDeliver', params: paramsStruct, - result: string(), + result: union([string(), literal(null)]) as Struct<string | null>, }; export type HandleRemoteDeliver = ( from: string, message: string, -) => Promise<string>; +) => Promise<string | null>; type RemoteDeliverHooks = { remoteDeliver: HandleRemoteDeliver; @@ -33,12 +33,14 @@ type RemoteDeliverHooks = { export type RemoteDeliverHandler = Handler< 'remoteDeliver', Params, - Promise<string>, + Promise<string | null>, RemoteDeliverHooks >; export const remoteDeliverHandler: RemoteDeliverHandler = { - ...remoteDeliverSpec, + method: 'remoteDeliver', + params: paramsStruct, + result: union([string(), literal(null)]) as Struct<string | null>, hooks: { remoteDeliver: true }, implementation: async ({ remoteDeliver }, params) => { return await remoteDeliver(params.from, params.message); From b74b31c3d770f16d470a8b6b0e88df602424f03c Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 13:34:05 -0800 Subject: [PATCH 04/12] docs: Update Ken protocol assessment - all properties now implemented Co-Authored-By: Claude Opus 4.5 --- docs/ken-protocol-assessment.md | 139 +++++++++++++------------------- 1 file changed, 58 insertions(+), 81 deletions(-) diff --git a/docs/ken-protocol-assessment.md b/docs/ken-protocol-assessment.md index c756a710a..eafd8d33e 100644 --- a/docs/ken-protocol-assessment.md +++ b/docs/ken-protocol-assessment.md @@ -50,6 +50,8 @@ Key aspects: | Deferred transmission | ✓ | Outputs reach RemoteHandle only after originating crank commits | | Output validity | ✓ | Crank
buffering ensures outputs escape only after commit | | Atomic checkpoint | ✓ | Database savepoints make crank state changes atomic | +| Exactly-once receive | ✓ | Transactional receive with dedup check (Issue #808) | +| FIFO ordering | ✓ | TCP guarantees in-order; dedup handles retransmits | ### Crank Buffering (Issue #786) @@ -78,48 +80,59 @@ This achieves Ken's property that **outputs are only externalized after successf `RemoteHandle` persists messages to `remotePending` before transmitting for a different reason: to enable retransmission on recovery if the transmission or ACK is lost. This is part of the at-least-once delivery mechanism, not the output validity mechanism. -### Remaining Gaps (Receive Side) +### Receive-Side Implementation (Issue #808) -The remaining gaps are on the **receive side** of remote messaging. Code review of `RemoteHandle.handleRemoteMessage()` revealed specific bugs: +The receive side of remote messaging implements Ken's exactly-once delivery guarantee through transactional message processing with duplicate detection. -#### 1. No Duplicate Detection (Bug) +#### Duplicate Detection -Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. +Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. Our implementation achieves this by checking `seq <= highestReceivedSeq` before processing: -**Current code behavior** (`RemoteHandle.ts` lines 830-845): ```typescript -// Track received sequence number for piggyback ACK and persist -if (seq > this.#highestReceivedSeq) { - this.#highestReceivedSeq = seq; - this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); +// Duplicate detection: skip if we've already processed this sequence number +if (seq <= this.#highestReceivedSeq) { + this.#logger.log(`ignoring duplicate message seq=${seq}`); + return null; } -// ... 
then UNCONDITIONALLY: -switch (method) { - case 'deliver': - this.#handleRemoteDeliver(params); // Always runs, even for duplicates! ``` -**Problem**: There is no deduplication check. Even when `seq <= highestReceivedSeq`, the message is processed. After a crash and retransmit, duplicate messages will be delivered to the vat. +After a crash and retransmit, duplicate messages are detected and ignored. + +#### Transactional Message Processing -#### 2. Wrong Persistence Order (Bug) +Message processing is wrapped in a database savepoint to ensure atomicity: + +```typescript +const savepointName = `receive_${this.remoteId}_${seq}`; +this.#kernelStore.createSavepoint(savepointName); +try { + // Process message (translate refs, add to run queue, etc.) + switch (method) { + case 'deliver': ... + case 'redeemURL': ... + case 'redeemURLReply': ... + } -**Current behavior**: `highestReceivedSeq` is persisted BEFORE the message is processed and added to the run queue. + // Persist sequence tracking at the end, within the transaction + this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); -**Crash scenario**: -1. Receive message seq=5 from remote R -2. Update and persist `highestReceivedSeq` to 5 -3. Crash before message is added to run queue -4. On recovery: `highestReceivedSeq=5` suggests we received it -5. Remote retransmits seq=5, we (correctly) ignore it due to dedup check (once fixed) -6. **Message lost** - never reached the run queue + // Commit the transaction + this.#kernelStore.releaseSavepoint(savepointName); +} catch (error) { + // Rollback on any error - also revert in-memory state + this.#highestReceivedSeq = previousHighestReceivedSeq; + this.#kernelStore.rollbackSavepoint(savepointName); + throw error; +} +``` -**What's needed**: Process the message first (add to run queue), then persist `highestReceivedSeq`. Ideally these should be atomic. 
+This achieves atomicity: if a crash occurs before commit, both the run queue entry and the sequence update roll back together. The remote retransmits, and we process it correctly. -#### 3. FIFO Enforcement on Receive (Not a Gap) +#### FIFO Ordering Ken enforces per-sender FIFO ordering via `next_ready()` which only delivers the next expected sequence number. -**Our situation**: We use TCP-based transports (libp2p streams) which guarantee in-order delivery during normal operation. Out-of-order arrival only occurs after a crash when the sender retransmits. With proper deduplication (fix #1 above), retransmitted messages for already-processed sequence numbers will be dropped, maintaining FIFO semantics. +**Our situation**: We use TCP-based transports (libp2p streams) which guarantee in-order delivery during normal operation. Out-of-order arrival only occurs after a crash when the sender retransmits. With duplicate detection, retransmitted messages for already-processed sequence numbers are dropped, maintaining FIFO semantics. Therefore, explicit receive-side reordering is not required given our transport guarantees. 
@@ -134,45 +147,12 @@ Therefore, explicit receive-side reordering is not required given our transport | Consistent frontier | **Yes** | Each kernel's checkpoint is independent | | Local recovery | **Yes** | Crashes don't affect other processes | | Sender-based logging | **Yes** | Messages persisted in remotePending until ACKed | -| Exactly-once delivery | **Bug** | Needs transactional receive with dedup check | +| Exactly-once delivery | **Yes** | Transactional receive with dedup check | | FIFO ordering | **Yes** | TCP guarantees in-order; dedup handles retransmits | -## Required Fix - -Wrap `handleRemoteMessage()` in a database transaction with dedup check: - -```typescript -handleRemoteMessage(seq, method, params) { - // Begin transaction - - // Dedup check - must be inside transaction to read committed state - if (seq <= this.#highestReceivedSeq) { - // Already received, ACK but don't process - return; - } - - // Process message (translate refs, add to run queue, etc.) - switch (method) { - case 'deliver': ... - case 'resolve': ... - case 'gc': ... - } - - // Update sequence tracking - this.#highestReceivedSeq = seq; - this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); - - // Commit transaction -} -``` - -This achieves atomicity without restructuring the existing message handling code. If a crash occurs before commit, both the run queue entry and the sequence update roll back together - the remote retransmits, and we process it correctly. - -The transaction approach is simpler than reordering because `handleRemoteMessage` handles multiple message types (`deliver`, `resolve`, `gc`) with different processing paths, and reference slots require translation before persistence. 
- ## Architectural Summary -**Send side (achieved with crank buffering):** +**Send side (crank buffering):** ``` Vat Crank: vat processes message → syscalls buffer outputs @@ -186,34 +166,31 @@ Later (separate operation): The key insight: by the time RemoteHandle sees a message, the originating crank has already committed. Output validity is achieved. -**Receive side (bugs to fix):** +**Receive side (transactional processing):** ``` -Current (buggy): - receive from network - → persist highestReceivedSeq (WRONG: too early) - → process message unconditionally (WRONG: no dedup) - → add to run queue - -Fixed (wrap in transaction): - receive from network - → begin transaction - → check seq <= highestReceivedSeq (skip if duplicate) - → process message, add to run queue - → persist highestReceivedSeq - → commit transaction +receive from network + → check seq <= highestReceivedSeq (skip if duplicate) + → begin transaction (savepoint) + → process message, add to run queue + → persist highestReceivedSeq + → commit transaction (release savepoint) ``` +If a crash occurs before commit, both the run queue entry and the sequence update roll back together. The remote retransmits, and we process it correctly. 
+ ## Progress Summary | Area | Status | |------|--------| -| Kernel-internal output buffering | **Achieved** | -| Rollback discards uncommitted outputs | **Achieved** | -| Atomic kernel state + output queue | **Achieved** | -| Output validity (send side) | **Achieved** | -| Deferred transmission (send side) | **Achieved** | +| Kernel-internal output buffering | **Achieved** (Issue #786) | +| Rollback discards uncommitted outputs | **Achieved** (Issue #786) | +| Atomic kernel state + output queue | **Achieved** (Issue #786) | +| Output validity (send side) | **Achieved** (Issue #786) | +| Deferred transmission (send side) | **Achieved** (Issue #786) | | FIFO ordering | **Achieved** (TCP transport) | -| Exactly-once receive (dedup + atomicity) | **Bug** - needs transactional fix | +| Exactly-once receive (dedup + atomicity) | **Achieved** (Issue #808) | + +All Ken protocol properties are now implemented. ## References From 467dc439153e72b86beed8eae60b380b4c12f376 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 14:42:32 -0800 Subject: [PATCH 05/12] fix: Defer ACK piggybacking until after transaction commits - Move #highestReceivedSeq update after savepoint release to prevent ACKs for uncommitted message receipts - Add validation for seq field to prevent state corruption from malformed messages - Update tests to reflect that replies during transaction don't piggyback ACKs Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/kernel/RemoteHandle.test.ts | 8 ++++++-- .../src/remotes/kernel/RemoteHandle.ts | 18 +++++++++++------- .../src/remotes/kernel/RemoteManager.test.ts | 1 + 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index cf247fc2f..11b42b173 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -480,7 +480,9 @@ 
describe('RemoteHandle', () => { expect(sentMessage.method).toBe('redeemURLReply'); expect(sentMessage.params).toStrictEqual([true, mockReplyKey, replyRRef]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 - expect(sentMessage.ack).toBe(1); // Piggyback ACK for received message + // ACK is NOT piggybacked on replies sent during transaction - we haven't + // committed yet. The delayed ACK timer will send the ACK after commit. + expect(sentMessage.ack).toBeUndefined(); expect( mockKernelStore.translateRefKtoE(remote.remoteId, replyKRef, false), ).toBe(replyRRef); @@ -523,7 +525,9 @@ describe('RemoteHandle', () => { errorMessage, ]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 - expect(sentMessage.ack).toBe(1); // Piggyback ACK for received message + // ACK is NOT piggybacked on replies sent during transaction - we haven't + // committed yet. The delayed ACK timer will send the ACK after commit. + expect(sentMessage.ack).toBeUndefined(); }); it('handleRemoteMessage rejects bogus message type', async () => { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 6bc3cddb3..b3da92140 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -834,6 +834,11 @@ export class RemoteHandle implements EndpointHandle { // Start delayed ACK timer - will send standalone ACK if no outgoing traffic this.#startDelayedAck(); + // Validate seq value. 
+ if (typeof seq !== 'number' || !Number.isInteger(seq) || seq < 1) { + throw Error(`invalid message seq: ${seq}`); + } + // Duplicate detection: skip if we've already processed this sequence number if (seq <= this.#highestReceivedSeq) { this.#logger.log( @@ -842,11 +847,6 @@ export class RemoteHandle implements EndpointHandle { return null; } - // Capture previous seq high watermark for rollback, then update in-memory - // seq tracking early so any reply piggybacking includes correct ACK. - const previousHighestReceivedSeq = this.#highestReceivedSeq; - this.#highestReceivedSeq = seq; - // Wrap message processing in a transaction for atomicity: Either both (1) // message processing and (2) seq update succeed together, or neither // happens. This ensures crash-safe exactly-once delivery. @@ -874,9 +874,13 @@ export class RemoteHandle implements EndpointHandle { // Commit the transaction this.#kernelStore.releaseSavepoint(savepointName); + + // Update in-memory seq state only after successful commit. This ensures any + // ACK piggybacked on outgoing messages doesn't acknowledge uncommitted + // message receipts. 
+ this.#highestReceivedSeq = seq; } catch (error) { - // Rollback on any error - also revert in-memory state - this.#highestReceivedSeq = previousHighestReceivedSeq; + // Rollback on any error - in-memory state unchanged since we didn't update it yet this.#kernelStore.rollbackSavepoint(savepointName); throw error; } diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index 232f758e1..9500a0143 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -352,6 +352,7 @@ describe('RemoteManager', () => { it('creates new remote when handling message from unknown peer', async () => { const message = JSON.stringify({ + seq: 1, method: 'deliver', params: [ 'message', From a76e4351d7343594cbce30366422041f37470209 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 15:01:21 -0800 Subject: [PATCH 06/12] fix: Restart delayed ACK timer after transaction commit When a reply is sent during message processing (e.g., redeemURLReply), #sendRemoteCommand clears the delayed ACK timer. Since the reply can't piggyback the ACK (transaction hasn't committed), we must restart the timer after commit to ensure ACK is eventually sent. 
Co-Authored-By: Claude Opus 4.5 --- .../ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts | 4 ++-- packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 11b42b173..8649aa717 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -481,7 +481,7 @@ describe('RemoteHandle', () => { expect(sentMessage.params).toStrictEqual([true, mockReplyKey, replyRRef]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 // ACK is NOT piggybacked on replies sent during transaction - we haven't - // committed yet. The delayed ACK timer will send the ACK after commit. + // committed yet. A delayed ACK timer is restarted after commit. expect(sentMessage.ack).toBeUndefined(); expect( mockKernelStore.translateRefKtoE(remote.remoteId, replyKRef, false), @@ -526,7 +526,7 @@ describe('RemoteHandle', () => { ]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 // ACK is NOT piggybacked on replies sent during transaction - we haven't - // committed yet. The delayed ACK timer will send the ACK after commit. + // committed yet. A delayed ACK timer is restarted after commit. expect(sentMessage.ack).toBeUndefined(); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index b3da92140..5f7e9c1a0 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -879,6 +879,13 @@ export class RemoteHandle implements EndpointHandle { // ACK piggybacked on outgoing messages doesn't acknowledge uncommitted // message receipts. this.#highestReceivedSeq = seq; + + // Restart delayed ACK timer. 
The timer was started at the beginning of + // message processing, but if a reply was sent during the transaction (e.g., + // redeemURLReply), #sendRemoteCommand cleared the timer. Since the reply + // couldn't piggyback the ACK (we hadn't committed yet), we need to ensure + // a standalone ACK is sent. + this.#startDelayedAck(); } catch (error) { // Rollback on any error - in-memory state unchanged since we didn't update it yet this.#kernelStore.rollbackSavepoint(savepointName); From b6f58bdc242bd9140e0f5491b2b977370a189338 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 15:18:55 -0800 Subject: [PATCH 07/12] fix: Defer redeemURLReply in-memory changes until after commit Split #handleRedeemURLReply into #prepareRedeemURLReply (validates and translates inside transaction) and #completeRedeemURLReply (modifies in-memory state after commit). This prevents promise resolution and pendingRedemptions deletion from escaping a rolled-back transaction. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/kernel/RemoteHandle.ts | 70 ++++++++++++++++--- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 5f7e9c1a0..7dfffe651 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -788,21 +788,55 @@ export class RemoteHandle implements EndpointHandle { * @param replyKey - that tag that was sent in the request being replied to. * @param result - if success, an object ref; if not, an error message string. */ - async #handleRedeemURLReply( + /** + * Prepare to handle a redeemURLReply - validates and translates but does NOT + * modify in-memory state. Returns the data needed to complete the operation. + * + * @param success - Whether the redemption was successful. + * @param replyKey - The reply key for matching to pending redemption. 
+ * @param result - Either the kref (on success) or error message (on failure). + * @returns Data needed to complete the operation after commit. + */ + #prepareRedeemURLReply( success: boolean, replyKey: string, result: string, - ): Promise { - const handlers = this.#pendingRedemptions.get(replyKey); - if (!handlers) { + ): { replyKey: string; success: boolean; value: string } { + // Validate the replyKey exists - if not, this is an error and we should throw + // (which will cause the savepoint to roll back) + if (!this.#pendingRedemptions.has(replyKey)) { throw Error(`unknown URL redemption reply key ${replyKey}`); } - this.#pendingRedemptions.delete(replyKey); - const [resolve, reject] = handlers; - if (success) { - resolve(this.#kernelStore.translateRefEtoK(this.remoteId, result)); - } else { - reject(result); + // Translate ref inside transaction (database operation) + const value = success + ? this.#kernelStore.translateRefEtoK(this.remoteId, result) + : result; + return { replyKey, success, value }; + } + + /** + * Complete a redeemURLReply after transaction commits - modifies in-memory state. + * + * @param data - The data from #prepareRedeemURLReply. + * @param data.replyKey - The reply key for matching to pending redemption. + * @param data.success - Whether the redemption was successful. + * @param data.value - The translated kref (on success) or error message (on failure). + */ + #completeRedeemURLReply(data: { + replyKey: string; + success: boolean; + value: string; + }): void { + const handlers = this.#pendingRedemptions.get(data.replyKey); + // handlers should exist since we validated in prepare, but check for safety + if (handlers) { + this.#pendingRedemptions.delete(data.replyKey); + const [resolve, reject] = handlers; + if (data.success) { + resolve(data.value); + } else { + reject(data.value); + } } } @@ -852,6 +886,12 @@ export class RemoteHandle implements EndpointHandle { // happens. This ensures crash-safe exactly-once delivery. 
const savepointName = `receive_${this.remoteId}_${seq}`; this.#kernelStore.createSavepoint(savepointName); + + // Deferred operations to complete after commit (for redeemURLReply) + let redemptionResult: + | { replyKey: string; success: boolean; value: string } + | undefined; + try { switch (method) { case 'deliver': @@ -862,7 +902,8 @@ export class RemoteHandle implements EndpointHandle { await this.#handleRedeemURLRequest(...params); break; case 'redeemURLReply': - await this.#handleRedeemURLReply(...params); + // Prepare but don't complete - in-memory changes deferred until after commit + redemptionResult = this.#prepareRedeemURLReply(...params); break; default: // eslint-disable-next-line @typescript-eslint/restrict-template-expressions @@ -875,11 +916,18 @@ export class RemoteHandle implements EndpointHandle { // Commit the transaction this.#kernelStore.releaseSavepoint(savepointName); + // === All in-memory state changes happen AFTER commit === + // Update in-memory seq state only after successful commit. This ensures any // ACK piggybacked on outgoing messages doesn't acknowledge uncommitted // message receipts. this.#highestReceivedSeq = seq; + // Complete deferred redeemURLReply (delete from map, resolve/reject promise) + if (redemptionResult) { + this.#completeRedeemURLReply(redemptionResult); + } + // Restart delayed ACK timer. The timer was started at the beginning of // message processing, but if a reply was sent during the transaction (e.g., // redeemURLReply), #sendRemoteCommand cleared the timer. 
Since the reply From b75d3454dd1664d6df6b5a035188bf6f216b95d7 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 16:26:29 -0800 Subject: [PATCH 08/12] chore: Clean up comments in RemoteHandle Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/kernel/RemoteHandle.ts | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 7dfffe651..043a562f8 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -782,15 +782,8 @@ export class RemoteHandle implements EndpointHandle { } /** - * Handle an ocap URL redemption reply from the remote end. - * - * @param success - true if the result is a URL, false if the result is an error. - * @param replyKey - that tag that was sent in the request being replied to. - * @param result - if success, an object ref; if not, an error message string. - */ - /** - * Prepare to handle a redeemURLReply - validates and translates but does NOT - * modify in-memory state. Returns the data needed to complete the operation. + * Prepare to handle a redeemURLReply - validates and translates but does not + * yet modify in-memory state. * * @param success - Whether the redemption was successful. * @param replyKey - The reply key for matching to pending redemption. @@ -802,12 +795,9 @@ export class RemoteHandle implements EndpointHandle { replyKey: string, result: string, ): { replyKey: string; success: boolean; value: string } { - // Validate the replyKey exists - if not, this is an error and we should throw - // (which will cause the savepoint to roll back) if (!this.#pendingRedemptions.has(replyKey)) { throw Error(`unknown URL redemption reply key ${replyKey}`); } - // Translate ref inside transaction (database operation) const value = success ? 
this.#kernelStore.translateRefEtoK(this.remoteId, result) : result; @@ -916,11 +906,10 @@ export class RemoteHandle implements EndpointHandle { // Commit the transaction this.#kernelStore.releaseSavepoint(savepointName); - // === All in-memory state changes happen AFTER commit === + // All in-memory state changes happen after commit - // Update in-memory seq state only after successful commit. This ensures any - // ACK piggybacked on outgoing messages doesn't acknowledge uncommitted - // message receipts. + // Updating in-memory seq state after commit ensures any ACK piggybacked + // on outgoing messages doesn't acknowledge uncommitted message receipts. this.#highestReceivedSeq = seq; // Complete deferred redeemURLReply (delete from map, resolve/reject promise) @@ -928,11 +917,7 @@ export class RemoteHandle implements EndpointHandle { this.#completeRedeemURLReply(redemptionResult); } - // Restart delayed ACK timer. The timer was started at the beginning of - // message processing, but if a reply was sent during the transaction (e.g., - // redeemURLReply), #sendRemoteCommand cleared the timer. Since the reply - // couldn't piggyback the ACK (we hadn't committed yet), we need to ensure - // a standalone ACK is sent. + // Restart delayed ACK timer, which may have been cleared by #sendRemoteCommand. this.#startDelayedAck(); } catch (error) { // Rollback on any error - in-memory state unchanged since we didn't update it yet From 6e51e13abb63d5aeaa419b90d508d96fdd772f1b Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 17:04:54 -0800 Subject: [PATCH 09/12] chore: Remove duplicate remoteDeliverSpec in kernel-remote.ts The kernel-remote.ts file contained a stale duplicate of remoteDeliverSpec that didn't accept null results. Since index.ts imports from remoteDeliver.ts (which has the correct spec), kernel-remote.ts and its test were dead code. 
Co-Authored-By: Claude Opus 4.5 --- .../rpc/kernel-remote/kernel-remote.test.ts | 76 ------------------- .../src/rpc/kernel-remote/kernel-remote.ts | 17 ----- 2 files changed, 93 deletions(-) delete mode 100644 packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.test.ts delete mode 100644 packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.ts diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.test.ts b/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.test.ts deleted file mode 100644 index 0ba41ae7f..000000000 --- a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.test.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { is } from '@metamask/superstruct'; -import { describe, it, expect } from 'vitest'; - -import { remoteDeliverSpec } from './kernel-remote.ts'; - -describe('kernel-remote', () => { - describe('remoteDeliverSpec', () => { - it('should have correct method name', () => { - expect(remoteDeliverSpec.method).toBe('remoteDeliver'); - }); - - it.each([ - ['test-result', true], - [123, false], - [null, false], - [undefined, false], - ['', true], - ['🌟', true], - ])('should validate result type: %s -> %s', (value, expected) => { - expect(is(value, remoteDeliverSpec.result)).toBe(expected); - }); - - describe('params validation', () => { - it('should accept valid params', () => { - const validParams = { - from: 'peer-123', - message: 'hello world', - }; - - expect(is(validParams, remoteDeliverSpec.params)).toBe(true); - }); - - it.each([ - [{ message: 'hello world' }, 'missing from field'], - [{ from: 'peer-123' }, 'missing message field'], - [{ from: 123, message: 'hello world' }, 'non-string from field'], - [{ from: 'peer-123', message: 123 }, 'non-string message field'], - ])('should reject params with %s', (invalidParams, _description) => { - expect(is(invalidParams, remoteDeliverSpec.params)).toBe(false); - }); - - it('should reject params with extra fields', () => { - const invalidParams = { - from: 'peer-123', - 
message: 'hello world', - extra: 'field', - }; - - expect(is(invalidParams, remoteDeliverSpec.params)).toBe(false); - }); - - it.each([ - [null, 'null'], - [undefined, 'undefined'], - ['string', 'string'], - [123, 'number'], - [[], 'array'], - ])('should reject %s params', (invalidParams, _type) => { - expect(is(invalidParams, remoteDeliverSpec.params)).toBe(false); - }); - - it.each([ - ['', '', 'empty strings'], - ['🌟peer-123🌟', 'hello 世界 🌍', 'unicode strings'], - ['a'.repeat(10000), 'b'.repeat(10000), 'very long strings'], - ])('should accept %s', (from, message, _description) => { - const validParams = { - from, - message, - }; - - expect(is(validParams, remoteDeliverSpec.params)).toBe(true); - }); - }); - }); -}); diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.ts b/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.ts deleted file mode 100644 index 76f93770d..000000000 --- a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type { MethodSpec } from '@metamask/kernel-rpc-methods'; -import { object, string } from '@metamask/superstruct'; - -type RemoteDeliverParams = { - from: string; - message: string; -}; - -export const remoteDeliverSpec: MethodSpec< - 'remoteDeliver', - RemoteDeliverParams, - string -> = { - method: 'remoteDeliver', - params: object({ from: string(), message: string() }), - result: string(), -}; From f54a52c53670029d3ee354deeee09cbdedef47ed Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 17:13:22 -0800 Subject: [PATCH 10/12] fix: Use spread pattern in remoteDeliverHandler Spread ...remoteDeliverSpec instead of duplicating method, params, and result fields. This ensures the handler stays in sync with the spec. 
Co-Authored-By: Claude Opus 4.5 --- packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts index 9c8c08216..79a7f32a3 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts @@ -38,9 +38,7 @@ export type RemoteDeliverHandler = Handler< >; export const remoteDeliverHandler: RemoteDeliverHandler = { - method: 'remoteDeliver', - params: paramsStruct, - result: union([string(), literal(null)]) as Struct, + ...remoteDeliverSpec, hooks: { remoteDeliver: true }, implementation: async ({ remoteDeliver }, params) => { return await remoteDeliver(params.from, params.message); From 2fd367e93979b9f07dcefc040b3e0037b74955f9 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 18:42:23 -0800 Subject: [PATCH 11/12] fix: Defer redeemURL reply until after transaction commits Move #sendRemoteCommand call for redeemURL replies to after the savepoint commits. This prevents in-memory state (#nextSendSeq, #startSeq, timers) from diverging from the database if the transaction rolls back. Also refactor redeemURL and redeemURLReply handling into parallel prepare/complete phases for consistency. 
Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/kernel/RemoteHandle.test.ts | 10 +-- .../src/remotes/kernel/RemoteHandle.ts | 85 ++++++++++++------- 2 files changed, 60 insertions(+), 35 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 8649aa717..f1cadc366 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -480,9 +480,8 @@ describe('RemoteHandle', () => { expect(sentMessage.method).toBe('redeemURLReply'); expect(sentMessage.params).toStrictEqual([true, mockReplyKey, replyRRef]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 - // ACK is NOT piggybacked on replies sent during transaction - we haven't - // committed yet. A delayed ACK timer is restarted after commit. - expect(sentMessage.ack).toBeUndefined(); + // Reply is sent after commit, so ACK can be piggybacked + expect(sentMessage.ack).toBe(1); expect( mockKernelStore.translateRefKtoE(remote.remoteId, replyKRef, false), ).toBe(replyRRef); @@ -525,9 +524,8 @@ describe('RemoteHandle', () => { errorMessage, ]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 - // ACK is NOT piggybacked on replies sent during transaction - we haven't - // committed yet. A delayed ACK timer is restarted after commit. 
- expect(sentMessage.ack).toBeUndefined(); + // Reply is sent after commit, so ACK can be piggybacked + expect(sentMessage.ack).toBe(1); }); it('handleRemoteMessage rejects bogus message type', async () => { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 043a562f8..4a3c9cdd1 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -756,65 +756,83 @@ export class RemoteHandle implements EndpointHandle { } /** - * Handle an ocap URL redemption request from the remote end. - * Sends the reply via #sendRemoteCommand to ensure it gets seq/ack tracking. + * Prepare to handle an incoming redeemURL message. Validates and translates + * but does not send the reply. Returns data needed to complete after commit. * * @param url - The ocap URL attempting to be redeemed. * @param replyKey - A sender-provided tag to send with the reply. + * @returns Data needed to complete the operation after commit. */ - async #handleRedeemURLRequest(url: string, replyKey: string): Promise { + async #handleRedeemURLRequest( + url: string, + replyKey: string, + ): Promise<{ success: boolean; replyKey: string; value: string }> { assert.typeof(replyKey, 'string'); let kref: string; try { kref = await this.#remoteComms.redeemLocalOcapURL(url); } catch (error) { - await this.#sendRemoteCommand({ - method: 'redeemURLReply', - params: [false, replyKey, `${(error as Error).message}`], - }); - return; + return { success: false, replyKey, value: `${(error as Error).message}` }; } const eref = this.#kernelStore.translateRefKtoE(this.remoteId, kref, true); + return { success: true, replyKey, value: eref }; + } + + /** + * Complete handling of an incoming redeemURL message by sending the reply. + * + * @param data - The data from #handleRedeemURLRequest. + * @param data.success - Whether the redemption was successful. 
+ * @param data.replyKey - The reply key from the request. + * @param data.value - The eref (on success) or error message (on failure). + */ + async #completeHandleRedeemURLRequest(data: { + success: boolean; + replyKey: string; + value: string; + }): Promise { await this.#sendRemoteCommand({ method: 'redeemURLReply', - params: [true, replyKey, eref], + params: [data.success, data.replyKey, data.value], }); } /** - * Prepare to handle a redeemURLReply - validates and translates but does not - * yet modify in-memory state. + * Prepare to handle an incoming redeemURLReply message. Validates and + * translates but does not modify in-memory state. Returns data needed to + * complete after commit. * * @param success - Whether the redemption was successful. * @param replyKey - The reply key for matching to pending redemption. * @param result - Either the kref (on success) or error message (on failure). * @returns Data needed to complete the operation after commit. */ - #prepareRedeemURLReply( + #handleRedeemURLReply( success: boolean, replyKey: string, result: string, - ): { replyKey: string; success: boolean; value: string } { + ): { success: boolean; replyKey: string; value: string } { if (!this.#pendingRedemptions.has(replyKey)) { throw Error(`unknown URL redemption reply key ${replyKey}`); } const value = success ? this.#kernelStore.translateRefEtoK(this.remoteId, result) : result; - return { replyKey, success, value }; + return { success, replyKey, value }; } /** - * Complete a redeemURLReply after transaction commits - modifies in-memory state. + * Complete handling of an incoming redeemURLReply message by resolving the + * pending promise. * - * @param data - The data from #prepareRedeemURLReply. - * @param data.replyKey - The reply key for matching to pending redemption. + * @param data - The data from #handleRedeemURLReply. * @param data.success - Whether the redemption was successful. + * @param data.replyKey - The reply key for matching to pending redemption. 
* @param data.value - The translated kref (on success) or error message (on failure). */ - #completeRedeemURLReply(data: { - replyKey: string; + #completeHandleRedeemURLReply(data: { success: boolean; + replyKey: string; value: string; }): void { const handlers = this.#pendingRedemptions.get(data.replyKey); @@ -877,9 +895,9 @@ export class RemoteHandle implements EndpointHandle { const savepointName = `receive_${this.remoteId}_${seq}`; this.#kernelStore.createSavepoint(savepointName); - // Deferred operations to complete after commit (for redeemURLReply) - let redemptionResult: - | { replyKey: string; success: boolean; value: string } + // Deferred completion data - set by redeemURL and redeemURLReply handlers + let deferredCompletion: + | { success: boolean; replyKey: string; value: string } | undefined; try { @@ -888,12 +906,10 @@ export class RemoteHandle implements EndpointHandle { this.#handleRemoteDeliver(params); break; case 'redeemURL': - // Reply is sent via #sendRemoteCommand for proper seq/ack tracking - await this.#handleRedeemURLRequest(...params); + deferredCompletion = await this.#handleRedeemURLRequest(...params); break; case 'redeemURLReply': - // Prepare but don't complete - in-memory changes deferred until after commit - redemptionResult = this.#prepareRedeemURLReply(...params); + deferredCompletion = this.#handleRedeemURLReply(...params); break; default: // eslint-disable-next-line @typescript-eslint/restrict-template-expressions @@ -912,9 +928,20 @@ export class RemoteHandle implements EndpointHandle { // on outgoing messages doesn't acknowledge uncommitted message receipts. 
this.#highestReceivedSeq = seq; - // Complete deferred redeemURLReply (delete from map, resolve/reject promise) - if (redemptionResult) { - this.#completeRedeemURLReply(redemptionResult); + // Complete deferred operations + if (deferredCompletion) { + switch (method) { + case 'redeemURL': + await this.#completeHandleRedeemURLRequest(deferredCompletion); + break; + case 'redeemURLReply': + this.#completeHandleRedeemURLReply(deferredCompletion); + break; + case 'deliver': + default: + // deliver doesn't set deferredCompletion, so this is unreachable + break; + } } // Restart delayed ACK timer, which may have been cleared by #sendRemoteCommand. From 210ff110503736880d9356f987d9ee2a0c4f8a9a Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 5 Feb 2026 23:28:06 -0800 Subject: [PATCH 12/12] fix: Move post-commit operations outside try-catch Post-commit operations (deferred completions, in-memory state updates) now execute outside the try-catch block. This prevents attempting to rollback an already-released savepoint if a completion operation fails. Also adds exemptFromCapacityLimit parameter to #sendRemoteCommand so redeemURLReply can bypass capacity checks - it's a kernel-initiated reply that must be sent to avoid leaving the remote hanging. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/kernel/RemoteHandle.ts | 72 +++++++++++-------- 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 4a3c9cdd1..1a3317312 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -470,9 +470,13 @@ export class RemoteHandle implements EndpointHandle { * Adds seq and ack fields, queues for ACK tracking, and sends. * * @param messageBase - The base message to send (without seq/ack). + * @param exemptFromCapacityLimit - If true, bypass the pending queue capacity + * check. 
Used for kernel-initiated messages that must be sent to avoid + * leaving the remote hanging. */ async #sendRemoteCommand( messageBase: Delivery | RedeemURLRequest | RedeemURLReply, + exemptFromCapacityLimit = false, ): Promise { if (this.#needsHinting) { // Hints are registered lazily because (a) transmitting to the platform @@ -494,8 +498,11 @@ export class RemoteHandle implements EndpointHandle { this.#needsHinting = false; } - // Check queue capacity before consuming any resources (seq number, ACK timer) - if (this.#getPendingCount() >= MAX_PENDING_MESSAGES) { + // Check queue capacity before consuming any resources (seq number, ACK timer). + if ( + !exemptFromCapacityLimit && + this.#getPendingCount() >= MAX_PENDING_MESSAGES + ) { throw Error( `Message rejected: pending queue at capacity (${MAX_PENDING_MESSAGES})`, ); @@ -791,10 +798,13 @@ export class RemoteHandle implements EndpointHandle { replyKey: string; value: string; }): Promise { - await this.#sendRemoteCommand({ - method: 'redeemURLReply', - params: [data.success, data.replyKey, data.value], - }); + await this.#sendRemoteCommand( + { + method: 'redeemURLReply', + params: [data.success, data.replyKey, data.value], + }, + true, // exempt from capacity limit - this is a reply that mustn't fail and is not vat-initiated + ); } /** @@ -921,36 +931,36 @@ export class RemoteHandle implements EndpointHandle { // Commit the transaction this.#kernelStore.releaseSavepoint(savepointName); - - // All in-memory state changes happen after commit - - // Updating in-memory seq state after commit ensures any ACK piggybacked - // on outgoing messages doesn't acknowledge uncommitted message receipts. 
- this.#highestReceivedSeq = seq; - - // Complete deferred operations - if (deferredCompletion) { - switch (method) { - case 'redeemURL': - await this.#completeHandleRedeemURLRequest(deferredCompletion); - break; - case 'redeemURLReply': - this.#completeHandleRedeemURLReply(deferredCompletion); - break; - case 'deliver': - default: - // deliver doesn't set deferredCompletion, so this is unreachable - break; - } - } - - // Restart delayed ACK timer, which may have been cleared by #sendRemoteCommand. - this.#startDelayedAck(); } catch (error) { // Rollback on any error - in-memory state unchanged since we didn't update it yet this.#kernelStore.rollbackSavepoint(savepointName); throw error; } + + // All in-memory state changes happen after commit + + // Updating in-memory seq state after commit ensures any ACK piggybacked + // on outgoing messages doesn't acknowledge uncommitted message receipts. + this.#highestReceivedSeq = seq; + + // Complete deferred operations + if (deferredCompletion) { + switch (method) { + case 'redeemURL': + await this.#completeHandleRedeemURLRequest(deferredCompletion); + break; + case 'redeemURLReply': + this.#completeHandleRedeemURLReply(deferredCompletion); + break; + case 'deliver': + default: + // deliver doesn't set deferredCompletion, so this is unreachable + break; + } + } + + // Restart delayed ACK timer, which may have been cleared by #sendRemoteCommand. + this.#startDelayedAck(); return null; }