Skip to content

Commit 490d519

Browse files
kevin-dpclaude
andcommitted
fix(persistence): allow leader to process coordinator-delivered tx:committed messages
The leader tab's `onCoordinatorMessage` handler skipped ALL messages where `senderId` matched the coordinator's own node ID. But when the coordinator processes a follower's RPC in `handleApplyLocalMutations`, it delivers the resulting `tx:committed` to local subscribers using the coordinator's own `senderId`. This caused the leader's runtime to silently ignore follower mutations — they were written to SQLite but never applied to the leader's in-memory collection. Fix: Allow `tx:committed` messages from self to pass through the filter. The seq dedup logic in `processCommittedTxUnsafe` already prevents double-processing: when the leader's own mutations go through the coordinator, `observeStreamPosition` is called with the response's term/seq before the local `tx:committed` delivery runs under the mutex, so the duplicate is detected via `txCommitted.seq <= this.latestSeq`. Other message types (heartbeats, resets) from self are still skipped. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d046bee commit 490d519

1 file changed

Lines changed: 9 additions & 4 deletions

File tree

  • packages/db-sqlite-persisted-collection-core/src

packages/db-sqlite-persisted-collection-core/src/persisted.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1581,12 +1581,12 @@ class PersistedCollectionRuntime<
15811581
return
15821582
}
15831583

1584-
if (message.senderId === this.persistence.coordinator.getNodeId()) {
1585-
return
1586-
}
1587-
15881584
const { payload } = message
1585+
const isSelf = message.senderId === this.persistence.coordinator.getNodeId()
15891586

1587+
// Allow tx:committed from self — the coordinator produces these on behalf
1588+
// of both local and remote mutations. The seq dedup in
1589+
// processCommittedTxUnsafe prevents double-processing of our own writes.
15901590
if (isTxCommittedPayload(payload)) {
15911591
if (this.isHydrating) {
15921592
this.queuedTxCommitted.push(payload)
@@ -1601,6 +1601,11 @@ class PersistedCollectionRuntime<
16011601
return
16021602
}
16031603

1604+
// Skip remaining message types from self (e.g. heartbeats, resets)
1605+
if (isSelf) {
1606+
return
1607+
}
1608+
16041609
if (isCollectionResetPayload(payload)) {
16051610
void this.applyMutex
16061611
.run(() => this.truncateAndReloadUnsafe())

0 commit comments

Comments
 (0)