From a327b1f77ff13f1198505fcf403f10824928e3ed Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 6 Feb 2026 17:15:09 +0100 Subject: [PATCH 1/3] feat(ocap-kernel): implement distributed garbage collection protocol (#779) Implement the DGC protocol so that `deliverBringOutYourDead()` on a RemoteHandle sends a BOYD wire message to the remote kernel, which schedules a local reap, runs GC, and sends back drops/retires. - Widen `scheduleReap` to accept `EndpointId` (vat or remote) - Update GC action parsing to use `insistEndpointId` - Add `bringOutYourDead` delivery type to RemoteHandle - Add ping-pong prevention flag to avoid infinite BOYD loops - Add unit tests for BOYD protocol and GC endpoint widening - Add e2e tests for distributed GC in remote-comms Co-Authored-By: Claude Opus 4.6 --- packages/nodejs/test/e2e/remote-comms.test.ts | 173 ++++++++++++++++++ .../garbage-collection/garbage-collection.ts | 77 ++++---- .../src/remotes/kernel/RemoteHandle.test.ts | 135 +++++++++++++- .../src/remotes/kernel/RemoteHandle.ts | 34 +++- .../ocap-kernel/src/store/methods/gc.test.ts | 72 +++++++- packages/ocap-kernel/src/store/methods/gc.ts | 17 +- 6 files changed, 453 insertions(+), 55 deletions(-) diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 104a05e99..16bbcd09d 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -1,5 +1,6 @@ import type { Libp2p } from '@libp2p/interface'; import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { waitUntilQuiescent } from '@metamask/kernel-utils'; import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel'; import type { KRef } from '@metamask/ocap-kernel'; import { startRelay } from '@ocap/cli/relay'; @@ -1021,4 +1022,176 @@ describe.sequential('Remote Communications E2E', () => { NETWORK_TIMEOUT * 3, ); }); + + describe('Distributed Garbage Collection', () => { + it( + 'creates remote endpoint with clist entries after cross-kernel message', + async () => { + const { aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send a message to create cross-kernel object references + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + + // Verify remote endpoint was allocated on kernel1 + const nextRemoteId1 = Number( + kernelStore1.kv.get('nextRemoteId') ?? '0', + ); + expect(nextRemoteId1).toBeGreaterThan(0); + + // Verify remote endpoint was allocated on kernel2 + const nextRemoteId2 = Number( + kernelStore2.kv.get('nextRemoteId') ?? '0', + ); + expect(nextRemoteId2).toBeGreaterThan(0); + }, + NETWORK_TIMEOUT, + ); + + it( + 'sends BOYD to remote kernel when local remote is reaped', + async () => { + const { aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send a message to create cross-kernel refs + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + + // Schedule reap on kernel1's remote endpoint (r0) - this will cause + // the crank loop to deliver BOYD to the remote kernel + kernelStore1.scheduleReap('r0'); + + // Trigger cranks to process the reap action (which sends BOYD to kernel2) + // and allow the remote to process it and respond + for (let i = 0; i < 3; i++) { + await kernel1.queueMessage(aliceRef, 'ping', []); + await waitUntilQuiescent(500); + } + + // Verify communication still works after DGC + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(response).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT, + ); + + it( + 'processes incoming BOYD by scheduling local reap', + async () => { + const { bobRef, aliceURL, aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send messages in both directions to create refs on both sides + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']); + + // Schedule reap on kernel2's remote endpoint - this will send BOYD to kernel1 + kernelStore2.scheduleReap('r0'); + + // Trigger cranks to process the reap and allow BOYD to flow + for (let i = 0; i < 3; i++) { + await kernel2.queueMessage(bobRef, 'ping', []); + await waitUntilQuiescent(500); + } + + // Verify communication still works after DGC from both directions + const aliceToBob = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(aliceToBob).toContain('vat Bob got "hello" from Alice'); + + const bobToAlice = await sendRemoteMessage( + kernel2, + bobRef, + aliceURL, + 'hello', + ['Bob'], + ); + expect(bobToAlice).toContain('vat Alice got "hello" from Bob'); + }, + NETWORK_TIMEOUT, + ); + + it( + 'completes BOYD exchange without infinite ping-pong', + async () => { + const { aliceRef, bobRef, bobURL, aliceURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send messages to establish refs on both sides + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']); + + // Schedule reap on BOTH sides simultaneously - this tests that the + // ping-pong prevention flag works correctly, preventing infinite BOYD loops + kernelStore1.scheduleReap('r0'); + kernelStore2.scheduleReap('r0'); + + // Trigger cranks on both kernels to process the reaps and allow + // BOYD messages to flow in both directions + for (let i = 0; i < 3; i++) { + await Promise.all([ + kernel1.queueMessage(aliceRef, 'ping', []), + kernel2.queueMessage(bobRef, 'ping', []), + ]); + await waitUntilQuiescent(500); + } + + // No pending GC actions on either side after DGC completes + expect(kernelStore1.getGCActions().size).toBe(0); + expect(kernelStore2.getGCActions().size).toBe(0); + + // Verify continued bidirectional communication works - this proves + // the BOYD exchange completed without breaking the connection + const aliceToBob = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(aliceToBob).toContain('vat Bob got "hello" from Alice'); + + const bobToAlice = await sendRemoteMessage( + kernel2, + bobRef, + aliceURL, + 'hello', + ['Bob'], + ); + expect(bobToAlice).toContain('vat Alice got "hello" from Bob'); + }, + NETWORK_TIMEOUT, + ); + }); }); diff --git a/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts b/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts index 8b5b9aa9b..86f769934 100644 --- a/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts +++ b/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts @@ -3,14 +3,14 @@ import { insistKernelType } from '../store/utils/kernel-slots.ts'; import type { GCAction, GCActionType, + EndpointId, KRef, RunQueueItem, - VatId, } from '../types.ts'; import { actionTypePriorities, insistGCActionType, - insistVatId, + insistEndpointId, queueTypeFromActionType, } from '../types.ts'; import { assert } from '../utils/assert.ts'; @@ -19,43 +19,43 @@ import { assert } from '../utils/assert.ts'; * Parsed representation of a GC action. */ type ParsedGCAction = Readonly<{ - vatId: VatId; + endpointId: EndpointId; type: GCActionType; kref: KRef; }>; /** - * Parse a GC action string into a vat id, type, and kref. + * Parse a GC action string into an endpoint id, type, and kref. * * @param action - The GC action string to parse. * @returns The parsed GC action. */ function parseAction(action: GCAction): ParsedGCAction { - const [vatId, type, kref] = action.split(' '); - insistVatId(vatId); + const [endpointId, type, kref] = action.split(' '); + insistEndpointId(endpointId); insistGCActionType(type); insistKernelType('object', kref); - return harden({ vatId, type, kref }); + return harden({ endpointId, type, kref }); } /** * Determines if a GC action should be processed based on current system state. * * @param storage - The kernel storage. - * @param vatId - The vat id of the vat that owns the kref. + * @param endpointId - The endpoint id of the vat or remote that owns the kref. * @param type - The type of GC action. * @param kref - The kref of the object in question. * @returns True if the action should be processed, false otherwise. */ function shouldProcessAction( storage: KernelStore, - vatId: VatId, + endpointId: EndpointId, type: GCActionType, kref: KRef, ): boolean { - const hasCList = storage.hasCListEntry(vatId, kref); + const hasCList = storage.hasCListEntry(endpointId, kref); const isReachable = hasCList - ? storage.getReachableFlag(vatId, kref) + ? storage.getReachableFlag(endpointId, kref) : undefined; const exists = storage.kernelRefExists(kref); const { reachable, recognizable } = exists @@ -78,17 +78,17 @@ function shouldProcessAction( } /** - * Filters and processes a group of GC actions for a specific vat and action type. + * Filters and processes a group of GC actions for a specific endpoint and action type. * * @param storage - The kernel storage. - * @param vatId - The vat id of the vat that owns the krefs. + * @param endpointId - The endpoint id of the vat or remote that owns the krefs. * @param actions - The set of GC actions to process. * @param allActionsSet - The complete set of GC actions. * @returns Object containing the krefs to process and whether the action set was updated. */ function filterActionsForProcessing( storage: KernelStore, - vatId: VatId, + endpointId: EndpointId, actions: Set, allActionsSet: Set, ): { krefs: KRef[]; actionSetUpdated: boolean } { @@ -97,7 +97,7 @@ function filterActionsForProcessing( for (const action of actions) { const { type, kref } = parseAction(action); - if (shouldProcessAction(storage, vatId, type, kref)) { + if (shouldProcessAction(storage, endpointId, type, kref)) { krefs.push(kref); } allActionsSet.delete(action); @@ -119,43 +119,52 @@ export function processGCActionSet( const allActionsSet = storage.getGCActions(); let actionSetUpdated = false; - // Group actions by vat and type - const actionsByVat = new Map>>(); + // Group actions by endpoint and type + const actionsByEndpoint = new Map< + EndpointId, + Map> + >(); for (const action of allActionsSet) { - const { vatId, type } = parseAction(action); + const { endpointId, type } = parseAction(action); - if (!actionsByVat.has(vatId)) { - actionsByVat.set(vatId, new Map()); + if (!actionsByEndpoint.has(endpointId)) { + actionsByEndpoint.set(endpointId, new Map()); } - const actionsForVatByType = actionsByVat.get(vatId); - assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`); + const actionsForEndpointByType = actionsByEndpoint.get(endpointId); + assert( + actionsForEndpointByType !== undefined, + `No actions for endpoint: ${endpointId}`, + ); - if (!actionsForVatByType.has(type)) { - actionsForVatByType.set(type, new Set()); + if (!actionsForEndpointByType.has(type)) { + actionsForEndpointByType.set(type, new Set()); } - const actions = actionsForVatByType.get(type); + const actions = actionsForEndpointByType.get(type); assert(actions !== undefined, `No actions for type: ${type}`); actions.add(action); } // Process actions in priority order - const vatIds = Array.from(actionsByVat.keys()).sort(); + const endpointIds = Array.from(actionsByEndpoint.keys()).sort(); - for (const vatId of vatIds) { - const actionsForVatByType = actionsByVat.get(vatId); - assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`); + for (const endpointId of endpointIds) { + const actionsForEndpointByType = actionsByEndpoint.get(endpointId); + assert( + actionsForEndpointByType !== undefined, + `No actions for endpoint: ${endpointId}`, + ); - // Find the highest-priority type of work to do within this vat + // Find the highest-priority type of work to do within this endpoint for (const type of actionTypePriorities) { - if (actionsForVatByType.has(type)) { - const actions = actionsForVatByType.get(type); + if (actionsForEndpointByType.has(type)) { + const actions = actionsForEndpointByType.get(type); assert(actions !== undefined, `No actions for type: ${type}`); const { krefs, actionSetUpdated: updated } = filterActionsForProcessing( storage, - vatId, + endpointId, actions, allActionsSet, ); @@ -172,7 +181,7 @@ export function processGCActionSet( const queueType = queueTypeFromActionType.get(type); assert(queueType !== undefined, `Unknown action type: ${type}`); - return harden({ type: queueType, endpointId: vatId, krefs }); + return harden({ type: queueType, endpointId, krefs }); } } } diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 9e91647fb..b2aaa0e97 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -153,12 +153,137 @@ describe('RemoteHandle', () => { expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); }); - it('deliverBringOutYourDead does not call sendRemoteMessage', async () => { - const remote = makeRemote(); + describe('bringOutYourDead', () => { + it('sends BOYD delivery to remote when locally triggered', async () => { + const remote = makeRemote(); - const crankResult = await remote.deliverBringOutYourDead(); - expect(mockRemoteComms.sendRemoteMessage).not.toHaveBeenCalled(); - expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); + const crankResult = await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledWith( + mockRemotePeerId, + expect.any(String), + ); + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]![1]; + const parsed = JSON.parse(sentString); + expect(parsed.seq).toBe(1); + expect(parsed.method).toBe('deliver'); + expect(parsed.params).toStrictEqual(['bringOutYourDead']); + expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); + }); + + it('handles incoming BOYD by scheduling reap', async () => { + const remote = makeRemote(); + + const delivery = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }); + const reply = await remote.handleRemoteMessage(delivery); + + expect(reply).toBe(''); + // Verify reap was scheduled by checking the reap queue + expect(mockKernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: remote.remoteId, + }); + }); + + it('does not send BOYD back when remotely triggered (ping-pong prevention)', async () => { + const remote = makeRemote(); + + // Receive BOYD from remote + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }), + ); + + // Now local kernel calls deliverBringOutYourDead - should NOT send back + const crankResult = await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).not.toHaveBeenCalled(); + expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); + }); + + it('clears flag after skipping echo (next local BOYD sends normally)', async () => { + const remote = makeRemote(); + + // Receive BOYD from remote + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }), + ); + + // First local BOYD - suppressed + await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).not.toHaveBeenCalled(); + + // Second local BOYD - should send normally (flag was cleared) + await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledWith( + mockRemotePeerId, + expect.any(String), + ); + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]![1]; + const parsed = JSON.parse(sentString); + expect(parsed.seq).toBe(1); + expect(parsed.method).toBe('deliver'); + expect(parsed.params).toStrictEqual(['bringOutYourDead']); + }); + + it('tracks correct seq/ack on BOYD messages', async () => { + const remote = makeRemote(); + + // Receive a non-BOYD message to set up ack tracking + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 3, + method: 'deliver', + params: ['notify', resolutions], + }), + ); + + // Send a non-BOYD message first to consume seq 1 + await remote.deliverNotify([ + ['rp+1', false, { body: '"value"', slots: [] }], + ]); + + // Now send BOYD - should get seq 2 with ack 3 + await remote.deliverBringOutYourDead(); + + const { calls } = vi.mocked(mockRemoteComms.sendRemoteMessage).mock; + // Second call is the BOYD (first was the notify) + const parsed = JSON.parse(calls[1]![1]); + expect(parsed.seq).toBe(2); + expect(parsed.ack).toBe(3); + expect(parsed.method).toBe('deliver'); + expect(parsed.params).toStrictEqual(['bringOutYourDead']); + }); + + it('persists BOYD message for retransmission', async () => { + const remote = makeRemote(); + + await remote.deliverBringOutYourDead(); + + // Verify message was persisted + const pendingMsgString = mockKernelStore.getPendingMessage( + mockRemoteId, + 1, + ); + expect(pendingMsgString).toBeDefined(); + expect(pendingMsgString).toContain('"bringOutYourDead"'); + expect(pendingMsgString).toContain('"seq":1'); + }); }); it('redeemOcapURL calls sendRemoteMessage correctly and handles expected reply (success)', async () => { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index b7d3dfa98..cc3b32f38 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -47,13 +47,15 @@ type NotifyDelivery = ['notify', VatOneResolution[]]; type DropExportsDelivery = ['dropExports', string[]]; type RetireExportsDelivery = ['retireExports', string[]]; type RetireImportsDelivery = ['retireImports', string[]]; +type BringOutYourDeadDelivery = ['bringOutYourDead']; type DeliveryParams = | MessageDelivery | NotifyDelivery | DropExportsDelivery | RetireExportsDelivery - | RetireImportsDelivery; + | RetireImportsDelivery + | BringOutYourDeadDelivery; type Delivery = { method: 'deliver'; @@ -102,6 +104,13 @@ export class RemoteHandle implements EndpointHandle { /** Flag that location hints need to be sent to remote comms object. */ #needsHinting: boolean = true; + /** + * Flag indicating the current BOYD was triggered by an incoming remote + * request. When set, deliverBringOutYourDead will skip sending BOYD back + * to the remote, preventing infinite ping-pong. + */ + #remoteGcRequested: boolean = false; + /** Pending URL redemption requests that have not yet been responded to. */ readonly #pendingRedemptions: Map< string, @@ -635,15 +644,21 @@ export class RemoteHandle implements EndpointHandle { } /** - * Make a 'bringOutYourDead' delivery to the remote. - * - * Currently this does not actually do anything but is included to satisfy the - * EndpointHandle interface. + * Send a 'bringOutYourDead' delivery to the remote, requesting it to run + * its garbage collection cycle. If the current BOYD was triggered by an + * incoming remote request, skip sending to prevent infinite ping-pong. * * @returns the crank results. */ async deliverBringOutYourDead(): Promise { - // XXX Currently a no-op, but probably some further DGC action is warranted here + if (this.#remoteGcRequested) { + this.#remoteGcRequested = false; + return this.#myCrankResult; + } + await this.#sendRemoteCommand({ + method: 'deliver', + params: ['bringOutYourDead'], + }); return this.#myCrankResult; } @@ -756,6 +771,13 @@ export class RemoteHandle implements EndpointHandle { this.#retireImports(erefs); break; } + case 'bringOutYourDead': { + // TODO: After merging #811, move this assignment after the savepoint + // commit, consistent with the transactional message processing pattern. + this.#remoteGcRequested = true; + this.#kernelStore.scheduleReap(this.remoteId); + break; + } default: // eslint-disable-next-line @typescript-eslint/restrict-template-expressions throw Error(`unknown remote delivery method ${method}`); diff --git a/packages/ocap-kernel/src/store/methods/gc.test.ts b/packages/ocap-kernel/src/store/methods/gc.test.ts index b8a12f196..ad4586340 100644 --- a/packages/ocap-kernel/src/store/methods/gc.test.ts +++ b/packages/ocap-kernel/src/store/methods/gc.test.ts @@ -30,13 +30,29 @@ describe('GC methods', () => { expect(actions).toStrictEqual(new Set(validActions)); }); + it('manages GC actions for remote endpoints', () => { + const r0Object = kernelStore.initKernelObject('r0'); + const r1Object = kernelStore.initKernelObject('r1'); + + const remoteActions: GCAction[] = [ + `r0 dropExport ${r0Object}`, + `r1 retireExport ${r1Object}`, + ]; + + kernelStore.addGCActions(remoteActions); + + const actions = kernelStore.getGCActions(); + expect(actions.size).toBe(2); + expect(actions).toStrictEqual(new Set(remoteActions)); + }); + it('rejects invalid GC actions', () => { const v1Object = kernelStore.initKernelObject('v1'); - // Invalid vat ID + // Invalid endpoint ID expect(() => { kernelStore.addGCActions([`x1 dropExport ${v1Object}`]); - }).toThrow('not a valid VatId'); + }).toThrow('not a valid EndpointId'); // Invalid action type expect(() => { @@ -131,6 +147,58 @@ describe('GC methods', () => { expect(kernelStore.nextReapAction()).toBeUndefined(); }); + + it('schedules remote IDs for reaping', () => { + kernelStore.scheduleReap('r0'); + kernelStore.scheduleReap('r1'); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r0', + }); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r1', + }); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + + it('interleaves vat and remote reap scheduling', () => { + kernelStore.scheduleReap('v1'); + kernelStore.scheduleReap('r0'); + kernelStore.scheduleReap('v2'); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'v1', + }); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r0', + }); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'v2', + }); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + + it('handles duplicate remote reap scheduling', () => { + kernelStore.scheduleReap('r0'); + kernelStore.scheduleReap('r0'); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r0', + }); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); }); describe('retireKernelObjects', () => { diff --git a/packages/ocap-kernel/src/store/methods/gc.ts b/packages/ocap-kernel/src/store/methods/gc.ts index 6d518bc1a..e95a2cb67 100644 --- a/packages/ocap-kernel/src/store/methods/gc.ts +++ b/packages/ocap-kernel/src/store/methods/gc.ts @@ -9,11 +9,12 @@ import { getSubclusterMethods } from './subclusters.ts'; import { getVatMethods } from './vat.ts'; import type { VatId, + EndpointId, KRef, GCAction, RunQueueItemBringOutYourDead, } from '../../types.ts'; -import { insistGCActionType, insistVatId } from '../../types.ts'; +import { insistGCActionType, insistEndpointId } from '../../types.ts'; import type { StoreContext } from '../types.ts'; import { insistKernelType, parseKernelSlot } from '../utils/kernel-slots.ts'; @@ -62,8 +63,8 @@ export function getGCMethods(ctx: StoreContext) { const actions = getGCActions(); for (const action of newActions) { assert.typeof(action, 'string', 'addGCActions given bad action'); - const [vatId, type, kref] = action.split(' '); - insistVatId(vatId); + const [endpointId, type, kref] = action.split(' '); + insistEndpointId(endpointId); insistGCActionType(type); insistKernelType('object', kref); actions.add(action); @@ -72,14 +73,14 @@ export function getGCMethods(ctx: StoreContext) { } /** - * Schedule a vat for reaping. + * Schedule an endpoint for reaping. * - * @param vatId - The vat to schedule for reaping. + * @param endpointId - The endpoint (vat or remote) to schedule for reaping. */ - function scheduleReap(vatId: VatId): void { + function scheduleReap(endpointId: EndpointId): void { const queue = JSON.parse(ctx.reapQueue.get() ?? '[]'); - if (!queue.includes(vatId)) { - queue.push(vatId); + if (!queue.includes(endpointId)) { + queue.push(endpointId); ctx.reapQueue.set(JSON.stringify(queue)); } } From dba940ff121c339f38316d10b9aa93cb8dfb138c Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 9 Feb 2026 15:58:34 +0100 Subject: [PATCH 2/3] fix(ocap-kernel): move remoteGcRequested flag after savepoint commit Move the #remoteGcRequested in-memory state change after the savepoint commit, consistent with the transactional message processing pattern from #811. Also consolidate test assertions to use toStrictEqual on parsed objects per project convention, and fix return value expectation. Co-Authored-By: Claude Opus 4.6 --- .../src/remotes/kernel/RemoteHandle.test.ts | 29 ++++++++++++------- .../src/remotes/kernel/RemoteHandle.ts | 9 ++++-- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index b2aaa0e97..8dfb4e9a1 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -165,9 +165,11 @@ describe('RemoteHandle', () => { const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock .calls[0]![1]; const parsed = JSON.parse(sentString); - expect(parsed.seq).toBe(1); - expect(parsed.method).toBe('deliver'); - expect(parsed.params).toStrictEqual(['bringOutYourDead']); + expect(parsed).toStrictEqual({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }); expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); }); @@ -181,7 +183,7 @@ describe('RemoteHandle', () => { }); const reply = await remote.handleRemoteMessage(delivery); - expect(reply).toBe(''); + expect(reply).toBeNull(); // Verify reap was scheduled by checking the reap queue expect(mockKernelStore.nextReapAction()).toStrictEqual({ type: 'bringOutYourDead', @@ -232,9 +234,12 @@ describe('RemoteHandle', () => { const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock .calls[0]![1]; const parsed = JSON.parse(sentString); - expect(parsed.seq).toBe(1); - expect(parsed.method).toBe('deliver'); - expect(parsed.params).toStrictEqual(['bringOutYourDead']); + expect(parsed).toStrictEqual({ + seq: 1, + ack: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }); }); it('tracks correct seq/ack on BOYD messages', async () => { @@ -264,10 +269,12 @@ describe('RemoteHandle', () => { const { calls } = vi.mocked(mockRemoteComms.sendRemoteMessage).mock; // Second call is the BOYD (first was the notify) const parsed = JSON.parse(calls[1]![1]); - expect(parsed.seq).toBe(2); - expect(parsed.ack).toBe(3); - expect(parsed.method).toBe('deliver'); - expect(parsed.params).toStrictEqual(['bringOutYourDead']); + expect(parsed).toStrictEqual({ + seq: 2, + ack: 3, + method: 'deliver', + params: ['bringOutYourDead'], + }); }); it('persists BOYD message for retransmission', async () => { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index cc3b32f38..7b0dfec1b 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -772,9 +772,6 @@ export class RemoteHandle implements EndpointHandle { break; } case 'bringOutYourDead': { - // TODO: After merging #811, move this assignment after the savepoint - // commit, consistent with the transactional message processing pattern. - this.#remoteGcRequested = true; this.#kernelStore.scheduleReap(this.remoteId); break; } @@ -965,6 +962,12 @@ export class RemoteHandle implements EndpointHandle { // on outgoing messages doesn't acknowledge uncommitted message receipts. this.#highestReceivedSeq = seq; + // Set ping-pong prevention flag after commit so it's only visible once + // the BOYD delivery is durably recorded. + if (method === 'deliver' && params[0] === 'bringOutYourDead') { + this.#remoteGcRequested = true; + } + // Complete deferred operations if (deferredCompletion) { switch (method) { From 2e895a7ea49f4d04527de8f2f6133fb091326076 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 9 Feb 2026 18:51:27 +0100 Subject: [PATCH 3/3] fix(ocap-kernel): fix DGC e2e tests using wrong remote ID and isolated store cache The DGC e2e tests had two compounding bugs: they called scheduleReap('r0') but the first allocated remote is r1, and they used a separate KernelStore instance whose cache was isolated from the kernel's internal store. Add reapRemotes() to RemoteManager and Kernel (mirroring reapVats) and use it in the e2e tests instead of bypassing the kernel with an external store. Remove getGCActions() and nextRemoteId assertions that suffered from the same stale cache issue. Also reset #remoteGcRequested flag in handlePeerRestart() so BOYD is correctly sent to a new incarnation after peer restart. Co-Authored-By: Claude Opus 4.6 --- packages/nodejs/test/e2e/remote-comms.test.ts | 35 ++++++-------- packages/ocap-kernel/src/Kernel.ts | 11 +++++ .../src/remotes/kernel/RemoteHandle.test.ts | 23 +++++++++ .../src/remotes/kernel/RemoteHandle.ts | 3 +- .../src/remotes/kernel/RemoteManager.test.ts | 47 +++++++++++++++++++ .../src/remotes/kernel/RemoteManager.ts | 14 ++++++ 6 files changed, 111 insertions(+), 22 deletions(-) diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 16bbcd09d..66b910a02 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -1036,19 +1036,16 @@ describe.sequential('Remote Communications E2E', () => { ); // Send a message to create cross-kernel object references - await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); - - // Verify remote endpoint was allocated on kernel1 - const nextRemoteId1 = Number( - kernelStore1.kv.get('nextRemoteId') ?? '0', + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], ); - expect(nextRemoteId1).toBeGreaterThan(0); - // Verify remote endpoint was allocated on kernel2 - const nextRemoteId2 = Number( - kernelStore2.kv.get('nextRemoteId') ?? '0', - ); - expect(nextRemoteId2).toBeGreaterThan(0); + // Verify cross-kernel communication works (implies remote endpoints were created) + expect(response).toContain('vat Bob got "hello" from Alice'); }, NETWORK_TIMEOUT, ); @@ -1067,9 +1064,9 @@ describe.sequential('Remote Communications E2E', () => { // Send a message to create cross-kernel refs await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); - // Schedule reap on kernel1's remote endpoint (r0) - this will cause + // Schedule reap on kernel1's remote endpoints - this will cause // the crank loop to deliver BOYD to the remote kernel - kernelStore1.scheduleReap('r0'); + kernel1.reapRemotes(); // Trigger cranks to process the reap action (which sends BOYD to kernel2) // and allow the remote to process it and respond @@ -1106,8 +1103,8 @@ describe.sequential('Remote Communications E2E', () => { await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']); - // Schedule reap on kernel2's remote endpoint - this will send BOYD to kernel1 - kernelStore2.scheduleReap('r0'); + // Schedule reap on kernel2's remote endpoints - this will send BOYD to kernel1 + kernel2.reapRemotes(); // Trigger cranks to process the reap and allow BOYD to flow for (let i = 0; i < 3; i++) { @@ -1154,8 +1151,8 @@ describe.sequential('Remote Communications E2E', () => { // Schedule reap on BOTH sides simultaneously - this tests that the // ping-pong prevention flag works correctly, preventing infinite BOYD loops - kernelStore1.scheduleReap('r0'); - kernelStore2.scheduleReap('r0'); + kernel1.reapRemotes(); + kernel2.reapRemotes(); // Trigger cranks on both kernels to process the reaps and allow // BOYD messages to flow in both directions @@ -1167,10 +1164,6 @@ describe.sequential('Remote Communications E2E', () => { await waitUntilQuiescent(500); } - // No pending GC actions on either side after DGC completes - expect(kernelStore1.getGCActions().size).toBe(0); - expect(kernelStore2.getGCActions().size).toBe(0); - // Verify continued bidirectional communication works - this proves // the BOYD exchange completed without breaking the connection const aliceToBob = await sendRemoteMessage( diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index c9157c8c4..37a272ad7 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -14,6 +14,7 @@ import { makeKernelStore } from './store/index.ts'; import type { KernelStore } from './store/index.ts'; import type { VatId, + RemoteId, EndpointId, KRef, PlatformServices, @@ -481,6 +482,16 @@ export class Kernel { this.#vatManager.reapVats(filter); } + /** + * Reap remotes that match the filter. + * This is for debugging and testing purposes only. + * + * @param filter - A function that returns true if the remote should be reaped. + */ + reapRemotes(filter: (remoteId: RemoteId) => boolean = () => true): void { + this.#remoteManager.reapRemotes(filter); + } + /** * Pin a vat root. * diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 8dfb4e9a1..2b5e52e23 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -1460,5 +1460,28 @@ describe('RemoteHandle', () => { // The pending redemption should be rejected await expect(redeemPromise).rejects.toThrow('Remote peer restarted'); }); + + it('resets remoteGcRequested flag so BOYD is sent to new incarnation', async () => { + const remote = makeRemote(); + + // Receive BOYD from remote — sets the ping-pong prevention flag + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }), + ); + + // Peer restarts — flag should be cleared + remote.handlePeerRestart(); + + // Next deliverBringOutYourDead should send BOYD (not suppress it) + await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledWith( + mockRemotePeerId, + expect.any(String), + ); + }); }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 7b0dfec1b..7bfa0e510 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -1089,11 +1089,12 @@ export class RemoteHandle implements EndpointHandle { // Reject pending URL redemptions - the remote won't have context for them this.rejectPendingRedemptions('Remote peer restarted'); - // Reset sequence numbers for fresh start + // Reset sequence numbers and flags for fresh start this.#nextSendSeq = 0; this.#highestReceivedSeq = 0; this.#startSeq = 0; this.#retryCount = 0; + this.#remoteGcRequested = false; // Clear persisted sequence state this.#kernelStore.clearRemoteSeqState(this.remoteId); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index beb3df9cc..b13dab698 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -520,6 +520,53 @@ describe('RemoteManager', () => { }); }); + describe('reapRemotes', () => { + beforeEach(async () => { + const messageHandler = vi.fn(); + vi.mocked(remoteComms.initRemoteComms).mockResolvedValue(mockRemoteComms); + remoteManager.setMessageHandler(messageHandler); + await remoteManager.initRemoteComms(); + }); + + it('schedules reap for all remotes when no filter provided', () => { + remoteManager.establishRemote('peer1'); + remoteManager.establishRemote('peer2'); + remoteManager.establishRemote('peer3'); + + remoteManager.reapRemotes(); + + // Drain the reap queue and verify all remotes were scheduled + const reaped: string[] = []; + let action = kernelStore.nextReapAction(); + while (action) { + reaped.push(action.endpointId); + action = kernelStore.nextReapAction(); + } + expect(reaped).toStrictEqual(['r1', 'r2', 'r3']); + }); + + it('schedules reap only for remotes matching the filter', () => { + remoteManager.establishRemote('peer1'); + remoteManager.establishRemote('peer2'); + remoteManager.establishRemote('peer3'); + + remoteManager.reapRemotes((remoteId) => remoteId === 'r2'); + + const action = kernelStore.nextReapAction(); + expect(action).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r2', + }); + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + + it('does nothing when no remotes exist', () => { + remoteManager.reapRemotes(); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + }); + describe('handleRemoteGiveUp', () => { beforeEach(async () => { const messageHandler = vi.fn(); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index 01b6201a4..600a371f0 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -351,6 +351,20 @@ export class RemoteManager { await this.#platformServices.closeConnection(peerId); } + /** + * Schedule reap for remotes that match the filter. + * This is for debugging and testing purposes only. + * + * @param filter - A function that returns true if the remote should be reaped. + */ + reapRemotes(filter: (remoteId: RemoteId) => boolean = () => true): void { + for (const remoteId of this.#remotes.keys()) { + if (filter(remoteId)) { + this.#kernelStore.scheduleReap(remoteId); + } + } + } + /** * Take note of where a peer might be. *