Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/dds/counter/src/counter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Licensed under the MIT License.
*/

import { assert } from "@fluidframework/core-utils/internal";
import { DoublyLinkedList, assert } from "@fluidframework/core-utils/internal";
import type {
IChannelAttributes,
IFluidDataStoreRuntime,
Expand Down Expand Up @@ -76,7 +76,7 @@ export class SharedCounter
/**
* Tracks pending local ops that have not been ack'd yet.
*/
private readonly pendingOps: IPendingOperation[] = [];
private readonly pendingOps = new DoublyLinkedList<IPendingOperation>();

/**
* The next message id to be used when submitting an op.
Expand Down Expand Up @@ -168,7 +168,7 @@ export class SharedCounter
// and we should now remove it from this.pendingOps.
// If the message is from a remote client, we should process it.
if (local) {
const pendingOp = this.pendingOps.shift();
const pendingOp = this.pendingOps.shift()?.data;
const messageId = messageContent.localOpMetadata;
assert(typeof messageId === "number", 0xc8e /* localOpMetadata should be a number */);
assert(
Expand Down Expand Up @@ -217,7 +217,7 @@ export class SharedCounter
typeof localOpMetadata === "number",
0xc90 /* localOpMetadata should be a number */,
);
const pendingOp = this.pendingOps.pop();
const pendingOp = this.pendingOps.pop()?.data;
assert(
// eslint-disable-next-line @typescript-eslint/prefer-optional-chain -- using ?. could change behavior
pendingOp !== undefined &&
Expand Down
96 changes: 96 additions & 0 deletions packages/dds/counter/src/test/counter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,57 @@ describe("SharedCounter", () => {
assert.ok(fired1, "The event for first increment was not fired");
assert.ok(fired2, "The event for second increment was not fired");
});

it("preserves FIFO order when draining a burst of increments", () => {
// Submit a burst of N increments before any messages are processed. This exercises
// the pending-op queue: every increment is queued locally and submitted with a
// monotonically increasing messageId. When the acks come back in order, the local
// FIFO assert in SharedCounter.processMessage (matching pendingOps.shift() against
// the ack'd messageId) guarantees ordering -- a regression in pendingMessageIds
// management would trip that assert here.
const N = 32;
const deltas: number[] = [];
for (let i = 1; i <= N; i++) {
// Alternate sign so the running sum is non-trivial.
deltas.push(i % 2 === 0 ? -i : i);
}
const expectedSum = deltas.reduce((a, b) => a + b, 0);

// Track the order in which the remote observes the increments.
const remoteObserved: number[] = [];
testCounter2.on("incremented", (incrementAmount: number) => {
remoteObserved.push(incrementAmount);
});

for (const d of deltas) {
testCounter.increment(d);
}

// Local counter has already optimistically applied every increment.
assert.equal(
testCounter.value,
expectedSum,
"Local counter should reflect the full burst optimistically",
);

containerRuntimeFactory.processAllMessages();

assert.equal(
testCounter.value,
expectedSum,
"Local counter value should equal sum of bursted increments",
);
assert.equal(
testCounter2.value,
expectedSum,
"Remote counter value should equal sum of bursted increments",
);
assert.deepEqual(
remoteObserved,
deltas,
"Remote counter should observe increments in FIFO submit order",
);
});
});
});

Expand Down Expand Up @@ -410,6 +461,51 @@ describe("SharedCounter", () => {
);
});

it("preserves FIFO order across a burst with partial rollback", () => {
// Burst K increments, flush only the first M of them, then rollback the remainder.
// The flushed prefix must reach counter2 in submit order; the rolled-back suffix must
// reverse counter1 back to the value at the flush boundary. This exercises both the
// FIFO assert (during ack processing of the flushed prefix) and the LIFO pop in
// rollback (which must match against the most recently submitted pending op).
const K = 16;
const M = 10;
const deltas: number[] = [];
for (let i = 1; i <= K; i++) {
deltas.push(i % 2 === 0 ? -i : i);
}
const flushedSum = deltas.slice(0, M).reduce((a, b) => a + b, 0);

const remoteObserved: number[] = [];
counter2.on("incremented", (incrementAmount: number) => {
remoteObserved.push(incrementAmount);
});

for (const d of deltas) {
counter1.increment(d);
}

containerRuntime1.flushSomeMessages(M);
containerRuntime1.rollback?.();
containerRuntime1.flush();
containerRuntimeFactory.processAllMessages();

assert.equal(
counter1.value,
flushedSum,
"counter1 should reflect only the flushed prefix after rolling back the rest",
);
assert.equal(
counter2.value,
flushedSum,
"counter2 should reflect only the flushed prefix",
);
assert.deepEqual(
remoteObserved,
deltas.slice(0, M),
"counter2 should observe the flushed prefix in FIFO submit order",
);
});

it("can rollback across remote ops", () => {
counter1.increment(10);
counter2.increment(20);
Expand Down
Loading