
Commit 670daf6

sirtimid and claude authored
feat(ocap-kernel): implement distributed garbage collection protocol (#814)
## Summary

Implements the DGC (Distributed Garbage Collection) protocol for remote kernel communication (Issue #779).

- `deliverBringOutYourDead()` on `RemoteHandle` now sends a `['bringOutYourDead']` wire message to the remote kernel instead of being a no-op
- The remote kernel schedules a local reap, runs GC, and sends back drops/retires
- A ping-pong prevention flag (`#remoteGcRequested`) prevents infinite BOYD loops between kernels

## Changes

**GC store widening (`gc.ts`, `garbage-collection.ts`):**

- Widen `scheduleReap` to accept `EndpointId` (union of `VatId | RemoteId`) instead of just `VatId`
- Update `addGCActions` and `processGCActionSet` to use `insistEndpointId` instead of `insistVatId`
- Rename internal variables from `vatId`/`actionsByVat` to `endpointId`/`actionsByEndpoint` for clarity

**BOYD protocol (`RemoteHandle.ts`):**

- Add `BringOutYourDeadDelivery` to the `DeliveryParams` union type
- Implement `deliverBringOutYourDead()` to send BOYD over the wire via `#sendRemoteCommand`
- Add `'bringOutYourDead'` case to `#handleRemoteDeliver` that calls `scheduleReap`
- Add `#remoteGcRequested` flag: set after the savepoint commit (consistent with the transactional message processing pattern from #811), when BOYD was triggered by an incoming remote request the subsequent local BOYD is suppressed

**Tests:**

- Unit tests for GC with remote endpoints and reap scheduling
- Unit tests for BOYD send/receive, ping-pong prevention, seq/ack tracking, and persistence
- E2E tests for distributed GC across two kernels via libp2p

## Test plan

- [x] All ocap-kernel unit tests pass (1800+)
- [x] All nodejs e2e tests pass (35)
- [x] All extension e2e tests pass (19)
- [x] CI passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Adds a new cross-kernel `bringOutYourDead` control message and widens GC scheduling from vats to all endpoints, which can affect remote message flow and garbage-collection behavior if mis-triggered, but changes are mostly additive and well-covered by new unit/e2e tests.
>
> **Overview**
> Implements **distributed garbage collection** across kernels by turning `RemoteHandle.deliverBringOutYourDead()` into a real wire-level `deliver` message (`['bringOutYourDead']`) that causes the receiving kernel to `scheduleReap` its remote endpoint and run GC, with a `#remoteGcRequested` flag to prevent infinite BOYD ping-pong and to reset on peer restart.
>
> Widens GC plumbing from vat-only to **endpoint-wide** operation by treating GC actions and reap scheduling as `EndpointId` (vat or remote) rather than `VatId`, and adds `Kernel.reapRemotes()` / `RemoteManager.reapRemotes()` debugging/test hooks to trigger remote reaping.
>
> Adds extensive coverage: new RemoteHandle unit tests for BOYD send/receive, seq/ack/persistence, ping-pong prevention, RemoteManager tests for reap scheduling, GC store tests for remote endpoint IDs, and nodejs e2e scenarios validating BOYD exchange and continued bidirectional messaging after DGC.
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit ae4b9cc. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
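The ping-pong prevention described in the commit message can be illustrated with a minimal, self-contained sketch. This is not the `RemoteHandle` implementation: the `SketchRemote` class, its `crank()` method, and the `peer`/`sent` fields are hypothetical stand-ins, and only the `['bringOutYourDead']` message shape and the idea of a "remote GC requested" flag come from the description above. The sketch assumes that a reap triggered by an incoming BOYD clears the flag and skips the outgoing BOYD.

```typescript
// Hypothetical model of BOYD ping-pong prevention (not the real RemoteHandle).
type WireMessage = ['bringOutYourDead'];

class SketchRemote {
  // True when the pending reap was requested by the peer (assumed semantics).
  private remoteGcRequested = false;

  // True when a reap is scheduled for this remote endpoint.
  private reapScheduled = false;

  // Record of BOYD messages this side has sent over the (simulated) wire.
  readonly sent: WireMessage[] = [];

  peer: SketchRemote | undefined;

  // Stand-in for scheduling a reap of the remote endpoint.
  scheduleReap(): void {
    this.reapScheduled = true;
  }

  // Stand-in for handling an incoming wire message.
  handleIncoming(message: WireMessage): void {
    if (message[0] === 'bringOutYourDead') {
      this.remoteGcRequested = true;
      this.scheduleReap();
    }
  }

  // Stand-in for one pass of the crank loop processing a scheduled reap.
  crank(): void {
    if (!this.reapScheduled) {
      return;
    }
    this.reapScheduled = false;
    if (this.remoteGcRequested) {
      // The reap was requested by the peer: do not echo a BOYD back.
      this.remoteGcRequested = false;
      return;
    }
    const message: WireMessage = ['bringOutYourDead'];
    this.sent.push(message);
    this.peer?.handleIncoming(message);
  }
}

const kernel1 = new SketchRemote();
const kernel2 = new SketchRemote();
kernel1.peer = kernel2;
kernel2.peer = kernel1;

kernel1.scheduleReap();
for (let i = 0; i < 5; i++) {
  kernel1.crank();
  kernel2.crank();
}
console.log(kernel1.sent.length, kernel2.sent.length); // prints "1 0"
```

Without the flag, `kernel2` would answer every incoming BOYD with its own, and the two sides would keep rescheduling each other's reaps indefinitely.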
1 parent 203e063 commit 670daf6

9 files changed

Lines changed: 553 additions & 56 deletions


packages/nodejs/test/e2e/remote-comms.test.ts

Lines changed: 166 additions & 0 deletions
@@ -1,5 +1,6 @@
 import type { Libp2p } from '@libp2p/interface';
 import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs';
+import { waitUntilQuiescent } from '@metamask/kernel-utils';
 import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel';
 import type { KRef } from '@metamask/ocap-kernel';
 import { startRelay } from '@ocap/cli/relay';
@@ -1021,4 +1022,169 @@ describe.sequential('Remote Communications E2E', () => {
       NETWORK_TIMEOUT * 3,
     );
   });
+
+  describe('Distributed Garbage Collection', () => {
+    it(
+      'creates remote endpoint with clist entries after cross-kernel message',
+      async () => {
+        const { aliceRef, bobURL } = await setupAliceAndBob(
+          kernel1,
+          kernel2,
+          kernelStore1,
+          kernelStore2,
+          testRelays,
+        );
+
+        // Send a message to create cross-kernel object references
+        const response = await sendRemoteMessage(
+          kernel1,
+          aliceRef,
+          bobURL,
+          'hello',
+          ['Alice'],
+        );
+
+        // Verify cross-kernel communication works (implies remote endpoints were created)
+        expect(response).toContain('vat Bob got "hello" from Alice');
+      },
+      NETWORK_TIMEOUT,
+    );
+
+    it(
+      'sends BOYD to remote kernel when local remote is reaped',
+      async () => {
+        const { aliceRef, bobURL } = await setupAliceAndBob(
+          kernel1,
+          kernel2,
+          kernelStore1,
+          kernelStore2,
+          testRelays,
+        );
+
+        // Send a message to create cross-kernel refs
+        await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);
+
+        // Schedule reap on kernel1's remote endpoints - this will cause
+        // the crank loop to deliver BOYD to the remote kernel
+        kernel1.reapRemotes();
+
+        // Trigger cranks to process the reap action (which sends BOYD to kernel2)
+        // and allow the remote to process it and respond
+        for (let i = 0; i < 3; i++) {
+          await kernel1.queueMessage(aliceRef, 'ping', []);
+          await waitUntilQuiescent(500);
+        }
+
+        // Verify communication still works after DGC
+        const response = await sendRemoteMessage(
+          kernel1,
+          aliceRef,
+          bobURL,
+          'hello',
+          ['Alice'],
+        );
+        expect(response).toContain('vat Bob got "hello" from Alice');
+      },
+      NETWORK_TIMEOUT,
+    );
+
+    it(
+      'processes incoming BOYD by scheduling local reap',
+      async () => {
+        const { bobRef, aliceURL, aliceRef, bobURL } = await setupAliceAndBob(
+          kernel1,
+          kernel2,
+          kernelStore1,
+          kernelStore2,
+          testRelays,
+        );
+
+        // Send messages in both directions to create refs on both sides
+        await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);
+        await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']);
+
+        // Schedule reap on kernel2's remote endpoints - this will send BOYD to kernel1
+        kernel2.reapRemotes();
+
+        // Trigger cranks to process the reap and allow BOYD to flow
+        for (let i = 0; i < 3; i++) {
+          await kernel2.queueMessage(bobRef, 'ping', []);
+          await waitUntilQuiescent(500);
+        }
+
+        // Verify communication still works after DGC from both directions
+        const aliceToBob = await sendRemoteMessage(
+          kernel1,
+          aliceRef,
+          bobURL,
+          'hello',
+          ['Alice'],
+        );
+        expect(aliceToBob).toContain('vat Bob got "hello" from Alice');
+
+        const bobToAlice = await sendRemoteMessage(
+          kernel2,
+          bobRef,
+          aliceURL,
+          'hello',
+          ['Bob'],
+        );
+        expect(bobToAlice).toContain('vat Alice got "hello" from Bob');
+      },
+      NETWORK_TIMEOUT,
+    );
+
+    it(
+      'completes BOYD exchange without infinite ping-pong',
+      async () => {
+        const { aliceRef, bobRef, bobURL, aliceURL } = await setupAliceAndBob(
+          kernel1,
+          kernel2,
+          kernelStore1,
+          kernelStore2,
+          testRelays,
+        );
+
+        // Send messages to establish refs on both sides
+        await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);
+        await sendRemoteMessage(kernel2, bobRef, aliceURL, 'hello', ['Bob']);
+
+        // Schedule reap on BOTH sides simultaneously - this tests that the
+        // ping-pong prevention flag works correctly, preventing infinite BOYD loops
+        kernel1.reapRemotes();
+        kernel2.reapRemotes();
+
+        // Trigger cranks on both kernels to process the reaps and allow
+        // BOYD messages to flow in both directions
+        for (let i = 0; i < 3; i++) {
+          await Promise.all([
+            kernel1.queueMessage(aliceRef, 'ping', []),
+            kernel2.queueMessage(bobRef, 'ping', []),
+          ]);
+          await waitUntilQuiescent(500);
+        }
+
+        // Verify continued bidirectional communication works - this proves
+        // the BOYD exchange completed without breaking the connection
+        const aliceToBob = await sendRemoteMessage(
+          kernel1,
+          aliceRef,
+          bobURL,
+          'hello',
+          ['Alice'],
+        );
+        expect(aliceToBob).toContain('vat Bob got "hello" from Alice');
+
+        const bobToAlice = await sendRemoteMessage(
+          kernel2,
+          bobRef,
+          aliceURL,
+          'hello',
+          ['Bob'],
+        );
+        expect(bobToAlice).toContain('vat Alice got "hello" from Bob');
+      },
+      NETWORK_TIMEOUT,
+    );
+  });
 });

packages/ocap-kernel/src/Kernel.ts

Lines changed: 11 additions & 0 deletions
@@ -14,6 +14,7 @@ import { makeKernelStore } from './store/index.ts';
 import type { KernelStore } from './store/index.ts';
 import type {
   VatId,
+  RemoteId,
   EndpointId,
   KRef,
   PlatformServices,
@@ -481,6 +482,16 @@ export class Kernel {
     this.#vatManager.reapVats(filter);
   }
 
+  /**
+   * Reap remotes that match the filter.
+   * This is for debugging and testing purposes only.
+   *
+   * @param filter - A function that returns true if the remote should be reaped.
+   */
+  reapRemotes(filter: (remoteId: RemoteId) => boolean = () => true): void {
+    this.#remoteManager.reapRemotes(filter);
+  }
+
   /**
    * Pin a vat root.
    *

packages/ocap-kernel/src/garbage-collection/garbage-collection.ts

Lines changed: 43 additions & 34 deletions
@@ -3,14 +3,14 @@ import { insistKernelType } from '../store/utils/kernel-slots.ts';
 import type {
   GCAction,
   GCActionType,
+  EndpointId,
   KRef,
   RunQueueItem,
-  VatId,
 } from '../types.ts';
 import {
   actionTypePriorities,
   insistGCActionType,
-  insistVatId,
+  insistEndpointId,
   queueTypeFromActionType,
 } from '../types.ts';
 import { assert } from '../utils/assert.ts';
@@ -19,43 +19,43 @@ import { assert } from '../utils/assert.ts';
  * Parsed representation of a GC action.
  */
 type ParsedGCAction = Readonly<{
-  vatId: VatId;
+  endpointId: EndpointId;
   type: GCActionType;
   kref: KRef;
 }>;
 
 /**
- * Parse a GC action string into a vat id, type, and kref.
+ * Parse a GC action string into an endpoint id, type, and kref.
  *
  * @param action - The GC action string to parse.
  * @returns The parsed GC action.
  */
 function parseAction(action: GCAction): ParsedGCAction {
-  const [vatId, type, kref] = action.split(' ');
-  insistVatId(vatId);
+  const [endpointId, type, kref] = action.split(' ');
+  insistEndpointId(endpointId);
   insistGCActionType(type);
   insistKernelType('object', kref);
-  return harden({ vatId, type, kref });
+  return harden({ endpointId, type, kref });
 }
 
 /**
  * Determines if a GC action should be processed based on current system state.
  *
  * @param storage - The kernel storage.
- * @param vatId - The vat id of the vat that owns the kref.
+ * @param endpointId - The endpoint id of the vat or remote that owns the kref.
  * @param type - The type of GC action.
  * @param kref - The kref of the object in question.
  * @returns True if the action should be processed, false otherwise.
  */
 function shouldProcessAction(
   storage: KernelStore,
-  vatId: VatId,
+  endpointId: EndpointId,
   type: GCActionType,
   kref: KRef,
 ): boolean {
-  const hasCList = storage.hasCListEntry(vatId, kref);
+  const hasCList = storage.hasCListEntry(endpointId, kref);
   const isReachable = hasCList
-    ? storage.getReachableFlag(vatId, kref)
+    ? storage.getReachableFlag(endpointId, kref)
     : undefined;
   const exists = storage.kernelRefExists(kref);
   const { reachable, recognizable } = exists
@@ -78,17 +78,17 @@ function shouldProcessAction(
 }
 
 /**
- * Filters and processes a group of GC actions for a specific vat and action type.
+ * Filters and processes a group of GC actions for a specific endpoint and action type.
  *
  * @param storage - The kernel storage.
- * @param vatId - The vat id of the vat that owns the krefs.
+ * @param endpointId - The endpoint id of the vat or remote that owns the krefs.
  * @param actions - The set of GC actions to process.
  * @param allActionsSet - The complete set of GC actions.
  * @returns Object containing the krefs to process and whether the action set was updated.
  */
 function filterActionsForProcessing(
   storage: KernelStore,
-  vatId: VatId,
+  endpointId: EndpointId,
   actions: Set<GCAction>,
   allActionsSet: Set<GCAction>,
 ): { krefs: KRef[]; actionSetUpdated: boolean } {
@@ -97,7 +97,7 @@ function filterActionsForProcessing(
 
   for (const action of actions) {
     const { type, kref } = parseAction(action);
-    if (shouldProcessAction(storage, vatId, type, kref)) {
+    if (shouldProcessAction(storage, endpointId, type, kref)) {
       krefs.push(kref);
     }
     allActionsSet.delete(action);
@@ -119,43 +119,52 @@ export function processGCActionSet(
   const allActionsSet = storage.getGCActions();
   let actionSetUpdated = false;
 
-  // Group actions by vat and type
-  const actionsByVat = new Map<VatId, Map<GCActionType, Set<GCAction>>>();
+  // Group actions by endpoint and type
+  const actionsByEndpoint = new Map<
+    EndpointId,
+    Map<GCActionType, Set<GCAction>>
+  >();
 
   for (const action of allActionsSet) {
-    const { vatId, type } = parseAction(action);
+    const { endpointId, type } = parseAction(action);
 
-    if (!actionsByVat.has(vatId)) {
-      actionsByVat.set(vatId, new Map());
+    if (!actionsByEndpoint.has(endpointId)) {
+      actionsByEndpoint.set(endpointId, new Map());
     }
 
-    const actionsForVatByType = actionsByVat.get(vatId);
-    assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`);
+    const actionsForEndpointByType = actionsByEndpoint.get(endpointId);
+    assert(
+      actionsForEndpointByType !== undefined,
+      `No actions for endpoint: ${endpointId}`,
+    );
 
-    if (!actionsForVatByType.has(type)) {
-      actionsForVatByType.set(type, new Set());
+    if (!actionsForEndpointByType.has(type)) {
+      actionsForEndpointByType.set(type, new Set());
     }
 
-    const actions = actionsForVatByType.get(type);
+    const actions = actionsForEndpointByType.get(type);
     assert(actions !== undefined, `No actions for type: ${type}`);
     actions.add(action);
   }
 
   // Process actions in priority order
-  const vatIds = Array.from(actionsByVat.keys()).sort();
+  const endpointIds = Array.from(actionsByEndpoint.keys()).sort();
 
-  for (const vatId of vatIds) {
-    const actionsForVatByType = actionsByVat.get(vatId);
-    assert(actionsForVatByType !== undefined, `No actions for vat: ${vatId}`);
+  for (const endpointId of endpointIds) {
+    const actionsForEndpointByType = actionsByEndpoint.get(endpointId);
+    assert(
+      actionsForEndpointByType !== undefined,
+      `No actions for endpoint: ${endpointId}`,
+    );
 
-    // Find the highest-priority type of work to do within this vat
+    // Find the highest-priority type of work to do within this endpoint
     for (const type of actionTypePriorities) {
-      if (actionsForVatByType.has(type)) {
-        const actions = actionsForVatByType.get(type);
+      if (actionsForEndpointByType.has(type)) {
+        const actions = actionsForEndpointByType.get(type);
         assert(actions !== undefined, `No actions for type: ${type}`);
         const { krefs, actionSetUpdated: updated } = filterActionsForProcessing(
           storage,
-          vatId,
+          endpointId,
           actions,
           allActionsSet,
         );
@@ -172,7 +181,7 @@ export function processGCActionSet(
           const queueType = queueTypeFromActionType.get(type);
           assert(queueType !== undefined, `Unknown action type: ${type}`);
 
-          return harden({ type: queueType, endpointId, krefs });
+          return harden({ type: queueType, endpointId, krefs });
         }
       }
     }
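To make the effect of widening from `VatId` to `EndpointId` concrete, here is a rough standalone sketch of the parse-group-prioritize flow in `processGCActionSet`. Everything in it is an assumption made for illustration: the `v<n>`/`r<n>` id formats, the `ko<n>` kref format, the three action type names and their priority order, and the helper names `parseActionSketch` and `nextBatchSketch`. It also skips the `shouldProcessAction` filtering and the store updates that the real function performs.

```typescript
// Assumed action types and priority order (illustrative, not taken from types.ts).
type ActionTypeSketch = 'dropExport' | 'retireExport' | 'retireImport';
const prioritiesSketch: ActionTypeSketch[] = [
  'dropExport',
  'retireExport',
  'retireImport',
];

type ParsedSketch = { endpointId: string; type: ActionTypeSketch; kref: string };
type BatchSketch = { endpointId: string; type: ActionTypeSketch; krefs: string[] };

// Parse "<endpointId> <type> <kref>", accepting vat ("v1") or remote ("r1") ids.
function parseActionSketch(action: string): ParsedSketch {
  const [endpointId, type, kref] = action.split(' ');
  if (endpointId === undefined || !/^[vr]\d+$/.test(endpointId)) {
    throw new Error(`not an endpoint id: ${String(endpointId)}`);
  }
  if (prioritiesSketch.indexOf(type as ActionTypeSketch) === -1) {
    throw new Error(`not a GC action type: ${String(type)}`);
  }
  if (kref === undefined || !/^ko\d+$/.test(kref)) {
    throw new Error(`not a kernel object ref: ${String(kref)}`);
  }
  return { endpointId, type: type as ActionTypeSketch, kref };
}

// Group by endpoint and type, then return the highest-priority batch for the
// first endpoint in sorted order (remotes sort before vats: 'r' < 'v').
function nextBatchSketch(actions: string[]): BatchSketch | undefined {
  const byEndpoint = new Map<string, Map<ActionTypeSketch, string[]>>();
  actions.forEach((action) => {
    const { endpointId, type, kref } = parseActionSketch(action);
    let byType = byEndpoint.get(endpointId);
    if (byType === undefined) {
      byType = new Map();
      byEndpoint.set(endpointId, byType);
    }
    const krefs = byType.get(type) ?? [];
    krefs.push(kref);
    byType.set(type, krefs);
  });

  for (const endpointId of Array.from(byEndpoint.keys()).sort()) {
    const byType = byEndpoint.get(endpointId);
    if (byType === undefined) {
      continue;
    }
    for (const type of prioritiesSketch) {
      const krefs = byType.get(type);
      if (krefs !== undefined && krefs.length > 0) {
        return { endpointId, type, krefs };
      }
    }
  }
  return undefined;
}

const batch = nextBatchSketch([
  'v1 retireExport ko3',
  'r1 dropExport ko7',
  'v1 dropExport ko2',
  'r1 dropExport ko9',
]);
console.log(JSON.stringify(batch));
// prints {"endpointId":"r1","type":"dropExport","krefs":["ko7","ko9"]}
```

Under these assumed id formats, remote endpoints sort ahead of vats, so a mixed action set yields a remote batch first. Whether that ordering matters in practice depends on the real id scheme.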
