Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 71 additions & 86 deletions docs/ken-protocol-assessment.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ Key aspects:
| Retransmission | ✓ | Timeout-based retransmit until ACK or max retries |
| Crash-safe persistence | ✓ | Write message first, then update nextSendSeq |
| Local recovery | ✓ | Restore seq state, restart ACK timeout |
| **Transactional turns** | ✓ | Crank buffering defers outputs until crank commit |
| **Deferred transmission** | ✓ | Outputs reach RemoteHandle only after originating crank commits |
| **Output validity** | ✓ | Crank buffering ensures outputs escape only after commit |
| **Atomic checkpoint** | ✓ | Database savepoints make crank state changes atomic |
| Transactional turns | ✓ | Crank buffering defers outputs until crank commit |
| Deferred transmission | ✓ | Outputs reach RemoteHandle only after originating crank commits |
| Output validity | ✓ | Crank buffering ensures outputs escape only after commit |
| Atomic checkpoint | ✓ | Database savepoints make crank state changes atomic |
| Exactly-once receive | ✓ | Transactional receive with dedup check (Issue #808) |
| FIFO ordering | ✓ | TCP guarantees in-order; dedup handles retransmits |

### Crank Buffering (Issue #786)

Expand Down Expand Up @@ -78,47 +80,61 @@ This achieves Ken's property that **outputs are only externalized after successf

`RemoteHandle` persists messages to `remotePending` before transmitting for a different reason: to enable retransmission on recovery if the transmission or ACK is lost. This is part of the at-least-once delivery mechanism, not the output validity mechanism.

### Remaining Gaps (Receive Side)
### Receive-Side Implementation (Issue #808)

The remaining gaps are on the **receive side** of remote messaging:
The receive side of remote messaging implements Ken's exactly-once delivery guarantee through transactional message processing with duplicate detection.

#### 1. Done Table / Duplicate Detection (Gap)
#### Duplicate Detection

Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. The `Done` table is updated atomically with the application state at crank commit.
Ken maintains a `Done` table ensuring each message is delivered to the application **at most once**. Our implementation achieves this by checking `seq <= highestReceivedSeq` before processing:

We track `highestReceivedSeq` per remote, but there's a gap in how it interacts with delivery:

**Scenario A - Update on receive, before delivery:**
1. Receive message seq=5 from remote R
2. Update `highestReceivedSeq` to 5
3. Add message to run queue for delivery to local vat
4. Crash before delivery crank commits
5. On recovery: `highestReceivedSeq=5` suggests we processed it
6. Remote retransmits seq=5, we ignore it
7. **Message lost** - vat never received it
```typescript
// Duplicate detection: skip if we've already processed this sequence number
if (seq <= this.#highestReceivedSeq) {
this.#logger.log(`ignoring duplicate message seq=${seq}`);
return null;
}
```

**Scenario B - Update after delivery:**
1. Receive message seq=5, add to run queue
2. Delivery crank runs, vat processes message, crank commits
3. Crash before `highestReceivedSeq` is updated
4. On recovery: `highestReceivedSeq < 5`
5. Remote retransmits seq=5, we deliver again
6. **Duplicate delivery** - vat receives it twice
After a crash and retransmit, duplicate messages are detected and ignored.

#### Transactional Message Processing

Message processing is wrapped in a database savepoint to ensure atomicity:

```typescript
const savepointName = `receive_${this.remoteId}_${seq}`;
this.#kernelStore.createSavepoint(savepointName);
try {
// Process message (translate refs, add to run queue, etc.)
switch (method) {
case 'deliver': ...
case 'redeemURL': ...
case 'redeemURLReply': ...
}

// Persist sequence tracking at the end, within the transaction
this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq);

// Commit the transaction
this.#kernelStore.releaseSavepoint(savepointName);
} catch (error) {
// Rollback on any error - also revert in-memory state
this.#highestReceivedSeq = previousHighestReceivedSeq;
this.#kernelStore.rollbackSavepoint(savepointName);
throw error;
}
```

**What's needed**: A `Done` table (or equivalent) that is updated atomically with the delivery crank commit, and checked before delivering incoming messages.
This achieves atomicity: if a crash occurs before commit, both the run queue entry and the sequence update roll back together. The remote retransmits, and we process it correctly.

#### 2. FIFO Enforcement on Receive (Gap)
#### FIFO Ordering

Ken enforces per-sender FIFO ordering via `next_ready()` which only delivers the next expected sequence number.

If messages arrive out of order from the network (e.g., seq 1, 3, 2):
- We should deliver seq=1
- Buffer seq=3 until seq=2 arrives
- Deliver seq=2, then seq=3

We don't currently enforce this ordering on the receive side. Out-of-order network delivery could result in out-of-order application delivery.
**Our situation**: We use TCP-based transports (libp2p streams) which guarantee in-order delivery during normal operation. Out-of-order arrival only occurs after a crash when the sender retransmits. With duplicate detection, retransmitted messages for already-processed sequence numbers are dropped, maintaining FIFO semantics.

**What's needed**: Track expected next seq per remote sender, buffer out-of-order messages, deliver in sequence order only.
Therefore, explicit receive-side reordering is not required given our transport guarantees.

### Summary Table

Expand All @@ -131,42 +147,12 @@ We don't currently enforce this ordering on the receive side. Out-of-order netwo
| Consistent frontier | **Yes** | Each kernel's checkpoint is independent |
| Local recovery | **Yes** | Crashes don't affect other processes |
| Sender-based logging | **Yes** | Messages persisted in remotePending until ACKed |
| Exactly-once delivery | **Partial** | At-least-once; missing Done table for deduplication |
| FIFO ordering | **Partial** | Per-sender seq on send; no enforcement on receive |

## What Would Be Needed for Full Ken Properties

### 1. Add Done Table for Exactly-Once Delivery

Track processed messages and deduplicate on receive:

**Option A: Per-remote processed sequence tracking**
- Store `highestProcessedSeq.${remoteId}` updated atomically with delivery crank
- On receive: if `seq <= highestProcessedSeq`, ACK but don't deliver
- Simple but requires in-order processing

**Option B: Explicit Done table**
- Store `done.${remoteId}.${seq} = true` for each processed message
- On receive: check Done table before delivering
- Supports out-of-order processing
- Requires garbage collection of old entries (after ACK confirms sender discarded)

Either approach requires the processed-message record to be updated **atomically with the delivery crank commit**, so that crash recovery sees consistent state.

### 2. FIFO Enforcement on Receive

Buffer and reorder incoming messages:

- Track `expectedNextSeq.${remoteId}` per sender
- On receive: if `seq > expectedNextSeq`, buffer the message
- When `expectedNextSeq` message arrives, deliver it and any buffered successors
- Update `expectedNextSeq` as messages are delivered

This interacts with the Done table: we need to handle the case where we've already processed some messages (from Done table) when determining what's "expected next."
| Exactly-once delivery | **Yes** | Transactional receive with dedup check |
| FIFO ordering | **Yes** | TCP guarantees in-order; dedup handles retransmits |

## Architectural Summary

**Send side (achieved with crank buffering):**
**Send side (crank buffering):**
```
Vat Crank:
vat processes message → syscalls buffer outputs
Expand All @@ -180,32 +166,31 @@ Later (separate operation):

The key insight: by the time RemoteHandle sees a message, the originating crank has already committed. Output validity is achieved.

**Receive side (gaps remain):**
**Receive side (transactional processing):**
```
Current:
receive from network → add to run queue → deliver to vat
(no deduplication, no ordering enforcement)

Needed:
receive from network
→ check Done table (skip if already processed)
→ check sequence (buffer if out of order)
→ add to run queue
→ deliver to vat
→ atomically update Done table with delivery crank commit
receive from network
→ check seq <= highestReceivedSeq (skip if duplicate)
→ begin transaction (savepoint)
→ process message, add to run queue
→ persist highestReceivedSeq
→ commit transaction (release savepoint)
```

If a crash occurs before commit, both the run queue entry and the sequence update roll back together. The remote retransmits, and we process it correctly.

## Progress Summary

| Area | Status |
|------|--------|
| Kernel-internal output buffering | **Achieved** |
| Rollback discards uncommitted outputs | **Achieved** |
| Atomic kernel state + output queue | **Achieved** |
| Output validity (send side) | **Achieved** |
| Deferred transmission (send side) | **Achieved** |
| Done table for deduplication (receive side) | Gap |
| FIFO enforcement (receive side) | Gap |
| Kernel-internal output buffering | **Achieved** (Issue #786) |
| Rollback discards uncommitted outputs | **Achieved** (Issue #786) |
| Atomic kernel state + output queue | **Achieved** (Issue #786) |
| Output validity (send side) | **Achieved** (Issue #786) |
| Deferred transmission (send side) | **Achieved** (Issue #786) |
| FIFO ordering | **Achieved** (TCP transport) |
| Exactly-once receive (dedup + atomicity) | **Achieved** (Issue #808) |

All Ken protocol properties are now implemented.

## References

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,10 @@ export class PlatformServicesClient implements PlatformServices {
async initializeRemoteComms(
keySeed: string,
options: RemoteCommsOptions,
remoteMessageHandler: (from: string, message: string) => Promise<string>,
remoteMessageHandler: (
from: string,
message: string,
) => Promise<string | null>,
onRemoteGiveUp?: (peerId: string) => void,
incarnationId?: string,
): Promise<void> {
Expand Down Expand Up @@ -275,9 +278,9 @@ export class PlatformServicesClient implements PlatformServices {
*
* @param from - The peer ID that sent the message.
* @param message - The message received.
* @returns A promise that resolves with the reply message, or an empty string if no reply is needed.
* @returns A promise that resolves with the reply message, or null if no reply is needed.
*/
async #remoteDeliver(from: string, message: string): Promise<string> {
async #remoteDeliver(from: string, message: string): Promise<string | null> {
if (this.#remoteMessageHandler) {
return await this.#remoteMessageHandler(from, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ describe('PlatformServicesServer', () => {
new MessageEvent('message', {
data: {
id: 'vws:1',
result: '',
result: null,
jsonrpc: '2.0',
},
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,12 @@ export class PlatformServicesServer {
*
* @param from - The peer ID that sent the message.
* @param message - The message received.
* @returns A promise that resolves with the reply message, or an empty string if no reply is needed.
* @returns A promise that resolves with the reply message, or null if no reply is needed.
*/
async #handleRemoteMessage(from: string, message: string): Promise<string> {
async #handleRemoteMessage(
from: string,
message: string,
): Promise<string | null> {
return this.#rpcClient.call('remoteDeliver', {
from,
message,
Expand Down
12 changes: 9 additions & 3 deletions packages/nodejs/src/kernel/PlatformServices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,12 @@ export class NodejsPlatformServices implements PlatformServices {
*
* @param from - The peer ID that sent the message.
* @param message - The message received.
* @returns A promise that resolves with the reply message, or an empty string if no reply is needed.
* @returns A promise that resolves with the reply message, or null if no reply is needed.
*/
async #handleRemoteMessage(from: string, message: string): Promise<string> {
async #handleRemoteMessage(
from: string,
message: string,
): Promise<string | null> {
if (!this.#remoteMessageHandler) {
// This can't actually happen, but TypeScript can't infer it
throw Error('remote comms not initialized');
Expand All @@ -234,7 +237,10 @@ export class NodejsPlatformServices implements PlatformServices {
async initializeRemoteComms(
keySeed: string,
options: RemoteCommsOptions,
remoteMessageHandler: (from: string, message: string) => Promise<string>,
remoteMessageHandler: (
from: string,
message: string,
) => Promise<string | null>,
onRemoteGiveUp?: (peerId: string) => void,
incarnationId?: string,
): Promise<void> {
Expand Down
Loading
Loading