From 296dd15c70450c25968f201122d3314291af9ed0 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 15:40:44 -0700 Subject: [PATCH 1/2] perf(counter): use DoublyLinkedList for pendingOps --- packages/dds/counter/src/counter.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/dds/counter/src/counter.ts b/packages/dds/counter/src/counter.ts index 4258984f9254..18bc4d93bd84 100644 --- a/packages/dds/counter/src/counter.ts +++ b/packages/dds/counter/src/counter.ts @@ -3,7 +3,7 @@ * Licensed under the MIT License. */ -import { assert } from "@fluidframework/core-utils/internal"; +import { DoublyLinkedList, assert } from "@fluidframework/core-utils/internal"; import type { IChannelAttributes, IFluidDataStoreRuntime, @@ -76,7 +76,7 @@ export class SharedCounter /** * Tracks pending local ops that have not been ack'd yet. */ - private readonly pendingOps: IPendingOperation[] = []; + private readonly pendingOps = new DoublyLinkedList(); /** * The next message id to be used when submitting an op. @@ -168,7 +168,7 @@ export class SharedCounter // and we should now remove it from this.pendingOps. // If the message is from a remote client, we should process it. if (local) { - const pendingOp = this.pendingOps.shift(); + const pendingOp = this.pendingOps.shift()?.data; const messageId = messageContent.localOpMetadata; assert(typeof messageId === "number", 0xc8e /* localOpMetadata should be a number */); assert( @@ -217,7 +217,7 @@ export class SharedCounter typeof localOpMetadata === "number", 0xc90 /* localOpMetadata should be a number */, ); - const pendingOp = this.pendingOps.pop(); + const pendingOp = this.pendingOps.pop()?.data; assert( // eslint-disable-next-line @typescript-eslint/prefer-optional-chain -- using ?. could change behavior pendingOp !== undefined && From a7fb6fe727ae813f0fb3d987846f4aa59b84ac85 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 17:12:33 -0700 Subject: [PATCH 2/2] test(counter): burst FIFO drain regression --- packages/dds/counter/src/test/counter.spec.ts | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/packages/dds/counter/src/test/counter.spec.ts b/packages/dds/counter/src/test/counter.spec.ts index a5185e14848d..b342fb8060a0 100644 --- a/packages/dds/counter/src/test/counter.spec.ts +++ b/packages/dds/counter/src/test/counter.spec.ts @@ -208,6 +208,57 @@ describe("SharedCounter", () => { assert.ok(fired1, "The event for first increment was not fired"); assert.ok(fired2, "The event for second increment was not fired"); }); + + it("preserves FIFO order when draining a burst of increments", () => { + // Submit a burst of N increments before any messages are processed. This exercises + // the pending-op queue: every increment is queued locally and submitted with a + // monotonically increasing messageId. When the acks come back in order, the local + // FIFO assert in SharedCounter.processMessage (matching pendingOps.shift() against + // the ack'd messageId) guarantees ordering -- a regression in pendingMessageIds + // management would trip that assert here. + const N = 32; + const deltas: number[] = []; + for (let i = 1; i <= N; i++) { + // Alternate sign so the running sum is non-trivial. + deltas.push(i % 2 === 0 ? -i : i); + } + const expectedSum = deltas.reduce((a, b) => a + b, 0); + + // Track the order in which the remote observes the increments. + const remoteObserved: number[] = []; + testCounter2.on("incremented", (incrementAmount: number) => { + remoteObserved.push(incrementAmount); + }); + + for (const d of deltas) { + testCounter.increment(d); + } + + // Local counter has already optimistically applied every increment. + assert.equal( + testCounter.value, + expectedSum, + "Local counter should reflect the full burst optimistically", + ); + + containerRuntimeFactory.processAllMessages(); + + assert.equal( + testCounter.value, + expectedSum, + "Local counter value should equal sum of bursted increments", + ); + assert.equal( + testCounter2.value, + expectedSum, + "Remote counter value should equal sum of bursted increments", + ); + assert.deepEqual( + remoteObserved, + deltas, + "Remote counter should observe increments in FIFO submit order", + ); + }); }); }); @@ -410,6 +461,51 @@ describe("SharedCounter", () => { ); }); + it("preserves FIFO order across a burst with partial rollback", () => { + // Burst K increments, flush only the first M of them, then rollback the remainder. + // The flushed prefix must reach counter2 in submit order; the rolled-back suffix must + // reverse counter1 back to the value at the flush boundary. This exercises both the + // FIFO assert (during ack processing of the flushed prefix) and the LIFO pop in + // rollback (which must match against the most recently submitted pending op). + const K = 16; + const M = 10; + const deltas: number[] = []; + for (let i = 1; i <= K; i++) { + deltas.push(i % 2 === 0 ? -i : i); + } + const flushedSum = deltas.slice(0, M).reduce((a, b) => a + b, 0); + + const remoteObserved: number[] = []; + counter2.on("incremented", (incrementAmount: number) => { + remoteObserved.push(incrementAmount); + }); + + for (const d of deltas) { + counter1.increment(d); + } + + containerRuntime1.flushSomeMessages(M); + containerRuntime1.rollback?.(); + containerRuntime1.flush(); + containerRuntimeFactory.processAllMessages(); + + assert.equal( + counter1.value, + flushedSum, + "counter1 should reflect only the flushed prefix after rolling back the rest", + ); + assert.equal( + counter2.value, + flushedSum, + "counter2 should reflect only the flushed prefix", + ); + assert.deepEqual( + remoteObserved, + deltas.slice(0, M), + "counter2 should observe the flushed prefix in FIFO submit order", + ); + }); + it("can rollback across remote ops", () => { counter1.increment(10); counter2.increment(20);