From 2551e32a0d8442cca97def26f7fad83fdc2c13fe Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 15:25:35 -0700 Subject: [PATCH 1/4] perf(task-manager): use DoublyLinkedList + lookup maps for taskQueues and latestPendingOps --- packages/dds/task-manager/src/taskManager.ts | 168 ++++++++++++------- 1 file changed, 110 insertions(+), 58 deletions(-) diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 0a9f3d9ca11a..72a9d1f6fcc0 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -8,7 +8,11 @@ import { AttachState, type ReadOnlyInfo, } from "@fluidframework/container-definitions/internal"; -import { assert } from "@fluidframework/core-utils/internal"; +import { + DoublyLinkedList, + type ListNode, + assert, +} from "@fluidframework/core-utils/internal"; import type { IChannelAttributes, IFluidDataStoreRuntime, @@ -91,7 +95,14 @@ export class TaskManagerClass * Mapping of taskId to a queue of clientIds that are waiting on the task. Maintains the consensus state of the * queue, even if we know we've submitted an op that should eventually modify the queue. */ - private readonly taskQueues = new Map(); + private readonly taskQueues = new Map>(); + + /** + * Side-lookup map paralleling {@link TaskManagerClass.taskQueues}: for each taskId, maps clientId to the + * {@link ListNode} that holds that clientId in the corresponding queue. Enables O(1) lookup and removal of a + * client from a queue without scanning the list. Must be kept symmetric with `taskQueues` at every mutation. + */ + private readonly taskQueueNodes = new Map>>(); // opWatcher emits for every op on this data store. This is just a repackaging of processMessagesCore into events. private readonly opWatcher: EventEmitter = new EventEmitter(); @@ -112,7 +123,7 @@ export class TaskManagerClass /** * Tracks the most recent pending op for a given task */ - private readonly latestPendingOps = new Map(); + private readonly latestPendingOps = new Map>(); /** * Tracks tasks that are this client is currently subscribed to. @@ -153,12 +164,12 @@ export class TaskManagerClass if (local) { const latestPendingOps = this.latestPendingOps.get(taskId); assert(latestPendingOps !== undefined, 0xc3c /* No pending ops for task */); - const pendingOp = latestPendingOps.shift(); + const pendingOpNode = latestPendingOps.shift(); assert( - pendingOp !== undefined && pendingOp.messageId === messageId, + pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId, 0xc3d /* Unexpected op */, ); - assert(pendingOp.type === "volunteer", 0x07c /* "Unexpected op type" */); + assert(pendingOpNode.data.type === "volunteer", 0x07c /* "Unexpected op type" */); if (latestPendingOps.length === 0) { this.latestPendingOps.delete(taskId); } @@ -174,12 +185,12 @@ export class TaskManagerClass if (local) { const latestPendingOps = this.latestPendingOps.get(taskId); assert(latestPendingOps !== undefined, 0xc3e /* No pending ops for task */); - const pendingOp = latestPendingOps.shift(); + const pendingOpNode = latestPendingOps.shift(); assert( - pendingOp !== undefined && pendingOp.messageId === messageId, + pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId, 0xc3f /* Unexpected op */, ); - assert(pendingOp.type === "abandon", 0x07e /* "Unexpected op type" */); + assert(pendingOpNode.data.type === "abandon", 0x07e /* "Unexpected op type" */); if (latestPendingOps.length === 0) { this.latestPendingOps.delete(taskId); } @@ -196,18 +207,19 @@ export class TaskManagerClass if (local) { const latestPendingOps = this.latestPendingOps.get(taskId); assert(latestPendingOps !== undefined, 0xc40 /* No pending ops for task */); - const pendingOp = latestPendingOps.shift(); + const pendingOpNode = latestPendingOps.shift(); assert( - pendingOp !== undefined && pendingOp.messageId === messageId, + pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId, 0xc41 /* Unexpected op */, ); - assert(pendingOp.type === "complete", 0x401 /* Unexpected op type */); + assert(pendingOpNode.data.type === "complete", 0x401 /* Unexpected op type */); if (latestPendingOps.length === 0) { this.latestPendingOps.delete(taskId); } } this.taskQueues.delete(taskId); + this.taskQueueNodes.delete(taskId); this.completedWatcher.emit("completed", taskId, messageId); this.emit("completed", taskId); }, @@ -237,7 +249,7 @@ export class TaskManagerClass this.connectionWatcher.on("disconnect", () => { // Emit "lost" for any tasks we were assigned to. for (const [taskId, clientQueue] of this.taskQueues.entries()) { - if (this.isAttached() && clientQueue[0] === this.clientId) { + if (this.isAttached() && clientQueue.first?.data === this.clientId) { this.emit("lost", taskId); } } @@ -260,7 +272,7 @@ export class TaskManagerClass this.submitLocalMessage(op, pendingOp.messageId); const latestPendingOps = this.latestPendingOps.get(taskId); if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, [pendingOp]); + this.latestPendingOps.set(taskId, new DoublyLinkedList([pendingOp])); } else { latestPendingOps.push(pendingOp); } @@ -278,7 +290,7 @@ export class TaskManagerClass this.submitLocalMessage(op, pendingOp.messageId); const latestPendingOps = this.latestPendingOps.get(taskId); if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, [pendingOp]); + this.latestPendingOps.set(taskId, new DoublyLinkedList([pendingOp])); } else { latestPendingOps.push(pendingOp); } @@ -297,7 +309,7 @@ export class TaskManagerClass this.submitLocalMessage(op, pendingOp.messageId); const latestPendingOps = this.latestPendingOps.get(taskId); if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, [pendingOp]); + this.latestPendingOps.set(taskId, new DoublyLinkedList([pendingOp])); } else { latestPendingOps.push(pendingOp); } @@ -560,7 +572,7 @@ export class TaskManagerClass return false; } - const currentAssignee = this.taskQueues.get(taskId)?.[0]; + const currentAssignee = this.taskQueues.get(taskId)?.first?.data; return currentAssignee !== undefined && currentAssignee === this.clientId; } @@ -572,7 +584,7 @@ export class TaskManagerClass return false; } - return this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; + return this.taskQueueNodes.get(taskId)?.has(this.clientId) ?? false; } /** @@ -594,6 +606,7 @@ export class TaskManagerClass // we are attached. Additionally, we don't need to check if we are connected while detached. if (this.isDetached()) { this.taskQueues.delete(taskId); + this.taskQueueNodes.delete(taskId); this.completedWatcher.emit("completed", taskId); this.emit("completed", taskId); return; @@ -634,13 +647,12 @@ export class TaskManagerClass } // Only include tasks if there are clients in the queue. - const filteredMap = new Map(); + const content: [string, string[]][] = []; for (const [taskId, queue] of this.taskQueues) { if (queue.length > 0) { - filteredMap.set(taskId, queue); + content.push([taskId, [...queue.map((node) => node.data)]]); } } - const content = [...filteredMap.entries()]; return createSingleBlobSummary(snapshotFileName, JSON.stringify(content)); } @@ -650,7 +662,14 @@ export class TaskManagerClass protected async loadCore(storage: IChannelStorageService): Promise { const content = await readAndParse<[string, string[]][]>(storage, snapshotFileName); for (const [taskId, clientIdQueue] of content) { - this.taskQueues.set(taskId, clientIdQueue); + const list = new DoublyLinkedList(); + const nodeMap = new Map>(); + for (const clientId of clientIdQueue) { + const range = list.push(clientId); + nodeMap.set(clientId, range.last); + } + this.taskQueues.set(taskId, list); + this.taskQueueNodes.set(taskId, nodeMap); } this.scrubClientsNotInQuorum(); } @@ -681,15 +700,16 @@ export class TaskManagerClass assertIsTaskManagerOperation(content); const pendingOps = this.latestPendingOps.get(content.taskId); assert(pendingOps !== undefined, 0xc42 /* No pending ops for task on resubmit attempt */); - const pendingOpIndex = pendingOps.findIndex( - (op) => op.messageId === localOpMetadata && op.type === content.type, + const pendingOpNode = pendingOps.find( + (node) => + node.data.messageId === localOpMetadata && node.data.type === content.type, ); - assert(pendingOpIndex !== -1, 0xc43 /* Could not match pending op on resubmit attempt */); - pendingOps.splice(pendingOpIndex, 1); - if ( - content.type === "volunteer" && - pendingOps[pendingOps.length - 1]?.type !== "abandon" - ) { + assert( + pendingOpNode !== undefined, + 0xc43 /* Could not match pending op on resubmit attempt */, + ); + pendingOps.remove(pendingOpNode); + if (content.type === "volunteer" && pendingOps.last?.data.type !== "abandon") { this.submitVolunteerOp(content.taskId); } if (pendingOps.length === 0) { @@ -763,21 +783,27 @@ export class TaskManagerClass ) { // Create the queue if it doesn't exist, and push the client on the back. let clientQueue = this.taskQueues.get(taskId); + let clientNodes = this.taskQueueNodes.get(taskId); if (clientQueue === undefined) { - clientQueue = []; + clientQueue = new DoublyLinkedList(); this.taskQueues.set(taskId, clientQueue); } + if (clientNodes === undefined) { + clientNodes = new Map>(); + this.taskQueueNodes.set(taskId, clientNodes); + } - if (clientQueue.includes(clientId)) { + if (clientNodes.has(clientId)) { // We shouldn't re-add the client if it's already in the queue. // This may be possible in scenarios where a client was added in // while detached. return; } - const oldLockHolder = clientQueue[0]; - clientQueue.push(clientId); - const newLockHolder = clientQueue[0]; + const oldLockHolder = clientQueue.first?.data; + const range = clientQueue.push(clientId); + clientNodes.set(clientId, range.last); + const newLockHolder = clientQueue.first?.data; if (newLockHolder !== oldLockHolder) { this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder); } @@ -789,18 +815,21 @@ export class TaskManagerClass if (clientQueue === undefined) { return; } + const clientNodes = this.taskQueueNodes.get(taskId); const oldLockHolder = - clientId === placeholderClientId ? placeholderClientId : clientQueue[0]; - const clientIdIndex = clientQueue.indexOf(clientId); - if (clientIdIndex !== -1) { - clientQueue.splice(clientIdIndex, 1); + clientId === placeholderClientId ? placeholderClientId : clientQueue.first?.data; + const nodeToRemove = clientNodes?.get(clientId); + if (nodeToRemove !== undefined) { + clientQueue.remove(nodeToRemove); + clientNodes?.delete(clientId); // Clean up the queue if there are no more clients in it. if (clientQueue.length === 0) { this.taskQueues.delete(taskId); + this.taskQueueNodes.delete(taskId); } } - const newLockHolder = clientQueue[0]; + const newLockHolder = clientQueue.first?.data; if (newLockHolder !== oldLockHolder) { this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder); } @@ -821,14 +850,25 @@ export class TaskManagerClass this.runtime.clientId !== undefined, 0x475 /* this.runtime.clientId should be defined */, ); - for (const clientQueue of this.taskQueues.values()) { - const clientIdIndex = clientQueue.indexOf(placeholderClientId); - if (clientIdIndex !== -1) { - if (clientQueue.includes(this.runtime.clientId)) { + const realClientId = this.runtime.clientId; + for (const [taskId, clientQueue] of this.taskQueues) { + const clientNodes = this.taskQueueNodes.get(taskId); + assert( + clientNodes !== undefined, + "taskQueueNodes side map missing entry for taskId", + ); + const placeholderNode = clientNodes.get(placeholderClientId); + if (placeholderNode !== undefined) { + if (clientNodes.has(realClientId)) { // If the real clientId is already in the queue, just remove the placeholder. - clientQueue.splice(clientIdIndex, 1); + clientQueue.remove(placeholderNode); + clientNodes.delete(placeholderClientId); } else { - clientQueue[clientIdIndex] = this.runtime.clientId; + // Insert the real clientId at the placeholder's position, then remove the placeholder. + const range = clientQueue.insertAfter(placeholderNode, realClientId); + clientNodes.set(realClientId, range.last); + clientQueue.remove(placeholderNode); + clientNodes.delete(placeholderClientId); } } } @@ -839,14 +879,28 @@ export class TaskManagerClass private scrubClientsNotInQuorum(): void { const quorum = this.runtime.getQuorum(); for (const [taskId, clientQueue] of this.taskQueues) { - const filteredClientQueue = clientQueue.filter( - (clientId) => quorum.getMember(clientId) !== undefined, + const clientNodes = this.taskQueueNodes.get(taskId); + assert( + clientNodes !== undefined, + "taskQueueNodes side map missing entry for taskId", ); - if (clientQueue.length !== filteredClientQueue.length) { - if (filteredClientQueue.length === 0) { + let removed = false; + // Walk by collecting removable nodes first to avoid mutating during iteration. + const toRemove: ListNode[] = []; + for (const node of clientQueue) { + if (quorum.getMember(node.data) === undefined) { + toRemove.push(node); + } + } + for (const node of toRemove) { + clientQueue.remove(node); + clientNodes.delete(node.data); + removed = true; + } + if (removed) { + if (clientQueue.length === 0) { this.taskQueues.delete(taskId); - } else { - this.taskQueues.set(taskId, filteredClientQueue); + this.taskQueueNodes.delete(taskId); } this.queueWatcher.emit("queueChange", taskId); } @@ -858,13 +912,10 @@ export class TaskManagerClass * for the latest pending ops. */ private queuedOptimistically(taskId: string): boolean { - const inQueue = this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; + const inQueue = this.taskQueueNodes.get(taskId)?.has(this.clientId) ?? false; const latestPendingOps = this.latestPendingOps.get(taskId); - const latestPendingOp = - latestPendingOps !== undefined && latestPendingOps.length > 0 - ? latestPendingOps[latestPendingOps.length - 1] - : undefined; + const latestPendingOp = latestPendingOps?.last?.data; const isPendingVolunteer = latestPendingOp?.type === "volunteer"; const isPendingAbandonOrComplete = latestPendingOp?.type === "abandon" || latestPendingOp?.type === "complete"; @@ -904,7 +955,8 @@ export class TaskManagerClass const pendingOpToRollback = latestPendingOps.pop(); assert( // eslint-disable-next-line @typescript-eslint/prefer-optional-chain -- using ?. could change behavior - pendingOpToRollback !== undefined && pendingOpToRollback.messageId === localOpMetadata, + pendingOpToRollback !== undefined && + pendingOpToRollback.data.messageId === localOpMetadata, 0xc47 /* pending op mismatch */, ); if (latestPendingOps.length === 0) { From f1d966a15812b966ddff1c9aa773bd6a4164055c Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 16:24:41 -0700 Subject: [PATCH 2/4] chore(task-manager): biome format --- packages/dds/task-manager/src/taskManager.ts | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 72a9d1f6fcc0..9e0b6c5c091e 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -8,11 +8,7 @@ import { AttachState, type ReadOnlyInfo, } from "@fluidframework/container-definitions/internal"; -import { - DoublyLinkedList, - type ListNode, - assert, -} from "@fluidframework/core-utils/internal"; +import { DoublyLinkedList, type ListNode, assert } from "@fluidframework/core-utils/internal"; import type { IChannelAttributes, IFluidDataStoreRuntime, @@ -701,8 +697,7 @@ export class TaskManagerClass const pendingOps = this.latestPendingOps.get(content.taskId); assert(pendingOps !== undefined, 0xc42 /* No pending ops for task on resubmit attempt */); const pendingOpNode = pendingOps.find( - (node) => - node.data.messageId === localOpMetadata && node.data.type === content.type, + (node) => node.data.messageId === localOpMetadata && node.data.type === content.type, ); assert( pendingOpNode !== undefined, @@ -853,10 +848,7 @@ export class TaskManagerClass const realClientId = this.runtime.clientId; for (const [taskId, clientQueue] of this.taskQueues) { const clientNodes = this.taskQueueNodes.get(taskId); - assert( - clientNodes !== undefined, - "taskQueueNodes side map missing entry for taskId", - ); + assert(clientNodes !== undefined, "taskQueueNodes side map missing entry for taskId"); const placeholderNode = clientNodes.get(placeholderClientId); if (placeholderNode !== undefined) { if (clientNodes.has(realClientId)) { @@ -880,10 +872,7 @@ export class TaskManagerClass const quorum = this.runtime.getQuorum(); for (const [taskId, clientQueue] of this.taskQueues) { const clientNodes = this.taskQueueNodes.get(taskId); - assert( - clientNodes !== undefined, - "taskQueueNodes side map missing entry for taskId", - ); + assert(clientNodes !== undefined, "taskQueueNodes side map missing entry for taskId"); let removed = false; // Walk by collecting removable nodes first to avoid mutating during iteration. const toRemove: ListNode[] = []; From 8cc348d1c229090f0ca793b6e64baab67784c715 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 16:58:03 -0700 Subject: [PATCH 3/4] refactor(task-manager): encapsulate DoublyLinkedList+index in IndexedList wrapper --- packages/dds/task-manager/src/taskManager.ts | 252 ++++++++++++------- 1 file changed, 166 insertions(+), 86 deletions(-) diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 9e0b6c5c091e..e7be65a0d985 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -70,6 +70,126 @@ function assertIsTaskManagerOperation(op: unknown): asserts op is ITaskManagerOp ); } +/** + * Encapsulates a {@link DoublyLinkedList} together with a side-index `Map>` so that + * mutations to the list and the index cannot drift out of sync. The two structures were previously + * stored independently on {@link TaskManagerClass} and had to be kept byte-symmetric at every + * mutation site — an invariant only enforced by runtime asserts. By bundling them behind this + * private wrapper the invariant becomes unrepresentable. + * + * The wrapper exposes just the operations the call sites in this file actually need; iteration of + * nodes is delegated straight through to the inner list. + * + * @param keyOf - Extracts the index key from a value. Used by {@link IndexedList.shift} and + * {@link IndexedList.pop} to keep the index consistent with the list when a node is removed by + * position rather than by key. + */ +class IndexedList implements Iterable> { + private readonly list = new DoublyLinkedList(); + private readonly index = new Map>(); + + public constructor(private readonly keyOf: (value: V) => K) {} + + public get length(): number { + return this.list.length; + } + + public get first(): ListNode | undefined { + return this.list.first; + } + + public get last(): ListNode | undefined { + return this.list.last; + } + + public has(key: K): boolean { + return this.index.has(key); + } + + public getNode(key: K): ListNode | undefined { + return this.index.get(key); + } + + /** + * Appends a value to the end of the list and indexes it under `key`. + * @returns The newly inserted node. + */ + public push(key: K, value: V): ListNode { + const { first } = this.list.push(value); + this.index.set(key, first); + return first; + } + + /** + * Inserts `value` immediately after `after` and indexes it under `key`. + * @returns The newly inserted node. + */ + public insertAfter(after: ListNode, key: K, value: V): ListNode { + const { first } = this.list.insertAfter(after, value); + this.index.set(key, first); + return first; + } + + /** + * Removes the entry at the given key from both the list and the index. + * @returns True if an entry was removed; false if the key was not present. + */ + public deleteByKey(key: K): boolean { + const node = this.index.get(key); + if (node === undefined) { + return false; + } + this.list.remove(node); + this.index.delete(key); + return true; + } + + /** + * Removes the given node from both the list and the index. Caller must have obtained `node` + * from this list (e.g. via iteration or {@link IndexedList.getNode}). + */ + public removeNode(node: ListNode): void { + this.list.remove(node); + this.index.delete(this.keyOf(node.data)); + } + + /** + * Removes and returns the first node, also removing its index entry. + */ + public shift(): ListNode | undefined { + const node = this.list.shift(); + if (node !== undefined) { + this.index.delete(this.keyOf(node.data)); + } + return node; + } + + /** + * Removes and returns the last node, also removing its index entry. + */ + public pop(): ListNode | undefined { + const node = this.list.pop(); + if (node !== undefined) { + this.index.delete(this.keyOf(node.data)); + } + return node; + } + + /** + * Iterates the values' nodes in list order. Delegates straight through to the inner list. + */ + public [Symbol.iterator](): IterableIterator> { + return this.list[Symbol.iterator](); + } + + /** + * Maps each node to a new value, in list order. Delegates straight through to the inner list. + */ + public map(callbackfn: (value: ListNode) => U): Iterable { + return this.list.map(callbackfn); + } +} + const snapshotFileName = "header"; /** @@ -88,17 +208,13 @@ export class TaskManagerClass implements ITaskManager { /** - * Mapping of taskId to a queue of clientIds that are waiting on the task. Maintains the consensus state of the + * Mapping of taskId to a queue of clientIds that are waiting on the task. Maintains the consensus state of the * queue, even if we know we've submitted an op that should eventually modify the queue. + * + * The {@link IndexedList} wrapper bundles the queue with a clientId -\> ListNode index, enabling O(1) lookup + * and removal of a client without scanning the list while keeping the two structures inherently in sync. */ - private readonly taskQueues = new Map>(); - - /** - * Side-lookup map paralleling {@link TaskManagerClass.taskQueues}: for each taskId, maps clientId to the - * {@link ListNode} that holds that clientId in the corresponding queue. Enables O(1) lookup and removal of a - * client from a queue without scanning the list. Must be kept symmetric with `taskQueues` at every mutation. - */ - private readonly taskQueueNodes = new Map>>(); + private readonly taskQueues = new Map>(); // opWatcher emits for every op on this data store. This is just a repackaging of processMessagesCore into events. private readonly opWatcher: EventEmitter = new EventEmitter(); @@ -117,9 +233,11 @@ export class TaskManagerClass private nextPendingMessageId: number = 0; /** - * Tracks the most recent pending op for a given task + * Tracks the most recent pending op for a given task. The {@link IndexedList} wrapper indexes the + * queue of pending ops by their messageId, which simplifies {@link TaskManagerClass.reSubmitCore}'s + * lookup of a specific pending op from O(n) `find` to O(1) `getNode`. */ - private readonly latestPendingOps = new Map>(); + private readonly latestPendingOps = new Map>(); /** * Tracks tasks that are this client is currently subscribed to. @@ -215,7 +333,6 @@ export class TaskManagerClass } this.taskQueues.delete(taskId); - this.taskQueueNodes.delete(taskId); this.completedWatcher.emit("completed", taskId, messageId); this.emit("completed", taskId); }, @@ -266,12 +383,7 @@ export class TaskManagerClass messageId: this.nextPendingMessageId++, }; this.submitLocalMessage(op, pendingOp.messageId); - const latestPendingOps = this.latestPendingOps.get(taskId); - if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, new DoublyLinkedList([pendingOp])); - } else { - latestPendingOps.push(pendingOp); - } + this.appendPendingOp(taskId, pendingOp); } private submitAbandonOp(taskId: string): void { @@ -284,12 +396,7 @@ export class TaskManagerClass messageId: this.nextPendingMessageId++, }; this.submitLocalMessage(op, pendingOp.messageId); - const latestPendingOps = this.latestPendingOps.get(taskId); - if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, new DoublyLinkedList([pendingOp])); - } else { - latestPendingOps.push(pendingOp); - } + this.appendPendingOp(taskId, pendingOp); } private submitCompleteOp(taskId: string): void { @@ -303,12 +410,16 @@ export class TaskManagerClass }; this.submitLocalMessage(op, pendingOp.messageId); - const latestPendingOps = this.latestPendingOps.get(taskId); + this.appendPendingOp(taskId, pendingOp); + } + + private appendPendingOp(taskId: string, pendingOp: IPendingOp): void { + let latestPendingOps = this.latestPendingOps.get(taskId); if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, new DoublyLinkedList([pendingOp])); - } else { - latestPendingOps.push(pendingOp); + latestPendingOps = new IndexedList((op) => op.messageId); + this.latestPendingOps.set(taskId, latestPendingOps); } + latestPendingOps.push(pendingOp.messageId, pendingOp); } /** @@ -580,7 +691,7 @@ export class TaskManagerClass return false; } - return this.taskQueueNodes.get(taskId)?.has(this.clientId) ?? false; + return this.taskQueues.get(taskId)?.has(this.clientId) ?? false; } /** @@ -602,7 +713,6 @@ export class TaskManagerClass // we are attached. Additionally, we don't need to check if we are connected while detached. if (this.isDetached()) { this.taskQueues.delete(taskId); - this.taskQueueNodes.delete(taskId); this.completedWatcher.emit("completed", taskId); this.emit("completed", taskId); return; @@ -658,14 +768,11 @@ export class TaskManagerClass protected async loadCore(storage: IChannelStorageService): Promise { const content = await readAndParse<[string, string[]][]>(storage, snapshotFileName); for (const [taskId, clientIdQueue] of content) { - const list = new DoublyLinkedList(); - const nodeMap = new Map>(); + const list = new IndexedList((clientId) => clientId); for (const clientId of clientIdQueue) { - const range = list.push(clientId); - nodeMap.set(clientId, range.last); + list.push(clientId, clientId); } this.taskQueues.set(taskId, list); - this.taskQueueNodes.set(taskId, nodeMap); } this.scrubClientsNotInQuorum(); } @@ -696,14 +803,12 @@ export class TaskManagerClass assertIsTaskManagerOperation(content); const pendingOps = this.latestPendingOps.get(content.taskId); assert(pendingOps !== undefined, 0xc42 /* No pending ops for task on resubmit attempt */); - const pendingOpNode = pendingOps.find( - (node) => node.data.messageId === localOpMetadata && node.data.type === content.type, - ); + const pendingOpNode = pendingOps.getNode(localOpMetadata); assert( - pendingOpNode !== undefined, + pendingOpNode?.data.type === content.type, 0xc43 /* Could not match pending op on resubmit attempt */, ); - pendingOps.remove(pendingOpNode); + pendingOps.removeNode(pendingOpNode); if (content.type === "volunteer" && pendingOps.last?.data.type !== "abandon") { this.submitVolunteerOp(content.taskId); } @@ -778,17 +883,12 @@ export class TaskManagerClass ) { // Create the queue if it doesn't exist, and push the client on the back. let clientQueue = this.taskQueues.get(taskId); - let clientNodes = this.taskQueueNodes.get(taskId); if (clientQueue === undefined) { - clientQueue = new DoublyLinkedList(); + clientQueue = new IndexedList((cid) => cid); this.taskQueues.set(taskId, clientQueue); } - if (clientNodes === undefined) { - clientNodes = new Map>(); - this.taskQueueNodes.set(taskId, clientNodes); - } - if (clientNodes.has(clientId)) { + if (clientQueue.has(clientId)) { // We shouldn't re-add the client if it's already in the queue. // This may be possible in scenarios where a client was added in // while detached. @@ -796,8 +896,7 @@ export class TaskManagerClass } const oldLockHolder = clientQueue.first?.data; - const range = clientQueue.push(clientId); - clientNodes.set(clientId, range.last); + clientQueue.push(clientId, clientId); const newLockHolder = clientQueue.first?.data; if (newLockHolder !== oldLockHolder) { this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder); @@ -810,19 +909,12 @@ export class TaskManagerClass if (clientQueue === undefined) { return; } - const clientNodes = this.taskQueueNodes.get(taskId); const oldLockHolder = clientId === placeholderClientId ? placeholderClientId : clientQueue.first?.data; - const nodeToRemove = clientNodes?.get(clientId); - if (nodeToRemove !== undefined) { - clientQueue.remove(nodeToRemove); - clientNodes?.delete(clientId); - // Clean up the queue if there are no more clients in it. - if (clientQueue.length === 0) { - this.taskQueues.delete(taskId); - this.taskQueueNodes.delete(taskId); - } + // Clean up the queue if the removal leaves it empty. + if (clientQueue.deleteByKey(clientId) && clientQueue.length === 0) { + this.taskQueues.delete(taskId); } const newLockHolder = clientQueue.first?.data; if (newLockHolder !== oldLockHolder) { @@ -846,22 +938,15 @@ export class TaskManagerClass 0x475 /* this.runtime.clientId should be defined */, ); const realClientId = this.runtime.clientId; - for (const [taskId, clientQueue] of this.taskQueues) { - const clientNodes = this.taskQueueNodes.get(taskId); - assert(clientNodes !== undefined, "taskQueueNodes side map missing entry for taskId"); - const placeholderNode = clientNodes.get(placeholderClientId); + for (const clientQueue of this.taskQueues.values()) { + const placeholderNode = clientQueue.getNode(placeholderClientId); if (placeholderNode !== undefined) { - if (clientNodes.has(realClientId)) { - // If the real clientId is already in the queue, just remove the placeholder. - clientQueue.remove(placeholderNode); - clientNodes.delete(placeholderClientId); - } else { - // Insert the real clientId at the placeholder's position, then remove the placeholder. - const range = clientQueue.insertAfter(placeholderNode, realClientId); - clientNodes.set(realClientId, range.last); - clientQueue.remove(placeholderNode); - clientNodes.delete(placeholderClientId); + if (!clientQueue.has(realClientId)) { + // Insert the real clientId at the placeholder's position before we remove the placeholder. + clientQueue.insertAfter(placeholderNode, realClientId, realClientId); } + // Remove the placeholder; if the real clientId was already present we just drop the placeholder. + clientQueue.deleteByKey(placeholderClientId); } } } @@ -871,9 +956,6 @@ export class TaskManagerClass private scrubClientsNotInQuorum(): void { const quorum = this.runtime.getQuorum(); for (const [taskId, clientQueue] of this.taskQueues) { - const clientNodes = this.taskQueueNodes.get(taskId); - assert(clientNodes !== undefined, "taskQueueNodes side map missing entry for taskId"); - let removed = false; // Walk by collecting removable nodes first to avoid mutating during iteration. const toRemove: ListNode[] = []; for (const node of clientQueue) { @@ -881,18 +963,16 @@ export class TaskManagerClass toRemove.push(node); } } + if (toRemove.length === 0) { + continue; + } for (const node of toRemove) { - clientQueue.remove(node); - clientNodes.delete(node.data); - removed = true; + clientQueue.removeNode(node); } - if (removed) { - if (clientQueue.length === 0) { - this.taskQueues.delete(taskId); - this.taskQueueNodes.delete(taskId); - } - this.queueWatcher.emit("queueChange", taskId); + if (clientQueue.length === 0) { + this.taskQueues.delete(taskId); } + this.queueWatcher.emit("queueChange", taskId); } } @@ -901,7 +981,7 @@ export class TaskManagerClass * for the latest pending ops. */ private queuedOptimistically(taskId: string): boolean { - const inQueue = this.taskQueueNodes.get(taskId)?.has(this.clientId) ?? false; + const inQueue = this.taskQueues.get(taskId)?.has(this.clientId) ?? false; const latestPendingOps = this.latestPendingOps.get(taskId); const latestPendingOp = latestPendingOps?.last?.data; From 2fb6c2d3a98d6af36fe687f6ca8c6cfa3f62443d Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 18:35:37 -0700 Subject: [PATCH 4/4] test(task-manager): update helpers for IndexedList + ordering regressions --- .../dds/task-manager/src/test/fuzzUtils.ts | 67 +++- .../task-manager/src/test/taskManager.spec.ts | 350 +++++++++++++++++- 2 files changed, 389 insertions(+), 28 deletions(-) diff --git a/packages/dds/task-manager/src/test/fuzzUtils.ts b/packages/dds/task-manager/src/test/fuzzUtils.ts index b43a9e87dbfd..f9aa73d7c59c 100644 --- a/packages/dds/task-manager/src/test/fuzzUtils.ts +++ b/packages/dds/task-manager/src/test/fuzzUtils.ts @@ -163,6 +163,40 @@ interface LoggingInfo { taskId: string; } +/** + * Internal shape exposed only for test introspection. + * + * Mirrors the {@link IndexedList} wrapper added to `TaskManagerClass` so these helpers + * can traverse the queue without relying on the structural details of the wrapper. + */ +interface TaskQueueLike { + readonly length: number; + readonly first: { readonly data: string } | undefined; + [Symbol.iterator](): IterableIterator<{ readonly data: string }>; +} + +/** + * Reads the private `taskQueues` map off a {@link ITaskManager} for test introspection, + * and returns the queue for `taskId` as an array of clientIds (in queue order). + * + * Centralizes the `as any` cast and shields the call sites from changes to the underlying + * data structure. + */ +function readTaskQueue(taskManager: ITaskManager, taskId: string): string[] | undefined { + /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any */ + const queues: Map = (taskManager as any).taskQueues; + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any */ + const queue = queues.get(taskId); + if (queue === undefined) { + return undefined; + } + const clientIds: string[] = []; + for (const node of queue) { + clientIds.push(node.data); + } + return clientIds; +} + function logCurrentState(state: FuzzTestState, loggingInfo: LoggingInfo): void { for (const client of state.clients) { const taskManager = client.channel; @@ -171,8 +205,7 @@ function logCurrentState(state: FuzzTestState, loggingInfo: LoggingInfo): void { console.log( `TaskManager ${taskManager.id} (CanVolunteer: ${taskManager.canVolunteer()}):`, ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any - console.log((taskManager as any).taskQueues.get(loggingInfo.taskId)); + console.log(readTaskQueue(taskManager, loggingInfo.taskId)); console.log("\n"); } } @@ -235,24 +268,26 @@ function makeReducer(loggingInfo?: LoggingInfo): Reducer = (a as any).taskQueues; - const queue2: Map = (b as any).taskQueues; + const queues1: Map = (a as any).taskQueues; + const queues2: Map = (b as any).taskQueues; /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any */ - assert.strictEqual(queue1.size, queue2.size, "The number of tasks queues are not the same"); - for (const [key, val] of queue1) { - const testVal = queue2.get(key); - if (testVal === undefined) { - assert(val === undefined, "Task queues are not both undefined"); + assert.strictEqual( + queues1.size, + queues2.size, + "The number of tasks queues are not the same", + ); + for (const key of queues1.keys()) { + const aQueue = readTaskQueue(a, key); + const bQueue = readTaskQueue(b, key); + if (bQueue === undefined) { + assert(aQueue === undefined, "Task queues are not both undefined"); continue; } - assert.strictEqual(testVal.length, val.length, "Task queues are not the same size"); - if (testVal.length > 0) { - const testValArr = testVal; - const valArr = val; - for (const [index, task] of testValArr.entries()) { - assert.strictEqual(task, valArr[index], `Task queues are not identical`); - } + assert(aQueue !== undefined, "Task queues are not both defined"); + assert.strictEqual(bQueue.length, aQueue.length, "Task queues are not the same size"); + for (const [index, task] of bQueue.entries()) { + assert.strictEqual(task, aQueue[index], `Task queues are not identical`); } } } diff --git a/packages/dds/task-manager/src/test/taskManager.spec.ts b/packages/dds/task-manager/src/test/taskManager.spec.ts index 1564a28e58eb..2361a554a2f8 100644 --- a/packages/dds/task-manager/src/test/taskManager.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.spec.ts @@ -12,6 +12,7 @@ import { MockContainerRuntimeFactory, MockContainerRuntimeFactoryForReconnection, MockFluidDataStoreRuntime, + MockSharedObjectServices, MockStorage, } from "@fluidframework/test-runtime-utils/internal"; @@ -19,6 +20,53 @@ import type { ITaskManager } from "../interfaces.js"; import { TaskManagerClass } from "../taskManager.js"; import { TaskManagerFactory } from "../taskManagerFactory.js"; +/** + * Internal shape exposed only for test introspection. + * + * Mirrors the `IndexedList` wrapper that `TaskManagerClass` uses for `taskQueues` so these + * helpers can traverse the queue without depending on the structural details of the wrapper. + */ +interface TaskQueueLike { + readonly length: number; + readonly first: { readonly data: string } | undefined; + [Symbol.iterator](): IterableIterator<{ readonly data: string }>; +} + +/** + * Reads the private `taskQueues` map off a {@link TaskManagerClass} for test introspection. + */ +function getInternalTaskQueues(taskManager: TaskManagerClass): Map { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-return + return (taskManager as any).taskQueues; +} + +/** + * Returns the queue for `taskId` as an array of clientIds in queue order, or `undefined` if + * no queue exists for that task. + */ +function getTaskQueueAsArray( + taskManager: TaskManagerClass, + taskId: string, +): string[] | undefined { + const queue = getInternalTaskQueues(taskManager).get(taskId); + if (queue === undefined) { + return undefined; + } + const clientIds: string[] = []; + for (const node of queue) { + clientIds.push(node.data); + } + return clientIds; +} + +/** + * Returns the clientId at the head of the queue (the current lock holder candidate) for + * `taskId`, or `undefined` if no queue exists. + */ +function getTaskQueueHead(taskManager: TaskManagerClass, taskId: string): string | undefined { + return getInternalTaskQueues(taskManager).get(taskId)?.first?.data; +} + function createConnectedTaskManager( id: string, runtimeFactory: MockContainerRuntimeFactory, @@ -756,8 +804,7 @@ describe("TaskManager", () => { assert.ok(taskManager1.queued(taskId), "Should be queued"); assert.ok(taskManager1.assigned(taskId), "Should be assigned"); assert.strictEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId)?.[0], + getTaskQueueHead(taskManager1, taskId), placeholderClientId, "taskQueue should have placeholder clientId", ); @@ -776,8 +823,10 @@ describe("TaskManager", () => { assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); assert.ok(taskManager1EventFired, "Should have raised lost event on taskManager1"); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any - assert.ok((taskManager1 as any).taskQueues.size === 0, "taskQueue should be empty"); + assert.ok( + getInternalTaskQueues(taskManager1).size === 0, + "taskQueue should be empty", + ); }); }); @@ -819,13 +868,11 @@ describe("TaskManager", () => { assert.ok(taskManager1.subscribed(taskId), "Task manager 1 should be subscribed"); assert.ok( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId)?.length !== 0, + (getInternalTaskQueues(taskManager1).get(taskId)?.length ?? 0) !== 0, "taskQueue should not be empty", ); assert.notStrictEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId)?.[0], + getTaskQueueHead(taskManager1, taskId), placeholderClientId, "taskQueue should not have placeholder clientId", ); @@ -982,8 +1029,7 @@ describe("TaskManager", () => { const clientId2 = containerRuntime2.clientId; assert.deepEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId), + getTaskQueueAsArray(taskManager1, taskId), [clientId1, clientId2], "Task queue should have both clients", ); @@ -992,8 +1038,7 @@ describe("TaskManager", () => { containerRuntimeFactory.processAllMessages(); assert.deepEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId), + getTaskQueueAsArray(taskManager1, taskId), [clientId1], "Task queue should only have client 1", ); @@ -1233,4 +1278,285 @@ describe("TaskManager", () => { describe("Completing tasks", () => {}); }); }); + + describe("Queue ordering regressions", () => { + // These tests exercise invariants around queue ordering that were previously + // guaranteed by the side-by-side `taskQueues` + `taskQueueIndex` structures and + // are now guaranteed by the `IndexedList` wrapper. + + describe("scrubClientsNotInQuorum", () => { + it("removes only the non-quorum clients and preserves the relative order of survivors", async () => { + // Wire up a queue with multiple clients via real volunteer ops so we don't + // have to manufacture an `IndexedList` from outside the module. + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager1 = createConnectedTaskManager("tm1", containerRuntimeFactory); + const taskManager2 = createConnectedTaskManager("tm2", containerRuntimeFactory); + const taskManager3 = createConnectedTaskManager("tm3", containerRuntimeFactory); + const taskManager4 = createConnectedTaskManager("tm4", containerRuntimeFactory); + + const taskId = "taskId"; + const p1 = taskManager1.volunteerForTask(taskId); + // Subscribe the others (rather than awaiting their volunteer promises) so + // they join the queue without leaving outstanding promises that never + // settle (only the head of the queue resolves). + taskManager2.subscribeToTask(taskId); + taskManager3.subscribeToTask(taskId); + taskManager4.subscribeToTask(taskId); + containerRuntimeFactory.processAllMessages(); + await p1; + + // Capture the established queue order so the assertion is independent of + // the (random) clientId values. + const initialOrder = getTaskQueueAsArray(taskManager1, taskId); + assert.ok(initialOrder?.length === 4); + const c1 = initialOrder[0] as string; + const c2 = initialOrder[1] as string; + const c3 = initialOrder[2] as string; + const c4 = initialOrder[3] as string; + + // Remove clients 2 and 4 from the quorum's underlying members map + // *without* emitting the `removeMember` event — that event is what would + // normally trigger removeClientFromAllQueues. By suppressing it, we leave + // stale entries in the queue and force scrubClientsNotInQuorum to do the + // work we want to test. + /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const quorumMembers: Map = (taskManager1 as any).runtime.getQuorum() + .members; + quorumMembers.delete(c2); + quorumMembers.delete(c4); + (taskManager1 as any).scrubClientsNotInQuorum(); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + + assert.deepEqual( + getTaskQueueAsArray(taskManager1, taskId), + [c1, c3], + "Survivors must remain in their original relative order", + ); + }); + }); + + describe("Placeholder → real clientId substitution", () => { + it("keeps the head of the queue stable when the placeholder is the lock holder", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const { taskManager: taskManagerDetached } = createDetachedTaskManager( + "detached", + containerRuntimeFactory, + ); + + // Force runtime.clientId to undefined so the detached volunteer path inserts + // the placeholder clientId rather than the auto-generated one. We then + // manually splice other clients in behind the placeholder via the private + // `addClientToQueue` so we can verify the placeholder→real swap preserves + // the placeholder's position rather than reinserting at the tail. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (taskManagerDetached as any).runtime.clientId = undefined; + + const taskId = "taskId"; + const volunteerP = taskManagerDetached.volunteerForTask(taskId); + containerRuntimeFactory.processAllMessages(); + await volunteerP; + + // Inject fake quorum members so addClientToQueue accepts them. + /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const quorum: { addMember: (id: string, c: unknown) => void } = ( + taskManagerDetached as any + ).runtime.getQuorum(); + quorum.addMember("other-1", {}); + quorum.addMember("other-2", {}); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManagerDetached as any).addClientToQueue(taskId, "other-1"); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManagerDetached as any).addClientToQueue(taskId, "other-2"); + + assert.deepEqual( + getTaskQueueAsArray(taskManagerDetached, taskId), + ["placeholder", "other-1", "other-2"], + "Pre-condition: placeholder is at the head", + ); + + // Trigger the substitution path. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (taskManagerDetached as any).runtime.clientId = "real-client"; + quorum.addMember("real-client", {}); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManagerDetached as any).replacePlaceholderInAllQueues(); + + assert.deepEqual( + getTaskQueueAsArray(taskManagerDetached, taskId), + ["real-client", "other-1", "other-2"], + "Real clientId must take the placeholder's slot, not append at the tail", + ); + }); + }); + + describe("reSubmitCore", () => { + it("removes the matching pending op without disturbing siblings", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager = createConnectedTaskManager("tm", containerRuntimeFactory); + const taskId = "taskId"; + + // Submit three volunteer ops back-to-back without processing — this drives + // `latestPendingOps[taskId]` to hold three entries, each with a distinct + // messageId. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + + type PendingOpList = Iterable<{ data: { type: string; messageId: number } }>; + /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const pendingOps: PendingOpList = (taskManager as any).latestPendingOps.get(taskId); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + assert.ok(pendingOps !== undefined); + const messageIds: number[] = []; + for (const node of pendingOps) { + messageIds.push(node.data.messageId); + } + assert.strictEqual(messageIds.length, 3); + const firstId = messageIds[0] as number; + const middleId = messageIds[1] as number; + const lastId = messageIds[2] as number; + + // Resubmit only the middle op — the matching op should be removed and the + // surviving siblings should remain in their original relative order. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).reSubmitCore({ type: "volunteer", taskId }, middleId); + + const surviving: number[] = []; + for (const node of pendingOps) { + surviving.push(node.data.messageId); + } + // reSubmitCore re-submits a fresh volunteer op when the last pending is not + // an abandon, which appends a new entry at the tail with a messageId greater + // than `lastId`. The leading entries should be the original first/last in + // their original relative order. + assert.strictEqual(surviving.length, 3); + assert.strictEqual(surviving[0], firstId, "First sibling must keep its slot"); + assert.strictEqual( + surviving[1], + lastId, + "Last sibling must keep its slot (relative to surviving siblings)", + ); + assert.ok( + (surviving[2] as number) > lastId, + "Resubmitted volunteer op should be appended at the tail", + ); + }); + }); + + describe("rollback", () => { + it("rolls back the latest pending op (LIFO)", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager = createConnectedTaskManager("tm", containerRuntimeFactory); + const taskId = "taskId"; + + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitAbandonOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + + type PendingOpList = Iterable<{ data: { type: string; messageId: number } }>; + /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const pendingOps: PendingOpList = (taskManager as any).latestPendingOps.get(taskId); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + assert.ok(pendingOps !== undefined); + const messageIds: number[] = []; + const types: string[] = []; + for (const node of pendingOps) { + messageIds.push(node.data.messageId); + types.push(node.data.type); + } + assert.deepEqual(types, ["volunteer", "abandon", "volunteer"]); + const lastId = messageIds[2]; + + // Rolling back the last submitted op should pop only the tail entry and + // leave the head/middle untouched. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).rollback({ type: "volunteer", taskId }, lastId); + + const remainingTypes: string[] = []; + for (const node of pendingOps) { + remainingTypes.push(node.data.type); + } + assert.deepEqual( + remainingTypes, + ["volunteer", "abandon"], + "Rollback must pop only the tail (LIFO)", + ); + + // Rolling back the abandon (now the tail) should similarly only remove it. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).rollback({ type: "abandon", taskId }, messageIds[1]); + + const finalTypes: string[] = []; + for (const node of pendingOps) { + finalTypes.push(node.data.type); + } + assert.deepEqual(finalTypes, ["volunteer"]); + }); + }); + + describe("summarize → loadCore round-trip", () => { + it("preserves queue order across summary/load", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager1 = createConnectedTaskManager("tm1", containerRuntimeFactory); + const taskManager2 = createConnectedTaskManager("tm2", containerRuntimeFactory); + const taskManager3 = createConnectedTaskManager("tm3", containerRuntimeFactory); + const taskId1 = "taskA"; + const taskId2 = "taskB"; + + // Use subscribe for the followers so we don't leave outstanding volunteer + // promises that never settle (only the head of each queue resolves). + const p1a = taskManager1.volunteerForTask(taskId1); + taskManager2.subscribeToTask(taskId1); + taskManager3.subscribeToTask(taskId1); + const p3b = taskManager3.volunteerForTask(taskId2); + taskManager1.subscribeToTask(taskId2); + containerRuntimeFactory.processAllMessages(); + await Promise.all([p1a, p3b]); + + const expectedA = getTaskQueueAsArray(taskManager1, taskId1); + const expectedB = getTaskQueueAsArray(taskManager1, taskId2); + assert.ok(expectedA?.length === 3); + assert.ok(expectedB?.length === 2); + + // Round-trip the summary into a fresh TaskManager and verify the queues + // come back in the same order. + const summaryResult = await taskManager1.summarize(); + const services = MockSharedObjectServices.createFromSummary(summaryResult.summary); + + // We need all the original clientIds present in the new runtime's quorum + // so scrubClientsNotInQuorum (called from loadCore) doesn't drop them. + const reloadedRuntime = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(reloadedRuntime); + for (const clientId of [...expectedA, ...expectedB]) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (reloadedRuntime as any).quorum.addMember(clientId, {}); + } + + const reloaded = new TaskManagerClass( + "tm-reloaded", + reloadedRuntime, + TaskManagerFactory.Attributes, + ); + await reloaded.load(services); + + assert.deepEqual( + getTaskQueueAsArray(reloaded, taskId1), + expectedA, + "taskA queue ordering must survive summary/load", + ); + assert.deepEqual( + getTaskQueueAsArray(reloaded, taskId2), + expectedB, + "taskB queue ordering must survive summary/load", + ); + }); + }); + }); });