diff --git a/docs/ken-protocol-assessment.md b/docs/ken-protocol-assessment.md index 5b5c72e54..eafd8d33e 100644 --- a/docs/ken-protocol-assessment.md +++ b/docs/ken-protocol-assessment.md @@ -46,10 +46,12 @@ Key aspects: | Retransmission | ✓ | Timeout-based retransmit until ACK or max retries | | Crash-safe persistence | ✓ | Write message first, then update nextSendSeq | | Local recovery | ✓ | Restore seq state, restart ACK timeout | -| **Transactional turns** | ✓ | Crank buffering defers outputs until crank commit | -| **Deferred transmission** | ✓ | Outputs reach RemoteHandle only after originating crank commits | -| **Output validity** | ✓ | Crank buffering ensures outputs escape only after commit | -| **Atomic checkpoint** | ✓ | Database savepoints make crank state changes atomic | +| Transactional turns | ✓ | Crank buffering defers outputs until crank commit | +| Deferred transmission | ✓ | Outputs reach RemoteHandle only after originating crank commits | +| Output validity | ✓ | Crank buffering ensures outputs escape only after commit | +| Atomic checkpoint | ✓ | Database savepoints make crank state changes atomic | +| Exactly-once receive | ✓ | Transactional receive with dedup check (Issue #808) | +| FIFO ordering | ✓ | TCP guarantees in-order; dedup handles retransmits | ### Crank Buffering (Issue #786) @@ -78,47 +80,61 @@ This achieves Ken's property that **outputs are only externalized after successf `RemoteHandle` persists messages to `remotePending` before transmitting for a different reason: to enable retransmission on recovery if the transmission or ACK is lost. This is part of the at-least-once delivery mechanism, not the output validity mechanism. -### Remaining Gaps (Receive Side) +### Receive-Side Implementation (Issue #808) -The remaining gaps are on the **receive side** of remote messaging: +The receive side of remote messaging implements Ken's exactly-once delivery guarantee through transactional message processing with duplicate detection. -#### 1. Done Table / Duplicate Detection (Gap) +#### Duplicate Detection -Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. The `Done` table is updated atomically with the application state at crank commit. +Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. Our implementation achieves this by checking `seq <= highestReceivedSeq` before processing: -We track `highestReceivedSeq` per remote, but there's a gap in how it interacts with delivery: - -**Scenario A - Update on receive, before delivery:** -1. Receive message seq=5 from remote R -2. Update `highestReceivedSeq` to 5 -3. Add message to run queue for delivery to local vat -4. Crash before delivery crank commits -5. On recovery: `highestReceivedSeq=5` suggests we processed it -6. Remote retransmits seq=5, we ignore it -7. **Message lost** - vat never received it +```typescript +// Duplicate detection: skip if we've already processed this sequence number +if (seq <= this.#highestReceivedSeq) { + this.#logger.log(`ignoring duplicate message seq=${seq}`); + return null; +} +``` -**Scenario B - Update after delivery:** -1. Receive message seq=5, add to run queue -2. Delivery crank runs, vat processes message, crank commits -3. Crash before `highestReceivedSeq` is updated -4. On recovery: `highestReceivedSeq < 5` -5. Remote retransmits seq=5, we deliver again -6. **Duplicate delivery** - vat receives it twice +After a crash and retransmit, duplicate messages are detected and ignored. + +#### Transactional Message Processing + +Message processing is wrapped in a database savepoint to ensure atomicity: + +```typescript +const savepointName = `receive_${this.remoteId}_${seq}`; +this.#kernelStore.createSavepoint(savepointName); +try { + // Process message (translate refs, add to run queue, etc.) + switch (method) { + case 'deliver': ... + case 'redeemURL': ... + case 'redeemURLReply': ... + } + + // Persist sequence tracking at the end, within the transaction + this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); + + // Commit the transaction + this.#kernelStore.releaseSavepoint(savepointName); +} catch (error) { + // Rollback on any error - also revert in-memory state + this.#highestReceivedSeq = previousHighestReceivedSeq; + this.#kernelStore.rollbackSavepoint(savepointName); + throw error; +} +``` -**What's needed**: A `Done` table (or equivalent) that is updated atomically with the delivery crank commit, and checked before delivering incoming messages. +This achieves atomicity: if a crash occurs before commit, both the run queue entry and the sequence update roll back together. The remote retransmits, and we process it correctly. -#### 2. FIFO Enforcement on Receive (Gap) +#### FIFO Ordering Ken enforces per-sender FIFO ordering via `next_ready()` which only delivers the next expected sequence number. -If messages arrive out of order from the network (e.g., seq 1, 3, 2): -- We should deliver seq=1 -- Buffer seq=3 until seq=2 arrives -- Deliver seq=2, then seq=3 - -We don't currently enforce this ordering on the receive side. Out-of-order network delivery could result in out-of-order application delivery. +**Our situation**: We use TCP-based transports (libp2p streams) which guarantee in-order delivery during normal operation. Out-of-order arrival only occurs after a crash when the sender retransmits. With duplicate detection, retransmitted messages for already-processed sequence numbers are dropped, maintaining FIFO semantics. -**What's needed**: Track expected next seq per remote sender, buffer out-of-order messages, deliver in sequence order only. +Therefore, explicit receive-side reordering is not required given our transport guarantees. ### Summary Table @@ -131,42 +147,12 @@ We don't currently enforce this ordering on the receive side. Out-of-order netwo | Consistent frontier | **Yes** | Each kernel's checkpoint is independent | | Local recovery | **Yes** | Crashes don't affect other processes | | Sender-based logging | **Yes** | Messages persisted in remotePending until ACKed | -| Exactly-once delivery | **Partial** | At-least-once; missing Done table for deduplication | -| FIFO ordering | **Partial** | Per-sender seq on send; no enforcement on receive | - -## What Would Be Needed for Full Ken Properties - -### 1. Add Done Table for Exactly-Once Delivery - -Track processed messages and deduplicate on receive: - -**Option A: Per-remote processed sequence tracking** -- Store `highestProcessedSeq.${remoteId}` updated atomically with delivery crank -- On receive: if `seq <= highestProcessedSeq`, ACK but don't deliver -- Simple but requires in-order processing - -**Option B: Explicit Done table** -- Store `done.${remoteId}.${seq} = true` for each processed message -- On receive: check Done table before delivering -- Supports out-of-order processing -- Requires garbage collection of old entries (after ACK confirms sender discarded) - -Either approach requires the processed-message record to be updated **atomically with the delivery crank commit**, so that crash recovery sees consistent state. - -### 2. FIFO Enforcement on Receive - -Buffer and reorder incoming messages: - -- Track `expectedNextSeq.${remoteId}` per sender -- On receive: if `seq > expectedNextSeq`, buffer the message -- When `expectedNextSeq` message arrives, deliver it and any buffered successors -- Update `expectedNextSeq` as messages are delivered - -This interacts with the Done table: we need to handle the case where we've already processed some messages (from Done table) when determining what's "expected next." +| Exactly-once delivery | **Yes** | Transactional receive with dedup check | +| FIFO ordering | **Yes** | TCP guarantees in-order; dedup handles retransmits | ## Architectural Summary -**Send side (achieved with crank buffering):** +**Send side (crank buffering):** ``` Vat Crank: vat processes message → syscalls buffer outputs @@ -180,32 +166,31 @@ Later (separate operation): The key insight: by the time RemoteHandle sees a message, the originating crank has already committed. Output validity is achieved. -**Receive side (gaps remain):** +**Receive side (transactional processing):** ``` -Current: - receive from network → add to run queue → deliver to vat - (no deduplication, no ordering enforcement) - -Needed: - receive from network - → check Done table (skip if already processed) - → check sequence (buffer if out of order) - → add to run queue - → deliver to vat - → atomically update Done table with delivery crank commit +receive from network + → check seq <= highestReceivedSeq (skip if duplicate) + → begin transaction (savepoint) + → process message, add to run queue + → persist highestReceivedSeq + → commit transaction (release savepoint) ``` +If a crash occurs before commit, both the run queue entry and the sequence update roll back together. The remote retransmits, and we process it correctly. + ## Progress Summary | Area | Status | |------|--------| -| Kernel-internal output buffering | **Achieved** | -| Rollback discards uncommitted outputs | **Achieved** | -| Atomic kernel state + output queue | **Achieved** | -| Output validity (send side) | **Achieved** | -| Deferred transmission (send side) | **Achieved** | -| Done table for deduplication (receive side) | Gap | -| FIFO enforcement (receive side) | Gap | +| Kernel-internal output buffering | **Achieved** (Issue #786) | +| Rollback discards uncommitted outputs | **Achieved** (Issue #786) | +| Atomic kernel state + output queue | **Achieved** (Issue #786) | +| Output validity (send side) | **Achieved** (Issue #786) | +| Deferred transmission (send side) | **Achieved** (Issue #786) | +| FIFO ordering | **Achieved** (TCP transport) | +| Exactly-once receive (dedup + atomicity) | **Achieved** (Issue #808) | + +All Ken protocol properties are now implemented. ## References diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts index 165d93a62..eafee559d 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts @@ -201,7 +201,10 @@ export class PlatformServicesClient implements PlatformServices { async initializeRemoteComms( keySeed: string, options: RemoteCommsOptions, - remoteMessageHandler: (from: string, message: string) => Promise, + remoteMessageHandler: ( + from: string, + message: string, + ) => Promise, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, ): Promise { @@ -275,9 +278,9 @@ export class PlatformServicesClient implements PlatformServices { * * @param from - The peer ID that sent the message. * @param message - The message received. - * @returns A promise that resolves with the reply message, or an empty string if no reply is needed. + * @returns A promise that resolves with the reply message, or null if no reply is needed. */ - async #remoteDeliver(from: string, message: string): Promise { + async #remoteDeliver(from: string, message: string): Promise { if (this.#remoteMessageHandler) { return await this.#remoteMessageHandler(from, message); } diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index 8336102ce..48f2fc5c7 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -521,7 +521,7 @@ describe('PlatformServicesServer', () => { new MessageEvent('message', { data: { id: 'vws:1', - result: '', + result: null, jsonrpc: '2.0', }, }), diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts index cf0bdea7f..2e1af2b5d 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts @@ -384,9 +384,12 @@ export class PlatformServicesServer { * * @param from - The peer ID that sent the message. * @param message - The message received. - * @returns A promise that resolves with the reply message, or an empty string if no reply is needed. + * @returns A promise that resolves with the reply message, or null if no reply is needed. */ - async #handleRemoteMessage(from: string, message: string): Promise { + async #handleRemoteMessage( + from: string, + message: string, + ): Promise { return this.#rpcClient.call('remoteDeliver', { from, message, diff --git a/packages/nodejs/src/kernel/PlatformServices.ts b/packages/nodejs/src/kernel/PlatformServices.ts index 160dea58c..d33565a3f 100644 --- a/packages/nodejs/src/kernel/PlatformServices.ts +++ b/packages/nodejs/src/kernel/PlatformServices.ts @@ -205,9 +205,12 @@ export class NodejsPlatformServices implements PlatformServices { * * @param from - The peer ID that sent the message. * @param message - The message received. - * @returns A promise that resolves with the reply message, or an empty string if no reply is needed. + * @returns A promise that resolves with the reply message, or null if no reply is needed. */ - async #handleRemoteMessage(from: string, message: string): Promise { + async #handleRemoteMessage( + from: string, + message: string, + ): Promise { if (!this.#remoteMessageHandler) { // This can't actually happen, but TypeScript can't infer it throw Error('remote comms not initialized'); @@ -234,7 +237,10 @@ export class NodejsPlatformServices implements PlatformServices { async initializeRemoteComms( keySeed: string, options: RemoteCommsOptions, - remoteMessageHandler: (from: string, message: string) => Promise, + remoteMessageHandler: ( + from: string, + message: string, + ) => Promise, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, ): Promise { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 59896146e..f1cadc366 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -262,7 +262,7 @@ describe('RemoteHandle', () => { params: ['message', targetRRef, message], }); const reply = await remote.handleRemoteMessage(delivery); - expect(reply).toBe(''); + expect(reply).toBeNull(); expect(mockKernelQueue.enqueueSend).toHaveBeenCalledWith(targetKRef, { methargs: message.methargs, result: resultKRef, @@ -289,7 +289,7 @@ describe('RemoteHandle', () => { params: ['notify', resolutions], }); const reply = await remote.handleRemoteMessage(notify); - expect(reply).toBe(''); + expect(reply).toBeNull(); expect(mockKernelQueue.resolvePromises).toHaveBeenCalledWith( remote.remoteId, [[promiseKRef, false, { body: '"resolved value"', slots: [] }]], @@ -350,7 +350,7 @@ describe('RemoteHandle', () => { }); const reply = await remote.handleRemoteMessage(dropExports); - expect(reply).toBe(''); + expect(reply).toBeNull(); for (const kref of krefs) { const { isPromise } = parseRef(kref); if (isPromise) { @@ -404,7 +404,7 @@ describe('RemoteHandle', () => { }); const reply = await remote.handleRemoteMessage(retireExports); - expect(reply).toBe(''); + expect(reply).toBeNull(); expect(mockKernelStore.getObjectRefCount(kref)).toStrictEqual({ reachable: 0, recognizable: 0, @@ -431,7 +431,7 @@ describe('RemoteHandle', () => { }); const reply = await remote.handleRemoteMessage(retireImports); - expect(reply).toBe(''); + expect(reply).toBeNull(); // Object should have disappeared from the clists expect(() => @@ -471,7 +471,7 @@ describe('RemoteHandle', () => { mockOcapURL, ); // Reply is now sent via sendRemoteCommand, not returned - expect(reply).toBe(''); + expect(reply).toBeNull(); // Verify reply was sent with seq/ack via sendRemoteMessage expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalled(); const sentMessage = JSON.parse( @@ -480,7 +480,8 @@ describe('RemoteHandle', () => { expect(sentMessage.method).toBe('redeemURLReply'); expect(sentMessage.params).toStrictEqual([true, mockReplyKey, replyRRef]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 - expect(sentMessage.ack).toBe(1); // Piggyback ACK for received message + // Reply is sent after commit, so ACK can be piggybacked + expect(sentMessage.ack).toBe(1); expect( mockKernelStore.translateRefKtoE(remote.remoteId, replyKRef, false), ).toBe(replyRRef); @@ -510,7 +511,7 @@ describe('RemoteHandle', () => { mockOcapURL, ); // Reply is now sent via sendRemoteCommand, not returned - expect(reply).toBe(''); + expect(reply).toBeNull(); // Verify error reply was sent with seq/ack via sendRemoteMessage expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalled(); const sentMessage = JSON.parse( @@ -523,7 +524,8 @@ describe('RemoteHandle', () => { errorMessage, ]); expect(sentMessage.seq).toBe(1); // First outgoing message gets seq 1 - expect(sentMessage.ack).toBe(1); // Piggyback ACK for received message + // Reply is sent after commit, so ACK can be piggybacked + expect(sentMessage.ack).toBe(1); }); it('handleRemoteMessage rejects bogus message type', async () => { @@ -890,8 +892,8 @@ describe('RemoteHandle', () => { JSON.stringify(deliveryMessage), ); - // Verify message was processed (handleRemoteMessage returns empty string on success) - expect(result).toBe(''); + // Verify message was processed (handleRemoteMessage returns null on success) + expect(result).toBeNull(); // Verify kernel queue was called expect(mockKernelQueue.resolvePromises).toHaveBeenCalled(); @@ -904,9 +906,9 @@ describe('RemoteHandle', () => { // but wants to acknowledge our messages const standaloneAck = JSON.stringify({ ack: 5 }); - // This should not throw and should return empty string + // This should not throw and should return null const result = await remote.handleRemoteMessage(standaloneAck); - expect(result).toBe(''); + expect(result).toBeNull(); }); it('assigns sequential sequence numbers to outgoing messages', async () => { @@ -1165,5 +1167,99 @@ describe('RemoteHandle', () => { expect(parsed.seq).toBe(1); // Fresh start expect(parsed.ack).toBeUndefined(); // No highestReceivedSeq }); + + it('ignores duplicate messages (seq <= highestReceivedSeq)', async () => { + const remote = makeRemote(); + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + // First message with seq=1 - should process + const message1 = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', resolutions], + }); + await remote.handleRemoteMessage(message1); + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(1); + + // Duplicate message with seq=1 - should ignore + const message2 = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', resolutions], + }); + await remote.handleRemoteMessage(message2); + // Should still be 1 call, not 2 + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(1); + + // Message with seq=2 - should process + const message3 = JSON.stringify({ + seq: 2, + method: 'deliver', + params: ['notify', resolutions], + }); + await remote.handleRemoteMessage(message3); + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(2); + }); + + it('persists highestReceivedSeq atomically with message processing', async () => { + const remote = makeRemote(); + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + const message = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', resolutions], + }); + + await remote.handleRemoteMessage(message); + + // Verify highestReceivedSeq was persisted + expect( + mockKernelStore.getRemoteSeqState(mockRemoteId)?.highestReceivedSeq, + ).toBe(1); + }); + + it('restores highestReceivedSeq on processing error', async () => { + const remote = makeRemote(); + + // First, process a valid message to set highestReceivedSeq to 1 + const validMessage = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['notify', [['rp+3', false, { body: '"value"', slots: [] }]]], + }); + await remote.handleRemoteMessage(validMessage); + expect( + mockKernelStore.getRemoteSeqState(mockRemoteId)?.highestReceivedSeq, + ).toBe(1); + + // Now send a message that will cause an error + const badMessage = JSON.stringify({ + seq: 2, + method: 'deliver', + params: ['bogus'], // Unknown delivery method + }); + + await expect(remote.handleRemoteMessage(badMessage)).rejects.toThrow( + 'unknown remote delivery method bogus', + ); + + // highestReceivedSeq should still be 1 (restored after rollback) + // Send another message with seq=2 to verify it's not considered a duplicate + vi.mocked(mockKernelQueue.resolvePromises).mockClear(); + const retryMessage = JSON.stringify({ + seq: 2, + method: 'deliver', + params: ['notify', [['rp+4', false, { body: '"value2"', slots: [] }]]], + }); + await remote.handleRemoteMessage(retryMessage); + expect(mockKernelQueue.resolvePromises).toHaveBeenCalledTimes(1); + }); }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index a8aa71a10..1a3317312 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -470,9 +470,13 @@ export class RemoteHandle implements EndpointHandle { * Adds seq and ack fields, queues for ACK tracking, and sends. * * @param messageBase - The base message to send (without seq/ack). + * @param exemptFromCapacityLimit - If true, bypass the pending queue capacity + * check. Used for kernel initiated messages that must be sent to avoid + * leaving the remote hanging. */ async #sendRemoteCommand( messageBase: Delivery | RedeemURLRequest | RedeemURLReply, + exemptFromCapacityLimit = false, ): Promise { if (this.#needsHinting) { // Hints are registered lazily because (a) transmitting to the platform @@ -494,8 +498,11 @@ export class RemoteHandle implements EndpointHandle { this.#needsHinting = false; } - // Check queue capacity before consuming any resources (seq number, ACK timer) - if (this.#getPendingCount() >= MAX_PENDING_MESSAGES) { + // Check queue capacity before consuming any resources (seq number, ACK timer). + if ( + !exemptFromCapacityLimit && + this.#getPendingCount() >= MAX_PENDING_MESSAGES + ) { throw Error( `Message rejected: pending queue at capacity (${MAX_PENDING_MESSAGES})`, ); @@ -756,53 +763,98 @@ export class RemoteHandle implements EndpointHandle { } /** - * Handle an ocap URL redemption request from the remote end. - * Sends the reply via #sendRemoteCommand to ensure it gets seq/ack tracking. + * Prepare to handle an incoming redeemURL message. Validates and translates + * but does not send the reply. Returns data needed to complete after commit. * * @param url - The ocap URL attempting to be redeemed. * @param replyKey - A sender-provided tag to send with the reply. + * @returns Data needed to complete the operation after commit. */ - async #handleRedeemURLRequest(url: string, replyKey: string): Promise { + async #handleRedeemURLRequest( + url: string, + replyKey: string, + ): Promise<{ success: boolean; replyKey: string; value: string }> { assert.typeof(replyKey, 'string'); let kref: string; try { kref = await this.#remoteComms.redeemLocalOcapURL(url); } catch (error) { - await this.#sendRemoteCommand({ - method: 'redeemURLReply', - params: [false, replyKey, `${(error as Error).message}`], - }); - return; + return { success: false, replyKey, value: `${(error as Error).message}` }; } const eref = this.#kernelStore.translateRefKtoE(this.remoteId, kref, true); - await this.#sendRemoteCommand({ - method: 'redeemURLReply', - params: [true, replyKey, eref], - }); + return { success: true, replyKey, value: eref }; + } + + /** + * Complete handling of an incoming redeemURL message by sending the reply. + * + * @param data - The data from #handleRedeemURLRequest. + * @param data.success - Whether the redemption was successful. + * @param data.replyKey - The reply key from the request. + * @param data.value - The eref (on success) or error message (on failure). + */ + async #completeHandleRedeemURLRequest(data: { + success: boolean; + replyKey: string; + value: string; + }): Promise { + await this.#sendRemoteCommand( + { + method: 'redeemURLReply', + params: [data.success, data.replyKey, data.value], + }, + true, // exempt from capacity limit - this a reply that mustn't fail and is not vat-initiated + ); } /** - * Handle an ocap URL redemption reply from the remote end. + * Prepare to handle an incoming redeemURLReply message. Validates and + * translates but does not modify in-memory state. Returns data needed to + * complete after commit. * - * @param success - true if the result is a URL, false if the result is an error. - * @param replyKey - that tag that was sent in the request being replied to. - * @param result - if success, an object ref; if not, an error message string. + * @param success - Whether the redemption was successful. + * @param replyKey - The reply key for matching to pending redemption. + * @param result - Either the kref (on success) or error message (on failure). + * @returns Data needed to complete the operation after commit. */ - async #handleRedeemURLReply( + #handleRedeemURLReply( success: boolean, replyKey: string, result: string, - ): Promise { - const handlers = this.#pendingRedemptions.get(replyKey); - if (!handlers) { + ): { success: boolean; replyKey: string; value: string } { + if (!this.#pendingRedemptions.has(replyKey)) { throw Error(`unknown URL redemption reply key ${replyKey}`); } - this.#pendingRedemptions.delete(replyKey); - const [resolve, reject] = handlers; - if (success) { - resolve(this.#kernelStore.translateRefEtoK(this.remoteId, result)); - } else { - reject(result); + const value = success + ? this.#kernelStore.translateRefEtoK(this.remoteId, result) + : result; + return { success, replyKey, value }; + } + + /** + * Complete handling of an incoming redeemURLReply message by resolving the + * pending promise. + * + * @param data - The data from #handleRedeemURLReply. + * @param data.success - Whether the redemption was successful. + * @param data.replyKey - The reply key for matching to pending redemption. + * @param data.value - The translated kref (on success) or error message (on failure). + */ + #completeHandleRedeemURLReply(data: { + success: boolean; + replyKey: string; + value: string; + }): void { + const handlers = this.#pendingRedemptions.get(data.replyKey); + // handlers should exist since we validated in prepare, but check for safety + if (handlers) { + this.#pendingRedemptions.delete(data.replyKey); + const [resolve, reject] = handlers; + if (data.success) { + resolve(data.value); + } else { + reject(data.value); + } } } @@ -812,50 +864,104 @@ export class RemoteHandle implements EndpointHandle { * @param message - The message that was received. * * @returns a string containing a message to send back to the original message - * sender as a response. An empty string means no such message is to be sent. + * sender as a response, or null if no response is to be sent. */ - async handleRemoteMessage(message: string): Promise { + async handleRemoteMessage(message: string): Promise { const parsed = JSON.parse(message); // Handle standalone ACK message (no seq, no method - just ack) if (parsed.ack !== undefined && parsed.seq === undefined) { this.#handleAck(parsed.ack); - return ''; + return null; } const remoteCommand = parsed as RemoteCommand; const { seq, ack, method, params } = remoteCommand; - // Track received sequence number for piggyback ACK and persist - if (seq > this.#highestReceivedSeq) { - this.#highestReceivedSeq = seq; - this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); + // Handle piggyback ACK if present (outside transaction - ACK processing is idempotent) + if (ack !== undefined) { + this.#handleAck(ack); } // Start delayed ACK timer - will send standalone ACK if no outgoing traffic this.#startDelayedAck(); - // Handle piggyback ACK if present - if (ack !== undefined) { - this.#handleAck(ack); + // Validate seq value. + if (typeof seq !== 'number' || !Number.isInteger(seq) || seq < 1) { + throw Error(`invalid message seq: ${seq}`); } - switch (method) { - case 'deliver': - this.#handleRemoteDeliver(params); - break; - case 'redeemURL': - // Reply is sent via #sendRemoteCommand for proper seq/ack tracking - await this.#handleRedeemURLRequest(...params); - break; - case 'redeemURLReply': - await this.#handleRedeemURLReply(...params); - break; - default: - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - throw Error(`unknown remote message type ${method}`); + // Duplicate detection: skip if we've already processed this sequence number + if (seq <= this.#highestReceivedSeq) { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: ignoring duplicate message seq=${seq} (highestReceived=${this.#highestReceivedSeq})`, + ); + return null; + } + + // Wrap message processing in a transaction for atomicity: Either both (1) + // message processing and (2) seq update succeed together, or neither + // happens. This ensures crash-safe exactly-once delivery. + const savepointName = `receive_${this.remoteId}_${seq}`; + this.#kernelStore.createSavepoint(savepointName); + + // Deferred completion data - set by redeemURL and redeemURLReply handlers + let deferredCompletion: + | { success: boolean; replyKey: string; value: string } + | undefined; + + try { + switch (method) { + case 'deliver': + this.#handleRemoteDeliver(params); + break; + case 'redeemURL': + deferredCompletion = await this.#handleRedeemURLRequest(...params); + break; + case 'redeemURLReply': + deferredCompletion = this.#handleRedeemURLReply(...params); + break; + default: + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw Error(`unknown remote message type ${method}`); + } + + // Persist sequence tracking at the end, within the transaction + this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); + + // Commit the transaction + this.#kernelStore.releaseSavepoint(savepointName); + } catch (error) { + // Rollback on any error - in-memory state unchanged since we didn't update it yet + this.#kernelStore.rollbackSavepoint(savepointName); + throw error; + } + + // All in-memory state changes happen after commit + + // Updating in-memory seq state after commit ensures any ACK piggybacked + // on outgoing messages doesn't acknowledge uncommitted message receipts. + this.#highestReceivedSeq = seq; + + // Complete deferred operations + if (deferredCompletion) { + switch (method) { + case 'redeemURL': + await this.#completeHandleRedeemURLRequest(deferredCompletion); + break; + case 'redeemURLReply': + this.#completeHandleRedeemURLReply(deferredCompletion); + break; + case 'deliver': + default: + // deliver doesn't set deferredCompletion, so this is unreachable + break; + } } - return ''; + + // Restart delayed ACK timer, which may have been cleared by #sendRemoteCommand. + this.#startDelayedAck(); + return null; } /** diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index 232f758e1..9500a0143 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -352,6 +352,7 @@ describe('RemoteManager', () => { it('creates new remote when handling message from unknown peer', async () => { const message = JSON.stringify({ + seq: 1, method: 'deliver', params: [ 'message', diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index f2fefef97..b230a43a7 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -295,9 +295,12 @@ export class RemoteManager { * * @param from - The peer ID of the sender. * @param message - The message content. - * @returns a promise for the response message. + * @returns a promise for the response message, or null if no response is needed. */ - async handleRemoteMessage(from: string, message: string): Promise { + async handleRemoteMessage( + from: string, + message: string, + ): Promise { const remote = this.remoteFor(from); return await remote.handleRemoteMessage(message); } diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index f278ff93a..1ee0ce875 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -10,7 +10,7 @@ export type Channel = { export type RemoteMessageHandler = ( from: string, message: string, -) => Promise; +) => Promise; export type SendRemoteMessage = (to: string, message: string) => Promise; diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.test.ts b/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.test.ts deleted file mode 100644 index 0ba41ae7f..000000000 --- a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.test.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { is } from '@metamask/superstruct'; -import { describe, it, expect } from 'vitest'; - -import { remoteDeliverSpec } from './kernel-remote.ts'; - -describe('kernel-remote', () => { - describe('remoteDeliverSpec', () => { - it('should have correct method name', () => { - expect(remoteDeliverSpec.method).toBe('remoteDeliver'); - }); - - it.each([ - ['test-result', true], - [123, false], - [null, false], - [undefined, false], - ['', true], - ['🌟', true], - ])('should validate result type: %s -> %s', (value, expected) => { - expect(is(value, remoteDeliverSpec.result)).toBe(expected); - }); - - describe('params validation', () => { - it('should accept valid params', () => { - const validParams = { - from: 'peer-123', - message: 'hello world', - }; - - expect(is(validParams, remoteDeliverSpec.params)).toBe(true); - }); - - it.each([ - [{ message: 'hello world' }, 'missing from field'], - [{ from: 'peer-123' }, 'missing message field'], - [{ from: 123, message: 'hello world' }, 'non-string from field'], - [{ from: 'peer-123', message: 123 }, 'non-string message field'], - ])('should reject params with %s', (invalidParams, _description) => { - expect(is(invalidParams, remoteDeliverSpec.params)).toBe(false); - }); - - it('should reject params with extra fields', () => { - const invalidParams = { - from: 'peer-123', - message: 'hello world', - extra: 'field', - }; - - expect(is(invalidParams, remoteDeliverSpec.params)).toBe(false); - }); - - it.each([ - [null, 'null'], - [undefined, 'undefined'], - ['string', 'string'], - [123, 'number'], - [[], 'array'], - ])('should reject %s params', (invalidParams, _type) => { - expect(is(invalidParams, remoteDeliverSpec.params)).toBe(false); - }); - - it.each([ - ['', '', 'empty strings'], - ['🌟peer-123🌟', 'hello 世界 🌍', 'unicode strings'], - ['a'.repeat(10000), 'b'.repeat(10000), 'very long strings'], - ])('should accept %s', (from, message, _description) => { - const validParams = { - from, - message, - }; - - expect(is(validParams, remoteDeliverSpec.params)).toBe(true); - }); - }); - }); -}); diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.ts b/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.ts deleted file mode 100644 index 76f93770d..000000000 --- a/packages/ocap-kernel/src/rpc/kernel-remote/kernel-remote.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type { MethodSpec } from '@metamask/kernel-rpc-methods'; -import { object, string } from '@metamask/superstruct'; - -type RemoteDeliverParams = { - from: string; - message: string; -}; - -export const remoteDeliverSpec: MethodSpec< - 'remoteDeliver', - RemoteDeliverParams, - string -> = { - method: 'remoteDeliver', - params: object({ from: string(), message: string() }), - result: string(), -}; diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts index f3110639d..e31c67f9a 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.test.ts @@ -11,10 +11,10 @@ describe('remoteDeliver', () => { }); it('should have correct result type', () => { - // Test that result validator accepts strings + // Test that result validator accepts strings and null expect(is('test-result', remoteDeliverSpec.result)).toBe(true); + expect(is(null, remoteDeliverSpec.result)).toBe(true); expect(is(123, remoteDeliverSpec.result)).toBe(false); - expect(is(null, remoteDeliverSpec.result)).toBe(false); expect(is(undefined, remoteDeliverSpec.result)).toBe(false); }); diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts index ad11e6172..79a7f32a3 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteDeliver.ts @@ -1,6 +1,6 @@ import type { MethodSpec, Handler } from '@metamask/kernel-rpc-methods'; -import { object, string } from '@metamask/superstruct'; -import type { Infer } from '@metamask/superstruct'; +import { object, string, union, literal } from '@metamask/superstruct'; +import type { Infer, Struct } from '@metamask/superstruct'; const paramsStruct = object({ from: string(), @@ -12,19 +12,19 @@ type Params = Infer; export type RemoteDeliverSpec = MethodSpec< 'remoteDeliver', { from: string; message: string }, - string + Promise >; export const remoteDeliverSpec: RemoteDeliverSpec = { method: 'remoteDeliver', params: paramsStruct, - result: string(), + result: union([string(), literal(null)]) as Struct, }; export type HandleRemoteDeliver = ( from: string, message: string, -) => Promise; +) => Promise; type RemoteDeliverHooks = { remoteDeliver: HandleRemoteDeliver; @@ -33,7 +33,7 @@ type RemoteDeliverHooks = { export type RemoteDeliverHandler = Handler< 'remoteDeliver', Params, - Promise, + Promise, RemoteDeliverHooks >; diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index af0cb55a1..65d21779c 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -54,6 +54,7 @@ describe('kernel store', () => { 'clearReachableFlag', 'collectGarbage', 'createCrankSavepoint', + 'createSavepoint', 'decRefCount', 'decrementRefCount', 'deleteCListEntry', @@ -131,12 +132,14 @@ describe('kernel store', () => { 'pinObject', 'provideIncarnationId', 'releaseAllSavepoints', + 'releaseSavepoint', 'removeVatFromSubcluster', 'reset', 'resolveKernelPromise', 'retireKernelObjects', 'revoke', 'rollbackCrank', + 'rollbackSavepoint', 'runQueueLength', 'scheduleReap', 'setGCActions', diff --git a/packages/ocap-kernel/src/store/index.ts b/packages/ocap-kernel/src/store/index.ts index fa8cc596a..02ec9de7d 100644 --- a/packages/ocap-kernel/src/store/index.ts +++ b/packages/ocap-kernel/src/store/index.ts @@ -252,6 +252,33 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { return newId; } + /** + * Create a savepoint for atomic operations on persistent storage. + * + * @param name - The savepoint name. + */ + function createSavepoint(name: string): void { + kdb.createSavepoint(name); + } + + /** + * Release (commit) a savepoint. + * + * @param name - The savepoint name. + */ + function releaseSavepoint(name: string): void { + kdb.releaseSavepoint(name); + } + + /** + * Rollback to a savepoint. + * + * @param name - The savepoint name. + */ + function rollbackSavepoint(name: string): void { + kdb.rollbackSavepoint(name); + } + return harden({ ...id, ...queue, @@ -273,6 +300,9 @@ export function makeKernelStore(kdb: KernelDatabase, logger?: Logger) { clear, reset, provideIncarnationId, + createSavepoint, + releaseSavepoint, + rollbackSavepoint, kv, }); }