Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions packages/nodejs/test/e2e/remote-comms.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Libp2p } from '@libp2p/interface';
import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs';
import { waitUntilQuiescent } from '@metamask/kernel-utils';
import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel';
import type { KRef } from '@metamask/ocap-kernel';
import { startRelay } from '@ocap/cli/relay';
Expand Down Expand Up @@ -1021,4 +1022,169 @@ describe.sequential('Remote Communications E2E', () => {
NETWORK_TIMEOUT * 3,
);
});

describe('Distributed Garbage Collection', () => {
it(
'creates remote endpoint with clist entries after cross-kernel message',
async () => {
const { aliceRef, bobURL } = await setupAliceAndBob(
kernel1,
kernel2,
kernelStore1,
kernelStore2,
testRelays,
);

// Send a message to create cross-kernel object references
const response = await sendRemoteMessage(
kernel1,
aliceRef,
bobURL,
'hello',
['Alice'],
);

// Verify cross-kernel communication works (implies remote endpoints were created)
expect(response).toContain('vat Bob got "hello" from Alice');
},
NETWORK_TIMEOUT,
);

it(
'sends BOYD to remote kernel when local remote is reaped',
async () => {
const { aliceRef, bobURL } = await setupAliceAndBob(
kernel1,
kernel2,
kernelStore1,
kernelStore2,
testRelays,
);

// Send a message to create cross-kernel refs
await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);

// Schedule reap on kernel1's remote endpoints - this will cause
// the crank loop to deliver BOYD to the remote kernel
kernel1.reapRemotes();

// Trigger cranks to process the reap action (which sends BOYD to kernel2)
// and allow the remote to process it and respond
for (let i = 0; i < 3; i++) {
await kernel1.queueMessage(aliceRef, 'ping', []);
await waitUntilQuiescent(500);
}

// Verify communication still works after DGC
const response = await sendRemoteMessage(
kernel1,
aliceRef,
bobURL,
'hello',
['Alice'],
);
expect(response).toContain('vat Bob got "hello" from Alice');
},
NETWORK_TIMEOUT,
);

it(
'processes incoming BOYD by scheduling local reap',
async () => {
const { bobRef, aliceURL, aliceRef, bobURL } = await setupAliceAndBob(
kernel1,
kernel2,
kernelStore1,
kernelStore2,
testRelays,
);

// Send messages in both directions to create refs on both sides
await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);
await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']);

// Schedule reap on kernel2's remote endpoints - this will send BOYD to kernel1
kernel2.reapRemotes();

// Trigger cranks to process the reap and allow BOYD to flow
for (let i = 0; i < 3; i++) {
await kernel2.queueMessage(bobRef, 'ping', []);
await waitUntilQuiescent(500);
}

// Verify communication still works after DGC from both directions
const aliceToBob = await sendRemoteMessage(
kernel1,
aliceRef,
bobURL,
'hello',
['Alice'],
);
expect(aliceToBob).toContain('vat Bob got "hello" from Alice');

const bobToAlice = await sendRemoteMessage(
kernel2,
bobRef,
aliceURL,
'hello',
['Bob'],
);
expect(bobToAlice).toContain('vat Alice got "hello" from Bob');
},
NETWORK_TIMEOUT,
);

it(
'completes BOYD exchange without infinite ping-pong',
async () => {
const { aliceRef, bobRef, bobURL, aliceURL } = await setupAliceAndBob(
kernel1,
kernel2,
kernelStore1,
kernelStore2,
testRelays,
);

// Send messages to establish refs on both sides
await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);
await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']);

// Schedule reap on BOTH sides simultaneously - this tests that the
// ping-pong prevention flag works correctly, preventing infinite BOYD loops
kernel1.reapRemotes();
kernel2.reapRemotes();

// Trigger cranks on both kernels to process the reaps and allow
// BOYD messages to flow in both directions
for (let i = 0; i < 3; i++) {
await Promise.all([
kernel1.queueMessage(aliceRef, 'ping', []),
kernel2.queueMessage(bobRef, 'ping', []),
]);
await waitUntilQuiescent(500);
}

// Verify continued bidirectional communication works - this proves
// the BOYD exchange completed without breaking the connection
const aliceToBob = await sendRemoteMessage(
kernel1,
aliceRef,
bobURL,
'hello',
['Alice'],
);
expect(aliceToBob).toContain('vat Bob got "hello" from Alice');

const bobToAlice = await sendRemoteMessage(
kernel2,
bobRef,
aliceURL,
'hello',
['Bob'],
);
expect(bobToAlice).toContain('vat Alice got "hello" from Bob');
},
NETWORK_TIMEOUT,
);
});
});
11 changes: 11 additions & 0 deletions packages/ocap-kernel/src/Kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { makeKernelStore } from './store/index.ts';
import type { KernelStore } from './store/index.ts';
import type {
VatId,
RemoteId,
EndpointId,
KRef,
PlatformServices,
Expand Down Expand Up @@ -481,6 +482,16 @@ export class Kernel {
this.#vatManager.reapVats(filter);
}

/**
* Reap remotes that match the filter.
* This is for debugging and testing purposes only.
*
* @param filter - A function that returns true if the remote should be reaped.
*/
reapRemotes(filter: (remoteId: RemoteId) => boolean = () => true): void {
this.#remoteManager.reapRemotes(filter);
}

/**
* Pin a vat root.
*
Expand Down
77 changes: 43 additions & 34 deletions packages/ocap-kernel/src/garbage-collection/garbage-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import { insistKernelType } from '../store/utils/kernel-slots.ts';
import type {
GCAction,
GCActionType,
EndpointId,
KRef,
RunQueueItem,
VatId,
} from '../types.ts';
import {
actionTypePriorities,
insistGCActionType,
insistVatId,
insistEndpointId,
queueTypeFromActionType,
} from '../types.ts';
import { assert } from '../utils/assert.ts';
Expand All @@ -19,43 +19,43 @@ import { assert } from '../utils/assert.ts';
* Parsed representation of a GC action.
*/
type ParsedGCAction = Readonly<{
vatId: VatId;
endpointId: EndpointId;
type: GCActionType;
kref: KRef;
}>;

/**
* Parse a GC action string into a vat id, type, and kref.
* Parse a GC action string into an endpoint id, type, and kref.
*
* @param action - The GC action string to parse.
* @returns The parsed GC action.
*/
function parseAction(action: GCAction): ParsedGCAction {
const [vatId, type, kref] = action.split(' ');
insistVatId(vatId);
const [endpointId, type, kref] = action.split(' ');
insistEndpointId(endpointId);
insistGCActionType(type);
insistKernelType('object', kref);
return harden({ vatId, type, kref });
return harden({ endpointId, type, kref });
}

/**
* Determines if a GC action should be processed based on current system state.
*
* @param storage - The kernel storage.
* @param vatId - The vat id of the vat that owns the kref.
* @param endpointId - The endpoint id of the vat or remote that owns the kref.
* @param type - The type of GC action.
* @param kref - The kref of the object in question.
* @returns True if the action should be processed, false otherwise.
*/
function shouldProcessAction(
storage: KernelStore,
vatId: VatId,
endpointId: EndpointId,
type: GCActionType,
kref: KRef,
): boolean {
const hasCList = storage.hasCListEntry(vatId, kref);
const hasCList = storage.hasCListEntry(endpointId, kref);
const isReachable = hasCList
? storage.getReachableFlag(vatId, kref)
? storage.getReachableFlag(endpointId, kref)
: undefined;
const exists = storage.kernelRefExists(kref);
const { reachable, recognizable } = exists
Expand All @@ -78,17 +78,17 @@ function shouldProcessAction(
}

/**
* Filters and processes a group of GC actions for a specific vat and action type.
* Filters and processes a group of GC actions for a specific endpoint and action type.
*
* @param storage - The kernel storage.
* @param vatId - The vat id of the vat that owns the krefs.
* @param endpointId - The endpoint id of the vat or remote that owns the krefs.
* @param actions - The set of GC actions to process.
* @param allActionsSet - The complete set of GC actions.
* @returns Object containing the krefs to process and whether the action set was updated.
*/
function filterActionsForProcessing(
storage: KernelStore,
vatId: VatId,
endpointId: EndpointId,
actions: Set<GCAction>,
allActionsSet: Set<GCAction>,
): { krefs: KRef[]; actionSetUpdated: boolean } {
Expand All @@ -97,7 +97,7 @@ function filterActionsForProcessing(

for (const action of actions) {
const { type, kref } = parseAction(action);
if (shouldProcessAction(storage, vatId, type, kref)) {
if (shouldProcessAction(storage, endpointId, type, kref)) {
krefs.push(kref);
}
allActionsSet.delete(action);
Expand All @@ -119,43 +119,52 @@ export function processGCActionSet(
const allActionsSet = storage.getGCActions();
let actionSetUpdated = false;

// Group actions by vat and type
const actionsByVat = new Map<VatId, Map<GCActionType, Set<GCAction>>>();
// Group actions by endpoint and type
const actionsByEndpoint = new Map<
EndpointId,
Map<GCActionType, Set<GCAction>>
>();

for (const action of allActionsSet) {
const { vatId, type } = parseAction(action);
const { endpointId, type } = parseAction(action);

if (!actionsByVat.has(vatId)) {
actionsByVat.set(vatId, new Map());
if (!actionsByEndpoint.has(endpointId)) {
actionsByEndpoint.set(endpointId, new Map());
}

const actionsForVatByType = actionsByVat.get(vatId);
assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`);
const actionsForEndpointByType = actionsByEndpoint.get(endpointId);
assert(
actionsForEndpointByType !== undefined,
`No actions for endpoint: ${endpointId}`,
);

if (!actionsForVatByType.has(type)) {
actionsForVatByType.set(type, new Set());
if (!actionsForEndpointByType.has(type)) {
actionsForEndpointByType.set(type, new Set());
}

const actions = actionsForVatByType.get(type);
const actions = actionsForEndpointByType.get(type);
assert(actions !== undefined, `No actions for type: ${type}`);
actions.add(action);
}

// Process actions in priority order
const vatIds = Array.from(actionsByVat.keys()).sort();
const endpointIds = Array.from(actionsByEndpoint.keys()).sort();

for (const vatId of vatIds) {
const actionsForVatByType = actionsByVat.get(vatId);
assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`);
for (const endpointId of endpointIds) {
const actionsForEndpointByType = actionsByEndpoint.get(endpointId);
assert(
actionsForEndpointByType !== undefined,
`No actions for endpoint: ${endpointId}`,
);

// Find the highest-priority type of work to do within this vat
// Find the highest-priority type of work to do within this endpoint
for (const type of actionTypePriorities) {
if (actionsForVatByType.has(type)) {
const actions = actionsForVatByType.get(type);
if (actionsForEndpointByType.has(type)) {
const actions = actionsForEndpointByType.get(type);
assert(actions !== undefined, `No actions for type: ${type}`);
const { krefs, actionSetUpdated: updated } = filterActionsForProcessing(
storage,
vatId,
endpointId,
actions,
allActionsSet,
);
Expand All @@ -172,7 +181,7 @@ export function processGCActionSet(
const queueType = queueTypeFromActionType.get(type);
assert(queueType !== undefined, `Unknown action type: ${type}`);

return harden({ type: queueType, endpointId: vatId, krefs });
return harden({ type: queueType, endpointId, krefs });
}
}
}
Expand Down
Loading
Loading