diff --git a/packages/dds/matrix/src/matrix.ts b/packages/dds/matrix/src/matrix.ts index 5816dc24ab84..c14a78854468 100644 --- a/packages/dds/matrix/src/matrix.ts +++ b/packages/dds/matrix/src/matrix.ts @@ -8,7 +8,12 @@ import type { IEventThisPlaceHolder, IEventProvider, } from "@fluidframework/core-interfaces"; -import { assert, unreachableCase } from "@fluidframework/core-utils/internal"; +import { + assert, + DoublyLinkedList, + type ListNode, + unreachableCase, +} from "@fluidframework/core-utils/internal"; import type { IChannelAttributes, IFluidDataStoreRuntime, @@ -219,6 +224,66 @@ type FirstWriterWinsPolicy = cellLastWriteTracker: SparseArray2D; }; +/** + * Encapsulates the list of pending local changes for a single cell, plus the side-index from + * `localSeq` to the corresponding `DoublyLinkedList` node enabling O(1) lookup during reSubmit + * (instead of a linear `findIndex` scan). + * + * The list and the index are kept in lock-step: every mutator updates both. + */ +class PendingLocalCellChanges { + private readonly list = new DoublyLinkedList<{ localSeq: number; value: MatrixItem }>(); + private readonly index = new Map< + number, + ListNode<{ localSeq: number; value: MatrixItem }> + >(); + + public get length(): number { + return this.list.length; + } + + public get last(): ListNode<{ localSeq: number; value: MatrixItem }> | undefined { + return this.list.last; + } + + public push(localSeq: number, value: MatrixItem): void { + assert(!this.index.has(localSeq), "duplicate localSeq in PendingLocalCellChanges"); + const { first: node } = this.list.push({ localSeq, value }); + // single-item push: first === last, both reference the newly inserted node. + this.index.set(localSeq, node); + } + + public removeByLocalSeq( + localSeq: number, + ): { localSeq: number; value: MatrixItem } | undefined { + const node = this.index.get(localSeq); + if (node === undefined) { + return undefined; + } + this.list.remove(node); + this.index.delete(localSeq); + return node.data; + } + + public shift(): { localSeq: number; value: MatrixItem } | undefined { + const node = this.list.shift(); + if (node === undefined) { + return undefined; + } + this.index.delete(node.data.localSeq); + return node.data; + } + + public pop(): { localSeq: number; value: MatrixItem } | undefined { + const node = this.list.pop(); + if (node === undefined) { + return undefined; + } + this.index.delete(node.data.localSeq); + return node.data; + } +} + /** * Tracks pending local changes for a cell. */ @@ -226,7 +291,7 @@ interface PendingCellChanges { /** * The local changes including the local seq, and the value set at that local seq. */ - local: { localSeq: number; value: MatrixItem }[]; + local: PendingLocalCellChanges; /** * The latest consensus value across all clients. * this will either be a remote value or ack'd local @@ -513,9 +578,9 @@ export class SharedMatrix this.submitLocalMessage(op, metadata); const pendingCell: PendingCellChanges = this.pending.getCell(rowHandle, colHandle) ?? { - local: [], + local: new PendingLocalCellChanges(), }; - pendingCell.local.push({ localSeq, value }); + pendingCell.local.push(localSeq, value); this.pending.setCell(rowHandle, colHandle, pendingCell); return pendingCell; } @@ -844,9 +909,8 @@ export class SharedMatrix assert(pendingCell !== undefined, 0xba4 /* local operation must have a pending array */); const { local } = pendingCell; assert(local !== undefined, 0xba5 /* local operation must have a pending array */); - const localSeqIndex = local.findIndex((p) => p.localSeq === localSeq); - assert(localSeqIndex >= 0, 0xba6 /* local operation must have a pending entry */); - const [change] = local.splice(localSeqIndex, 1); + const change = local.removeByLocalSeq(localSeq); + assert(change !== undefined, 0xba6 /* local operation must have a pending entry */); assert(change.localSeq === localSeq, 0xba7 /* must match */); if ( @@ -910,7 +974,8 @@ export class SharedMatrix const previous = pendingCell.local.length > 0 - ? pendingCell.local[pendingCell.local.length - 1].value + ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + pendingCell.local.last!.data.value : pendingCell.consensus; this.setCellCore( diff --git a/packages/dds/matrix/src/test/matrix.reconnect.spec.ts b/packages/dds/matrix/src/test/matrix.reconnect.spec.ts index f642241d93b4..ae067c0e6f65 100644 --- a/packages/dds/matrix/src/test/matrix.reconnect.spec.ts +++ b/packages/dds/matrix/src/test/matrix.reconnect.spec.ts @@ -231,4 +231,54 @@ describe("SharedMatrix reconnect", () => { assert.deepEqual(extract(matrix1), expected); assert.deepEqual(extract(matrix3), expected); }); + + // Regression test for the resubmit path optimized when many pending writes target the + // same (row, col). Disconnect, write the same cell N times, reconnect, and assert that + // only the final value remains and that both clients converge. + it("resubmits N writes to the same cell after reconnect", () => { + const containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + const dataRuntime1 = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(dataRuntime1); + const dataRuntime2 = new MockFluidDataStoreRuntime(); + const containerRuntime2 = containerRuntimeFactory.createContainerRuntime(dataRuntime2); + + const matrix1 = matrixFactory.create(dataRuntime1, "A"); + matrix1.connect({ + deltaConnection: dataRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + const matrix2 = matrixFactory.create(dataRuntime2, "B"); + matrix2.connect({ + deltaConnection: dataRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + // Create a 1x1 matrix. + matrix1.insertRows(0, 1); + matrix1.insertCols(0, 1); + containerRuntimeFactory.processAllMessages(); + + // Disconnect matrix2, then write the same (row, col) N times. This stages a + // large run of pending setCell ops all targeting the same cell, which is the + // hot path being optimized by the pending-cell-list change. + containerRuntime2.connected = false; + const N = 200; + for (let i = 0; i < N; i++) { + matrix2.setCell(0, 0, i); + } + + // While disconnected, matrix2 sees the latest local write and matrix1 sees nothing. + assert.deepEqual(extract(matrix1), [[undefined]]); + assert.deepEqual(extract(matrix2), [[N - 1]]); + + // Reconnect matrix2 to force the resubmit path to re-emit all N pending writes. + containerRuntime2.connected = true; + containerRuntimeFactory.processAllMessages(); + + // Both clients should converge on the last-written value. + const expected = [[N - 1]]; + assert.deepEqual(extract(matrix1), expected); + assert.deepEqual(extract(matrix2), expected); + }); });