diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 104a05e99..66b910a02 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -1,5 +1,6 @@ import type { Libp2p } from '@libp2p/interface'; import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { waitUntilQuiescent } from '@metamask/kernel-utils'; import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel'; import type { KRef } from '@metamask/ocap-kernel'; import { startRelay } from '@ocap/cli/relay'; @@ -1021,4 +1022,169 @@ describe.sequential('Remote Communications E2E', () => { NETWORK_TIMEOUT * 3, ); }); + + describe('Distributed Garbage Collection', () => { + it( + 'creates remote endpoint with clist entries after cross-kernel message', + async () => { + const { aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send a message to create cross-kernel object references + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + + // Verify cross-kernel communication works (implies remote endpoints were created) + expect(response).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT, + ); + + it( + 'sends BOYD to remote kernel when local remote is reaped', + async () => { + const { aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send a message to create cross-kernel refs + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + + // Schedule reap on kernel1's remote endpoints - this will cause + // the crank loop to deliver BOYD to the remote kernel + kernel1.reapRemotes(); + + // Trigger cranks to process the reap action (which sends BOYD to kernel2) + // and allow the remote to process it and respond + for (let i = 0; i < 3; i++) { + await kernel1.queueMessage(aliceRef, 'ping', []); + await waitUntilQuiescent(500); + } + + // Verify communication still works after DGC + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(response).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT, + ); + + it( + 'processes incoming BOYD by scheduling local reap', + async () => { + const { bobRef, aliceURL, aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send messages in both directions to create refs on both sides + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']); + + // Schedule reap on kernel2's remote endpoints - this will send BOYD to kernel1 + kernel2.reapRemotes(); + + // Trigger cranks to process the reap and allow BOYD to flow + for (let i = 0; i < 3; i++) { + await kernel2.queueMessage(bobRef, 'ping', []); + await waitUntilQuiescent(500); + } + + // Verify communication still works after DGC from both directions + const aliceToBob = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(aliceToBob).toContain('vat Bob got "hello" from Alice'); + + const bobToAlice = await sendRemoteMessage( + kernel2, + bobRef, + aliceURL, + 'hello', + ['Bob'], + ); + expect(bobToAlice).toContain('vat Alice got "hello" from Bob'); + }, + NETWORK_TIMEOUT, + ); + + it( + 'completes BOYD exchange without infinite ping-pong', + async () => { + const { aliceRef, bobRef, bobURL, aliceURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + ); + + // Send messages to establish refs on both sides + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']); + + // Schedule reap on BOTH sides simultaneously - this tests that the + // ping-pong prevention flag works correctly, preventing infinite BOYD loops + kernel1.reapRemotes(); + kernel2.reapRemotes(); + + // Trigger cranks on both kernels to process the reaps and allow + // BOYD messages to flow in both directions + for (let i = 0; i < 3; i++) { + await Promise.all([ + kernel1.queueMessage(aliceRef, 'ping', []), + kernel2.queueMessage(bobRef, 'ping', []), + ]); + await waitUntilQuiescent(500); + } + + // Verify continued bidirectional communication works - this proves + // the BOYD exchange completed without breaking the connection + const aliceToBob = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(aliceToBob).toContain('vat Bob got "hello" from Alice'); + + const bobToAlice = await sendRemoteMessage( + kernel2, + bobRef, + aliceURL, + 'hello', + ['Bob'], + ); + expect(bobToAlice).toContain('vat Alice got "hello" from Bob'); + }, + NETWORK_TIMEOUT, + ); + }); }); diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index c9157c8c4..37a272ad7 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -14,6 +14,7 @@ import { makeKernelStore } from './store/index.ts'; import type { KernelStore } from './store/index.ts'; import type { VatId, + RemoteId, EndpointId, KRef, PlatformServices, @@ -481,6 +482,16 @@ export class Kernel { this.#vatManager.reapVats(filter); } + /** + * Reap remotes that match the filter. + * This is for debugging and testing purposes only. + * + * @param filter - A function that returns true if the remote should be reaped. + */ + reapRemotes(filter: (remoteId: RemoteId) => boolean = () => true): void { + this.#remoteManager.reapRemotes(filter); + } + /** * Pin a vat root. * diff --git a/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts b/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts index 8b5b9aa9b..86f769934 100644 --- a/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts +++ b/packages/ocap-kernel/src/garbage-collection/garbage-collection.ts @@ -3,14 +3,14 @@ import { insistKernelType } from '../store/utils/kernel-slots.ts'; import type { GCAction, GCActionType, + EndpointId, KRef, RunQueueItem, - VatId, } from '../types.ts'; import { actionTypePriorities, insistGCActionType, - insistVatId, + insistEndpointId, queueTypeFromActionType, } from '../types.ts'; import { assert } from '../utils/assert.ts'; @@ -19,43 +19,43 @@ import { assert } from '../utils/assert.ts'; * Parsed representation of a GC action. */ type ParsedGCAction = Readonly<{ - vatId: VatId; + endpointId: EndpointId; type: GCActionType; kref: KRef; }>; /** - * Parse a GC action string into a vat id, type, and kref. + * Parse a GC action string into an endpoint id, type, and kref. * * @param action - The GC action string to parse. * @returns The parsed GC action. */ function parseAction(action: GCAction): ParsedGCAction { - const [vatId, type, kref] = action.split(' '); - insistVatId(vatId); + const [endpointId, type, kref] = action.split(' '); + insistEndpointId(endpointId); insistGCActionType(type); insistKernelType('object', kref); - return harden({ vatId, type, kref }); + return harden({ endpointId, type, kref }); } /** * Determines if a GC action should be processed based on current system state. * * @param storage - The kernel storage. - * @param vatId - The vat id of the vat that owns the kref. + * @param endpointId - The endpoint id of the vat or remote that owns the kref. * @param type - The type of GC action. * @param kref - The kref of the object in question. * @returns True if the action should be processed, false otherwise. */ function shouldProcessAction( storage: KernelStore, - vatId: VatId, + endpointId: EndpointId, type: GCActionType, kref: KRef, ): boolean { - const hasCList = storage.hasCListEntry(vatId, kref); + const hasCList = storage.hasCListEntry(endpointId, kref); const isReachable = hasCList - ? storage.getReachableFlag(vatId, kref) + ? storage.getReachableFlag(endpointId, kref) : undefined; const exists = storage.kernelRefExists(kref); const { reachable, recognizable } = exists @@ -78,17 +78,17 @@ function shouldProcessAction( } /** - * Filters and processes a group of GC actions for a specific vat and action type. + * Filters and processes a group of GC actions for a specific endpoint and action type. * * @param storage - The kernel storage. - * @param vatId - The vat id of the vat that owns the krefs. + * @param endpointId - The endpoint id of the vat or remote that owns the krefs. * @param actions - The set of GC actions to process. * @param allActionsSet - The complete set of GC actions. * @returns Object containing the krefs to process and whether the action set was updated. */ function filterActionsForProcessing( storage: KernelStore, - vatId: VatId, + endpointId: EndpointId, actions: Set, allActionsSet: Set, ): { krefs: KRef[]; actionSetUpdated: boolean } { @@ -97,7 +97,7 @@ function filterActionsForProcessing( for (const action of actions) { const { type, kref } = parseAction(action); - if (shouldProcessAction(storage, vatId, type, kref)) { + if (shouldProcessAction(storage, endpointId, type, kref)) { krefs.push(kref); } allActionsSet.delete(action); @@ -119,43 +119,52 @@ export function processGCActionSet( const allActionsSet = storage.getGCActions(); let actionSetUpdated = false; - // Group actions by vat and type - const actionsByVat = new Map>>(); + // Group actions by endpoint and type + const actionsByEndpoint = new Map< + EndpointId, + Map> + >(); for (const action of allActionsSet) { - const { vatId, type } = parseAction(action); + const { endpointId, type } = parseAction(action); - if (!actionsByVat.has(vatId)) { - actionsByVat.set(vatId, new Map()); + if (!actionsByEndpoint.has(endpointId)) { + actionsByEndpoint.set(endpointId, new Map()); } - const actionsForVatByType = actionsByVat.get(vatId); - assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`); + const actionsForEndpointByType = actionsByEndpoint.get(endpointId); + assert( + actionsForEndpointByType !== undefined, + `No actions for endpoint: ${endpointId}`, + ); - if (!actionsForVatByType.has(type)) { - actionsForVatByType.set(type, new Set()); + if (!actionsForEndpointByType.has(type)) { + actionsForEndpointByType.set(type, new Set()); } - const actions = actionsForVatByType.get(type); + const actions = actionsForEndpointByType.get(type); assert(actions !== undefined, `No actions for type: ${type}`); actions.add(action); } // Process actions in priority order - const vatIds = Array.from(actionsByVat.keys()).sort(); + const endpointIds = Array.from(actionsByEndpoint.keys()).sort(); - for (const vatId of vatIds) { - const actionsForVatByType = actionsByVat.get(vatId); - assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`); + for (const endpointId of endpointIds) { + const actionsForEndpointByType = actionsByEndpoint.get(endpointId); + assert( + actionsForEndpointByType !== undefined, + `No actions for endpoint: ${endpointId}`, + ); - // Find the highest-priority type of work to do within this vat + // Find the highest-priority type of work to do within this endpoint for (const type of actionTypePriorities) { - if (actionsForVatByType.has(type)) { - const actions = actionsForVatByType.get(type); + if (actionsForEndpointByType.has(type)) { + const actions = actionsForEndpointByType.get(type); assert(actions !== undefined, `No actions for type: ${type}`); const { krefs, actionSetUpdated: updated } = filterActionsForProcessing( storage, - vatId, + endpointId, actions, allActionsSet, ); @@ -172,7 +181,7 @@ export function processGCActionSet( const queueType = queueTypeFromActionType.get(type); assert(queueType !== undefined, `Unknown action type: ${type}`); - return harden({ type: queueType, endpointId: vatId, krefs }); + return harden({ type: queueType, endpointId, krefs }); } } } diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 9e91647fb..2b5e52e23 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -153,12 +153,144 @@ describe('RemoteHandle', () => { expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); }); - it('deliverBringOutYourDead does not call sendRemoteMessage', async () => { - const remote = makeRemote(); + describe('bringOutYourDead', () => { + it('sends BOYD delivery to remote when locally triggered', async () => { + const remote = makeRemote(); - const crankResult = await remote.deliverBringOutYourDead(); - expect(mockRemoteComms.sendRemoteMessage).not.toHaveBeenCalled(); - expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); + const crankResult = await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledWith( + mockRemotePeerId, + expect.any(String), + ); + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]![1]; + const parsed = JSON.parse(sentString); + expect(parsed).toStrictEqual({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }); + expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); + }); + + it('handles incoming BOYD by scheduling reap', async () => { + const remote = makeRemote(); + + const delivery = JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }); + const reply = await remote.handleRemoteMessage(delivery); + + expect(reply).toBeNull(); + // Verify reap was scheduled by checking the reap queue + expect(mockKernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: remote.remoteId, + }); + }); + + it('does not send BOYD back when remotely triggered (ping-pong prevention)', async () => { + const remote = makeRemote(); + + // Receive BOYD from remote + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }), + ); + + // Now local kernel calls deliverBringOutYourDead - should NOT send back + const crankResult = await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).not.toHaveBeenCalled(); + expect(crankResult).toStrictEqual({ didDelivery: remote.remoteId }); + }); + + it('clears flag after skipping echo (next local BOYD sends normally)', async () => { + const remote = makeRemote(); + + // Receive BOYD from remote + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }), + ); + + // First local BOYD - suppressed + await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).not.toHaveBeenCalled(); + + // Second local BOYD - should send normally (flag was cleared) + await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledWith( + mockRemotePeerId, + expect.any(String), + ); + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]![1]; + const parsed = JSON.parse(sentString); + expect(parsed).toStrictEqual({ + seq: 1, + ack: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }); + }); + + it('tracks correct seq/ack on BOYD messages', async () => { + const remote = makeRemote(); + + // Receive a non-BOYD message to set up ack tracking + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 3, + method: 'deliver', + params: ['notify', resolutions], + }), + ); + + // Send a non-BOYD message first to consume seq 1 + await remote.deliverNotify([ + ['rp+1', false, { body: '"value"', slots: [] }], + ]); + + // Now send BOYD - should get seq 2 with ack 3 + await remote.deliverBringOutYourDead(); + + const { calls } = vi.mocked(mockRemoteComms.sendRemoteMessage).mock; + // Second call is the BOYD (first was the notify) + const parsed = JSON.parse(calls[1]![1]); + expect(parsed).toStrictEqual({ + seq: 2, + ack: 3, + method: 'deliver', + params: ['bringOutYourDead'], + }); + }); + + it('persists BOYD message for retransmission', async () => { + const remote = makeRemote(); + + await remote.deliverBringOutYourDead(); + + // Verify message was persisted + const pendingMsgString = mockKernelStore.getPendingMessage( + mockRemoteId, + 1, + ); + expect(pendingMsgString).toBeDefined(); + expect(pendingMsgString).toContain('"bringOutYourDead"'); + expect(pendingMsgString).toContain('"seq":1'); + }); }); it('redeemOcapURL calls sendRemoteMessage correctly and handles expected reply (success)', async () => { @@ -1328,5 +1460,28 @@ describe('RemoteHandle', () => { // The pending redemption should be rejected await expect(redeemPromise).rejects.toThrow('Remote peer restarted'); }); + + it('resets remoteGcRequested flag so BOYD is sent to new incarnation', async () => { + const remote = makeRemote(); + + // Receive BOYD from remote — sets the ping-pong prevention flag + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 1, + method: 'deliver', + params: ['bringOutYourDead'], + }), + ); + + // Peer restarts — flag should be cleared + remote.handlePeerRestart(); + + // Next deliverBringOutYourDead should send BOYD (not suppress it) + await remote.deliverBringOutYourDead(); + expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledWith( + mockRemotePeerId, + expect.any(String), + ); + }); }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index b7d3dfa98..7bfa0e510 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -47,13 +47,15 @@ type NotifyDelivery = ['notify', VatOneResolution[]]; type DropExportsDelivery = ['dropExports', string[]]; type RetireExportsDelivery = ['retireExports', string[]]; type RetireImportsDelivery = ['retireImports', string[]]; +type BringOutYourDeadDelivery = ['bringOutYourDead']; type DeliveryParams = | MessageDelivery | NotifyDelivery | DropExportsDelivery | RetireExportsDelivery - | RetireImportsDelivery; + | RetireImportsDelivery + | BringOutYourDeadDelivery; type Delivery = { method: 'deliver'; @@ -102,6 +104,13 @@ export class RemoteHandle implements EndpointHandle { /** Flag that location hints need to be sent to remote comms object. */ #needsHinting: boolean = true; + /** + * Flag indicating the current BOYD was triggered by an incoming remote + * request. When set, deliverBringOutYourDead will skip sending BOYD back + * to the remote, preventing infinite ping-pong. + */ + #remoteGcRequested: boolean = false; + /** Pending URL redemption requests that have not yet been responded to. */ readonly #pendingRedemptions: Map< string, @@ -635,15 +644,21 @@ export class RemoteHandle implements EndpointHandle { } /** - * Make a 'bringOutYourDead' delivery to the remote. - * - * Currently this does not actually do anything but is included to satisfy the - * EndpointHandle interface. + * Send a 'bringOutYourDead' delivery to the remote, requesting it to run + * its garbage collection cycle. If the current BOYD was triggered by an + * incoming remote request, skip sending to prevent infinite ping-pong. * * @returns the crank results. */ async deliverBringOutYourDead(): Promise { - // XXX Currently a no-op, but probably some further DGC action is warranted here + if (this.#remoteGcRequested) { + this.#remoteGcRequested = false; + return this.#myCrankResult; + } + await this.#sendRemoteCommand({ + method: 'deliver', + params: ['bringOutYourDead'], + }); return this.#myCrankResult; } @@ -756,6 +771,10 @@ export class RemoteHandle implements EndpointHandle { this.#retireImports(erefs); break; } + case 'bringOutYourDead': { + this.#kernelStore.scheduleReap(this.remoteId); + break; + } default: // eslint-disable-next-line @typescript-eslint/restrict-template-expressions throw Error(`unknown remote delivery method ${method}`); @@ -943,6 +962,12 @@ export class RemoteHandle implements EndpointHandle { // on outgoing messages doesn't acknowledge uncommitted message receipts. this.#highestReceivedSeq = seq; + // Set ping-pong prevention flag after commit so it's only visible once + // the BOYD delivery is durably recorded. + if (method === 'deliver' && params[0] === 'bringOutYourDead') { + this.#remoteGcRequested = true; + } + // Complete deferred operations if (deferredCompletion) { switch (method) { @@ -1064,11 +1089,12 @@ export class RemoteHandle implements EndpointHandle { // Reject pending URL redemptions - the remote won't have context for them this.rejectPendingRedemptions('Remote peer restarted'); - // Reset sequence numbers for fresh start + // Reset sequence numbers and flags for fresh start this.#nextSendSeq = 0; this.#highestReceivedSeq = 0; this.#startSeq = 0; this.#retryCount = 0; + this.#remoteGcRequested = false; // Clear persisted sequence state this.#kernelStore.clearRemoteSeqState(this.remoteId); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index beb3df9cc..b13dab698 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -520,6 +520,53 @@ describe('RemoteManager', () => { }); }); + describe('reapRemotes', () => { + beforeEach(async () => { + const messageHandler = vi.fn(); + vi.mocked(remoteComms.initRemoteComms).mockResolvedValue(mockRemoteComms); + remoteManager.setMessageHandler(messageHandler); + await remoteManager.initRemoteComms(); + }); + + it('schedules reap for all remotes when no filter provided', () => { + remoteManager.establishRemote('peer1'); + remoteManager.establishRemote('peer2'); + remoteManager.establishRemote('peer3'); + + remoteManager.reapRemotes(); + + // Drain the reap queue and verify all remotes were scheduled + const reaped: string[] = []; + let action = kernelStore.nextReapAction(); + while (action) { + reaped.push(action.endpointId); + action = kernelStore.nextReapAction(); + } + expect(reaped).toStrictEqual(['r1', 'r2', 'r3']); + }); + + it('schedules reap only for remotes matching the filter', () => { + remoteManager.establishRemote('peer1'); + remoteManager.establishRemote('peer2'); + remoteManager.establishRemote('peer3'); + + remoteManager.reapRemotes((remoteId) => remoteId === 'r2'); + + const action = kernelStore.nextReapAction(); + expect(action).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r2', + }); + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + + it('does nothing when no remotes exist', () => { + remoteManager.reapRemotes(); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + }); + describe('handleRemoteGiveUp', () => { beforeEach(async () => { const messageHandler = vi.fn(); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index 01b6201a4..600a371f0 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -351,6 +351,20 @@ export class RemoteManager { await this.#platformServices.closeConnection(peerId); } + /** + * Schedule reap for remotes that match the filter. + * This is for debugging and testing purposes only. + * + * @param filter - A function that returns true if the remote should be reaped. + */ + reapRemotes(filter: (remoteId: RemoteId) => boolean = () => true): void { + for (const remoteId of this.#remotes.keys()) { + if (filter(remoteId)) { + this.#kernelStore.scheduleReap(remoteId); + } + } + } + /** * Take note of where a peer might be. * diff --git a/packages/ocap-kernel/src/store/methods/gc.test.ts b/packages/ocap-kernel/src/store/methods/gc.test.ts index b8a12f196..ad4586340 100644 --- a/packages/ocap-kernel/src/store/methods/gc.test.ts +++ b/packages/ocap-kernel/src/store/methods/gc.test.ts @@ -30,13 +30,29 @@ describe('GC methods', () => { expect(actions).toStrictEqual(new Set(validActions)); }); + it('manages GC actions for remote endpoints', () => { + const r0Object = kernelStore.initKernelObject('r0'); + const r1Object = kernelStore.initKernelObject('r1'); + + const remoteActions: GCAction[] = [ + `r0 dropExport ${r0Object}`, + `r1 retireExport ${r1Object}`, + ]; + + kernelStore.addGCActions(remoteActions); + + const actions = kernelStore.getGCActions(); + expect(actions.size).toBe(2); + expect(actions).toStrictEqual(new Set(remoteActions)); + }); + it('rejects invalid GC actions', () => { const v1Object = kernelStore.initKernelObject('v1'); - // Invalid vat ID + // Invalid endpoint ID expect(() => { kernelStore.addGCActions([`x1 dropExport ${v1Object}`]); - }).toThrow('not a valid VatId'); + }).toThrow('not a valid EndpointId'); // Invalid action type expect(() => { @@ -131,6 +147,58 @@ describe('GC methods', () => { expect(kernelStore.nextReapAction()).toBeUndefined(); }); + + it('schedules remote IDs for reaping', () => { + kernelStore.scheduleReap('r0'); + kernelStore.scheduleReap('r1'); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r0', + }); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r1', + }); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + + it('interleaves vat and remote reap scheduling', () => { + kernelStore.scheduleReap('v1'); + kernelStore.scheduleReap('r0'); + kernelStore.scheduleReap('v2'); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'v1', + }); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r0', + }); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'v2', + }); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); + + it('handles duplicate remote reap scheduling', () => { + kernelStore.scheduleReap('r0'); + kernelStore.scheduleReap('r0'); + + expect(kernelStore.nextReapAction()).toStrictEqual({ + type: 'bringOutYourDead', + endpointId: 'r0', + }); + + expect(kernelStore.nextReapAction()).toBeUndefined(); + }); }); describe('retireKernelObjects', () => { diff --git a/packages/ocap-kernel/src/store/methods/gc.ts b/packages/ocap-kernel/src/store/methods/gc.ts index 6d518bc1a..e95a2cb67 100644 --- a/packages/ocap-kernel/src/store/methods/gc.ts +++ b/packages/ocap-kernel/src/store/methods/gc.ts @@ -9,11 +9,12 @@ import { getSubclusterMethods } from './subclusters.ts'; import { getVatMethods } from './vat.ts'; import type { VatId, + EndpointId, KRef, GCAction, RunQueueItemBringOutYourDead, } from '../../types.ts'; -import { insistGCActionType, insistVatId } from '../../types.ts'; +import { insistGCActionType, insistEndpointId } from '../../types.ts'; import type { StoreContext } from '../types.ts'; import { insistKernelType, parseKernelSlot } from '../utils/kernel-slots.ts'; @@ -62,8 +63,8 @@ export function getGCMethods(ctx: StoreContext) { const actions = getGCActions(); for (const action of newActions) { assert.typeof(action, 'string', 'addGCActions given bad action'); - const [vatId, type, kref] = action.split(' '); - insistVatId(vatId); + const [endpointId, type, kref] = action.split(' '); + insistEndpointId(endpointId); insistGCActionType(type); insistKernelType('object', kref); actions.add(action); @@ -72,14 +73,14 @@ export function getGCMethods(ctx: StoreContext) { } /** - * Schedule a vat for reaping. + * Schedule an endpoint for reaping. * - * @param vatId - The vat to schedule for reaping. + * @param endpointId - The endpoint (vat or remote) to schedule for reaping. */ - function scheduleReap(vatId: VatId): void { + function scheduleReap(endpointId: EndpointId): void { const queue = JSON.parse(ctx.reapQueue.get() ?? '[]'); - if (!queue.includes(vatId)) { - queue.push(vatId); + if (!queue.includes(endpointId)) { + queue.push(endpointId); ctx.reapQueue.set(JSON.stringify(queue)); } }