diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 0a9f3d9ca11a..e7be65a0d985 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -8,7 +8,7 @@ import { AttachState, type ReadOnlyInfo, } from "@fluidframework/container-definitions/internal"; -import { assert } from "@fluidframework/core-utils/internal"; +import { DoublyLinkedList, type ListNode, assert } from "@fluidframework/core-utils/internal"; import type { IChannelAttributes, IFluidDataStoreRuntime, @@ -70,6 +70,126 @@ function assertIsTaskManagerOperation(op: unknown): asserts op is ITaskManagerOp ); } +/** + * Encapsulates a {@link DoublyLinkedList} together with a side-index `Map>` so that + * mutations to the list and the index cannot drift out of sync. The two structures were previously + * stored independently on {@link TaskManagerClass} and had to be kept byte-symmetric at every + * mutation site — an invariant only enforced by runtime asserts. By bundling them behind this + * private wrapper the invariant becomes unrepresentable. + * + * The wrapper exposes just the operations the call sites in this file actually need; iteration of + * nodes is delegated straight through to the inner list. + * + * @param keyOf - Extracts the index key from a value. Used by {@link IndexedList.shift} and + * {@link IndexedList.pop} to keep the index consistent with the list when a node is removed by + * position rather than by key. 
+ */ +class IndexedList implements Iterable> { + private readonly list = new DoublyLinkedList(); + private readonly index = new Map>(); + + public constructor(private readonly keyOf: (value: V) => K) {} + + public get length(): number { + return this.list.length; + } + + public get first(): ListNode | undefined { + return this.list.first; + } + + public get last(): ListNode | undefined { + return this.list.last; + } + + public has(key: K): boolean { + return this.index.has(key); + } + + public getNode(key: K): ListNode | undefined { + return this.index.get(key); + } + + /** + * Appends a value to the end of the list and indexes it under `key`. + * @returns The newly inserted node. + */ + public push(key: K, value: V): ListNode { + const { first } = this.list.push(value); + this.index.set(key, first); + return first; + } + + /** + * Inserts `value` immediately after `after` and indexes it under `key`. + * @returns The newly inserted node. + */ + public insertAfter(after: ListNode, key: K, value: V): ListNode { + const { first } = this.list.insertAfter(after, value); + this.index.set(key, first); + return first; + } + + /** + * Removes the entry at the given key from both the list and the index. + * @returns True if an entry was removed; false if the key was not present. + */ + public deleteByKey(key: K): boolean { + const node = this.index.get(key); + if (node === undefined) { + return false; + } + this.list.remove(node); + this.index.delete(key); + return true; + } + + /** + * Removes the given node from both the list and the index. Caller must have obtained `node` + * from this list (e.g. via iteration or {@link IndexedList.getNode}). + */ + public removeNode(node: ListNode): void { + this.list.remove(node); + this.index.delete(this.keyOf(node.data)); + } + + /** + * Removes and returns the first node, also removing its index entry. 
+ */ + public shift(): ListNode | undefined { + const node = this.list.shift(); + if (node !== undefined) { + this.index.delete(this.keyOf(node.data)); + } + return node; + } + + /** + * Removes and returns the last node, also removing its index entry. + */ + public pop(): ListNode | undefined { + const node = this.list.pop(); + if (node !== undefined) { + this.index.delete(this.keyOf(node.data)); + } + return node; + } + + /** + * Iterates the values' nodes in list order. Delegates straight through to the inner list. + */ + public [Symbol.iterator](): IterableIterator> { + return this.list[Symbol.iterator](); + } + + /** + * Maps each node to a new value, in list order. Delegates straight through to the inner list. + */ + public map(callbackfn: (value: ListNode) => U): Iterable { + return this.list.map(callbackfn); + } +} + const snapshotFileName = "header"; /** @@ -88,10 +208,13 @@ export class TaskManagerClass implements ITaskManager { /** - * Mapping of taskId to a queue of clientIds that are waiting on the task. Maintains the consensus state of the + * Mapping of taskId to a queue of clientIds that are waiting on the task. Maintains the consensus state of the * queue, even if we know we've submitted an op that should eventually modify the queue. + * + * The {@link IndexedList} wrapper bundles the queue with a clientId -\> ListNode index, enabling O(1) lookup + * and removal of a client without scanning the list while keeping the two structures inherently in sync. */ - private readonly taskQueues = new Map(); + private readonly taskQueues = new Map>(); // opWatcher emits for every op on this data store. This is just a repackaging of processMessagesCore into events. private readonly opWatcher: EventEmitter = new EventEmitter(); @@ -110,9 +233,11 @@ export class TaskManagerClass private nextPendingMessageId: number = 0; /** - * Tracks the most recent pending op for a given task + * Tracks the most recent pending op for a given task. 
The {@link IndexedList} wrapper indexes the + * queue of pending ops by their messageId, which simplifies {@link TaskManagerClass.reSubmitCore}'s + * lookup of a specific pending op from O(n) `find` to O(1) `getNode`. */ - private readonly latestPendingOps = new Map(); + private readonly latestPendingOps = new Map>(); /** * Tracks tasks that are this client is currently subscribed to. @@ -153,12 +278,12 @@ export class TaskManagerClass if (local) { const latestPendingOps = this.latestPendingOps.get(taskId); assert(latestPendingOps !== undefined, 0xc3c /* No pending ops for task */); - const pendingOp = latestPendingOps.shift(); + const pendingOpNode = latestPendingOps.shift(); assert( - pendingOp !== undefined && pendingOp.messageId === messageId, + pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId, 0xc3d /* Unexpected op */, ); - assert(pendingOp.type === "volunteer", 0x07c /* "Unexpected op type" */); + assert(pendingOpNode.data.type === "volunteer", 0x07c /* "Unexpected op type" */); if (latestPendingOps.length === 0) { this.latestPendingOps.delete(taskId); } @@ -174,12 +299,12 @@ export class TaskManagerClass if (local) { const latestPendingOps = this.latestPendingOps.get(taskId); assert(latestPendingOps !== undefined, 0xc3e /* No pending ops for task */); - const pendingOp = latestPendingOps.shift(); + const pendingOpNode = latestPendingOps.shift(); assert( - pendingOp !== undefined && pendingOp.messageId === messageId, + pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId, 0xc3f /* Unexpected op */, ); - assert(pendingOp.type === "abandon", 0x07e /* "Unexpected op type" */); + assert(pendingOpNode.data.type === "abandon", 0x07e /* "Unexpected op type" */); if (latestPendingOps.length === 0) { this.latestPendingOps.delete(taskId); } @@ -196,12 +321,12 @@ export class TaskManagerClass if (local) { const latestPendingOps = this.latestPendingOps.get(taskId); assert(latestPendingOps !== undefined, 0xc40 /* No pending 
ops for task */); - const pendingOp = latestPendingOps.shift(); + const pendingOpNode = latestPendingOps.shift(); assert( - pendingOp !== undefined && pendingOp.messageId === messageId, + pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId, 0xc41 /* Unexpected op */, ); - assert(pendingOp.type === "complete", 0x401 /* Unexpected op type */); + assert(pendingOpNode.data.type === "complete", 0x401 /* Unexpected op type */); if (latestPendingOps.length === 0) { this.latestPendingOps.delete(taskId); } @@ -237,7 +362,7 @@ export class TaskManagerClass this.connectionWatcher.on("disconnect", () => { // Emit "lost" for any tasks we were assigned to. for (const [taskId, clientQueue] of this.taskQueues.entries()) { - if (this.isAttached() && clientQueue[0] === this.clientId) { + if (this.isAttached() && clientQueue.first?.data === this.clientId) { this.emit("lost", taskId); } } @@ -258,12 +383,7 @@ export class TaskManagerClass messageId: this.nextPendingMessageId++, }; this.submitLocalMessage(op, pendingOp.messageId); - const latestPendingOps = this.latestPendingOps.get(taskId); - if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, [pendingOp]); - } else { - latestPendingOps.push(pendingOp); - } + this.appendPendingOp(taskId, pendingOp); } private submitAbandonOp(taskId: string): void { @@ -276,12 +396,7 @@ export class TaskManagerClass messageId: this.nextPendingMessageId++, }; this.submitLocalMessage(op, pendingOp.messageId); - const latestPendingOps = this.latestPendingOps.get(taskId); - if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, [pendingOp]); - } else { - latestPendingOps.push(pendingOp); - } + this.appendPendingOp(taskId, pendingOp); } private submitCompleteOp(taskId: string): void { @@ -295,12 +410,16 @@ export class TaskManagerClass }; this.submitLocalMessage(op, pendingOp.messageId); - const latestPendingOps = this.latestPendingOps.get(taskId); + this.appendPendingOp(taskId, pendingOp); + } 
+ + private appendPendingOp(taskId: string, pendingOp: IPendingOp): void { + let latestPendingOps = this.latestPendingOps.get(taskId); if (latestPendingOps === undefined) { - this.latestPendingOps.set(taskId, [pendingOp]); - } else { - latestPendingOps.push(pendingOp); + latestPendingOps = new IndexedList((op) => op.messageId); + this.latestPendingOps.set(taskId, latestPendingOps); } + latestPendingOps.push(pendingOp.messageId, pendingOp); } /** @@ -560,7 +679,7 @@ export class TaskManagerClass return false; } - const currentAssignee = this.taskQueues.get(taskId)?.[0]; + const currentAssignee = this.taskQueues.get(taskId)?.first?.data; return currentAssignee !== undefined && currentAssignee === this.clientId; } @@ -572,7 +691,7 @@ export class TaskManagerClass return false; } - return this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; + return this.taskQueues.get(taskId)?.has(this.clientId) ?? false; } /** @@ -634,13 +753,12 @@ export class TaskManagerClass } // Only include tasks if there are clients in the queue. 
- const filteredMap = new Map(); + const content: [string, string[]][] = []; for (const [taskId, queue] of this.taskQueues) { if (queue.length > 0) { - filteredMap.set(taskId, queue); + content.push([taskId, [...queue.map((node) => node.data)]]); } } - const content = [...filteredMap.entries()]; return createSingleBlobSummary(snapshotFileName, JSON.stringify(content)); } @@ -650,7 +768,11 @@ export class TaskManagerClass protected async loadCore(storage: IChannelStorageService): Promise { const content = await readAndParse<[string, string[]][]>(storage, snapshotFileName); for (const [taskId, clientIdQueue] of content) { - this.taskQueues.set(taskId, clientIdQueue); + const list = new IndexedList((clientId) => clientId); + for (const clientId of clientIdQueue) { + list.push(clientId, clientId); + } + this.taskQueues.set(taskId, list); } this.scrubClientsNotInQuorum(); } @@ -681,15 +803,13 @@ export class TaskManagerClass assertIsTaskManagerOperation(content); const pendingOps = this.latestPendingOps.get(content.taskId); assert(pendingOps !== undefined, 0xc42 /* No pending ops for task on resubmit attempt */); - const pendingOpIndex = pendingOps.findIndex( - (op) => op.messageId === localOpMetadata && op.type === content.type, + const pendingOpNode = pendingOps.getNode(localOpMetadata); + assert( + pendingOpNode?.data.type === content.type, + 0xc43 /* Could not match pending op on resubmit attempt */, ); - assert(pendingOpIndex !== -1, 0xc43 /* Could not match pending op on resubmit attempt */); - pendingOps.splice(pendingOpIndex, 1); - if ( - content.type === "volunteer" && - pendingOps[pendingOps.length - 1]?.type !== "abandon" - ) { + pendingOps.removeNode(pendingOpNode); + if (content.type === "volunteer" && pendingOps.last?.data.type !== "abandon") { this.submitVolunteerOp(content.taskId); } if (pendingOps.length === 0) { @@ -764,20 +884,20 @@ export class TaskManagerClass // Create the queue if it doesn't exist, and push the client on the back. 
let clientQueue = this.taskQueues.get(taskId); if (clientQueue === undefined) { - clientQueue = []; + clientQueue = new IndexedList((cid) => cid); this.taskQueues.set(taskId, clientQueue); } - if (clientQueue.includes(clientId)) { + if (clientQueue.has(clientId)) { // We shouldn't re-add the client if it's already in the queue. // This may be possible in scenarios where a client was added in // while detached. return; } - const oldLockHolder = clientQueue[0]; - clientQueue.push(clientId); - const newLockHolder = clientQueue[0]; + const oldLockHolder = clientQueue.first?.data; + clientQueue.push(clientId, clientId); + const newLockHolder = clientQueue.first?.data; if (newLockHolder !== oldLockHolder) { this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder); } @@ -791,16 +911,12 @@ export class TaskManagerClass } const oldLockHolder = - clientId === placeholderClientId ? placeholderClientId : clientQueue[0]; - const clientIdIndex = clientQueue.indexOf(clientId); - if (clientIdIndex !== -1) { - clientQueue.splice(clientIdIndex, 1); - // Clean up the queue if there are no more clients in it. - if (clientQueue.length === 0) { - this.taskQueues.delete(taskId); - } + clientId === placeholderClientId ? placeholderClientId : clientQueue.first?.data; + // Clean up the queue if the removal leaves it empty. 
+ if (clientQueue.deleteByKey(clientId) && clientQueue.length === 0) { + this.taskQueues.delete(taskId); } - const newLockHolder = clientQueue[0]; + const newLockHolder = clientQueue.first?.data; if (newLockHolder !== oldLockHolder) { this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder); } @@ -821,15 +937,16 @@ export class TaskManagerClass this.runtime.clientId !== undefined, 0x475 /* this.runtime.clientId should be defined */, ); + const realClientId = this.runtime.clientId; for (const clientQueue of this.taskQueues.values()) { - const clientIdIndex = clientQueue.indexOf(placeholderClientId); - if (clientIdIndex !== -1) { - if (clientQueue.includes(this.runtime.clientId)) { - // If the real clientId is already in the queue, just remove the placeholder. - clientQueue.splice(clientIdIndex, 1); - } else { - clientQueue[clientIdIndex] = this.runtime.clientId; + const placeholderNode = clientQueue.getNode(placeholderClientId); + if (placeholderNode !== undefined) { + if (!clientQueue.has(realClientId)) { + // Insert the real clientId at the placeholder's position before we remove the placeholder. + clientQueue.insertAfter(placeholderNode, realClientId, realClientId); } + // Remove the placeholder; if the real clientId was already present we just drop the placeholder. + clientQueue.deleteByKey(placeholderClientId); } } } @@ -839,17 +956,23 @@ export class TaskManagerClass private scrubClientsNotInQuorum(): void { const quorum = this.runtime.getQuorum(); for (const [taskId, clientQueue] of this.taskQueues) { - const filteredClientQueue = clientQueue.filter( - (clientId) => quorum.getMember(clientId) !== undefined, - ); - if (clientQueue.length !== filteredClientQueue.length) { - if (filteredClientQueue.length === 0) { - this.taskQueues.delete(taskId); - } else { - this.taskQueues.set(taskId, filteredClientQueue); + // Walk by collecting removable nodes first to avoid mutating during iteration. 
+ const toRemove: ListNode[] = []; + for (const node of clientQueue) { + if (quorum.getMember(node.data) === undefined) { + toRemove.push(node); } - this.queueWatcher.emit("queueChange", taskId); } + if (toRemove.length === 0) { + continue; + } + for (const node of toRemove) { + clientQueue.removeNode(node); + } + if (clientQueue.length === 0) { + this.taskQueues.delete(taskId); + } + this.queueWatcher.emit("queueChange", taskId); } } @@ -858,13 +981,10 @@ export class TaskManagerClass * for the latest pending ops. */ private queuedOptimistically(taskId: string): boolean { - const inQueue = this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; + const inQueue = this.taskQueues.get(taskId)?.has(this.clientId) ?? false; const latestPendingOps = this.latestPendingOps.get(taskId); - const latestPendingOp = - latestPendingOps !== undefined && latestPendingOps.length > 0 - ? latestPendingOps[latestPendingOps.length - 1] - : undefined; + const latestPendingOp = latestPendingOps?.last?.data; const isPendingVolunteer = latestPendingOp?.type === "volunteer"; const isPendingAbandonOrComplete = latestPendingOp?.type === "abandon" || latestPendingOp?.type === "complete"; @@ -904,7 +1024,8 @@ export class TaskManagerClass const pendingOpToRollback = latestPendingOps.pop(); assert( // eslint-disable-next-line @typescript-eslint/prefer-optional-chain -- using ?. 
could change behavior - pendingOpToRollback !== undefined && pendingOpToRollback.messageId === localOpMetadata, + pendingOpToRollback !== undefined && + pendingOpToRollback.data.messageId === localOpMetadata, 0xc47 /* pending op mismatch */, ); if (latestPendingOps.length === 0) { diff --git a/packages/dds/task-manager/src/test/fuzzUtils.ts b/packages/dds/task-manager/src/test/fuzzUtils.ts index b43a9e87dbfd..f9aa73d7c59c 100644 --- a/packages/dds/task-manager/src/test/fuzzUtils.ts +++ b/packages/dds/task-manager/src/test/fuzzUtils.ts @@ -163,6 +163,40 @@ interface LoggingInfo { taskId: string; } +/** + * Internal shape exposed only for test introspection. + * + * Mirrors the {@link IndexedList} wrapper added to `TaskManagerClass` so these helpers + * can traverse the queue without relying on the structural details of the wrapper. + */ +interface TaskQueueLike { + readonly length: number; + readonly first: { readonly data: string } | undefined; + [Symbol.iterator](): IterableIterator<{ readonly data: string }>; +} + +/** + * Reads the private `taskQueues` map off a {@link ITaskManager} for test introspection, + * and returns the queue for `taskId` as an array of clientIds (in queue order). + * + * Centralizes the `as any` cast and shields the call sites from changes to the underlying + * data structure. 
+ */ +function readTaskQueue(taskManager: ITaskManager, taskId: string): string[] | undefined { + /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any */ + const queues: Map = (taskManager as any).taskQueues; + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any */ + const queue = queues.get(taskId); + if (queue === undefined) { + return undefined; + } + const clientIds: string[] = []; + for (const node of queue) { + clientIds.push(node.data); + } + return clientIds; +} + function logCurrentState(state: FuzzTestState, loggingInfo: LoggingInfo): void { for (const client of state.clients) { const taskManager = client.channel; @@ -171,8 +205,7 @@ function logCurrentState(state: FuzzTestState, loggingInfo: LoggingInfo): void { console.log( `TaskManager ${taskManager.id} (CanVolunteer: ${taskManager.canVolunteer()}):`, ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any - console.log((taskManager as any).taskQueues.get(loggingInfo.taskId)); + console.log(readTaskQueue(taskManager, loggingInfo.taskId)); console.log("\n"); } } @@ -235,24 +268,26 @@ function makeReducer(loggingInfo?: LoggingInfo): Reducer = (a as any).taskQueues; - const queue2: Map = (b as any).taskQueues; + const queues1: Map = (a as any).taskQueues; + const queues2: Map = (b as any).taskQueues; /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any */ - assert.strictEqual(queue1.size, queue2.size, "The number of tasks queues are not the same"); - for (const [key, val] of queue1) { - const testVal = queue2.get(key); - if (testVal === undefined) { - assert(val === undefined, "Task queues are not both undefined"); + assert.strictEqual( + queues1.size, + 
queues2.size, + "The number of tasks queues are not the same", + ); + for (const key of queues1.keys()) { + const aQueue = readTaskQueue(a, key); + const bQueue = readTaskQueue(b, key); + if (bQueue === undefined) { + assert(aQueue === undefined, "Task queues are not both undefined"); continue; } - assert.strictEqual(testVal.length, val.length, "Task queues are not the same size"); - if (testVal.length > 0) { - const testValArr = testVal; - const valArr = val; - for (const [index, task] of testValArr.entries()) { - assert.strictEqual(task, valArr[index], `Task queues are not identical`); - } + assert(aQueue !== undefined, "Task queues are not both defined"); + assert.strictEqual(bQueue.length, aQueue.length, "Task queues are not the same size"); + for (const [index, task] of bQueue.entries()) { + assert.strictEqual(task, aQueue[index], `Task queues are not identical`); } } } diff --git a/packages/dds/task-manager/src/test/taskManager.spec.ts b/packages/dds/task-manager/src/test/taskManager.spec.ts index 1564a28e58eb..2361a554a2f8 100644 --- a/packages/dds/task-manager/src/test/taskManager.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.spec.ts @@ -12,6 +12,7 @@ import { MockContainerRuntimeFactory, MockContainerRuntimeFactoryForReconnection, MockFluidDataStoreRuntime, + MockSharedObjectServices, MockStorage, } from "@fluidframework/test-runtime-utils/internal"; @@ -19,6 +20,53 @@ import type { ITaskManager } from "../interfaces.js"; import { TaskManagerClass } from "../taskManager.js"; import { TaskManagerFactory } from "../taskManagerFactory.js"; +/** + * Internal shape exposed only for test introspection. + * + * Mirrors the `IndexedList` wrapper that `TaskManagerClass` uses for `taskQueues` so these + * helpers can traverse the queue without depending on the structural details of the wrapper. 
+ */ +interface TaskQueueLike { + readonly length: number; + readonly first: { readonly data: string } | undefined; + [Symbol.iterator](): IterableIterator<{ readonly data: string }>; +} + +/** + * Reads the private `taskQueues` map off a {@link TaskManagerClass} for test introspection. + */ +function getInternalTaskQueues(taskManager: TaskManagerClass): Map { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-return + return (taskManager as any).taskQueues; +} + +/** + * Returns the queue for `taskId` as an array of clientIds in queue order, or `undefined` if + * no queue exists for that task. + */ +function getTaskQueueAsArray( + taskManager: TaskManagerClass, + taskId: string, +): string[] | undefined { + const queue = getInternalTaskQueues(taskManager).get(taskId); + if (queue === undefined) { + return undefined; + } + const clientIds: string[] = []; + for (const node of queue) { + clientIds.push(node.data); + } + return clientIds; +} + +/** + * Returns the clientId at the head of the queue (the current lock holder candidate) for + * `taskId`, or `undefined` if no queue exists. 
+ */ +function getTaskQueueHead(taskManager: TaskManagerClass, taskId: string): string | undefined { + return getInternalTaskQueues(taskManager).get(taskId)?.first?.data; +} + function createConnectedTaskManager( id: string, runtimeFactory: MockContainerRuntimeFactory, @@ -756,8 +804,7 @@ describe("TaskManager", () => { assert.ok(taskManager1.queued(taskId), "Should be queued"); assert.ok(taskManager1.assigned(taskId), "Should be assigned"); assert.strictEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId)?.[0], + getTaskQueueHead(taskManager1, taskId), placeholderClientId, "taskQueue should have placeholder clientId", ); @@ -776,8 +823,10 @@ describe("TaskManager", () => { assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); assert.ok(taskManager1EventFired, "Should have raised lost event on taskManager1"); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any - assert.ok((taskManager1 as any).taskQueues.size === 0, "taskQueue should be empty"); + assert.ok( + getInternalTaskQueues(taskManager1).size === 0, + "taskQueue should be empty", + ); }); }); @@ -819,13 +868,11 @@ describe("TaskManager", () => { assert.ok(taskManager1.subscribed(taskId), "Task manager 1 should be subscribed"); assert.ok( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId)?.length !== 0, + (getInternalTaskQueues(taskManager1).get(taskId)?.length ?? 
0) !== 0, "taskQueue should not be empty", ); assert.notStrictEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId)?.[0], + getTaskQueueHead(taskManager1, taskId), placeholderClientId, "taskQueue should not have placeholder clientId", ); @@ -982,8 +1029,7 @@ describe("TaskManager", () => { const clientId2 = containerRuntime2.clientId; assert.deepEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId), + getTaskQueueAsArray(taskManager1, taskId), [clientId1, clientId2], "Task queue should have both clients", ); @@ -992,8 +1038,7 @@ describe("TaskManager", () => { containerRuntimeFactory.processAllMessages(); assert.deepEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call - (taskManager1 as any).taskQueues.get(taskId), + getTaskQueueAsArray(taskManager1, taskId), [clientId1], "Task queue should only have client 1", ); @@ -1233,4 +1278,285 @@ describe("TaskManager", () => { describe("Completing tasks", () => {}); }); }); + + describe("Queue ordering regressions", () => { + // These tests exercise invariants around queue ordering that were previously + // guaranteed by the side-by-side `taskQueues` + `taskQueueIndex` structures and + // are now guaranteed by the `IndexedList` wrapper. + + describe("scrubClientsNotInQuorum", () => { + it("removes only the non-quorum clients and preserves the relative order of survivors", async () => { + // Wire up a queue with multiple clients via real volunteer ops so we don't + // have to manufacture an `IndexedList` from outside the module. 
+ const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager1 = createConnectedTaskManager("tm1", containerRuntimeFactory); + const taskManager2 = createConnectedTaskManager("tm2", containerRuntimeFactory); + const taskManager3 = createConnectedTaskManager("tm3", containerRuntimeFactory); + const taskManager4 = createConnectedTaskManager("tm4", containerRuntimeFactory); + + const taskId = "taskId"; + const p1 = taskManager1.volunteerForTask(taskId); + // Subscribe the others (rather than awaiting their volunteer promises) so + // they join the queue without leaving outstanding promises that never + // settle (only the head of the queue resolves). + taskManager2.subscribeToTask(taskId); + taskManager3.subscribeToTask(taskId); + taskManager4.subscribeToTask(taskId); + containerRuntimeFactory.processAllMessages(); + await p1; + + // Capture the established queue order so the assertion is independent of + // the (random) clientId values. + const initialOrder = getTaskQueueAsArray(taskManager1, taskId); + assert.ok(initialOrder?.length === 4); + const c1 = initialOrder[0] as string; + const c2 = initialOrder[1] as string; + const c3 = initialOrder[2] as string; + const c4 = initialOrder[3] as string; + + // Remove clients 2 and 4 from the quorum's underlying members map + // *without* emitting the `removeMember` event — that event is what would + // normally trigger removeClientFromAllQueues. By suppressing it, we leave + // stale entries in the queue and force scrubClientsNotInQuorum to do the + // work we want to test. 
+ /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const quorumMembers: Map = (taskManager1 as any).runtime.getQuorum() + .members; + quorumMembers.delete(c2); + quorumMembers.delete(c4); + (taskManager1 as any).scrubClientsNotInQuorum(); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + + assert.deepEqual( + getTaskQueueAsArray(taskManager1, taskId), + [c1, c3], + "Survivors must remain in their original relative order", + ); + }); + }); + + describe("Placeholder → real clientId substitution", () => { + it("keeps the head of the queue stable when the placeholder is the lock holder", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const { taskManager: taskManagerDetached } = createDetachedTaskManager( + "detached", + containerRuntimeFactory, + ); + + // Force runtime.clientId to undefined so the detached volunteer path inserts + // the placeholder clientId rather than the auto-generated one. We then + // manually splice other clients in behind the placeholder via the private + // `addClientToQueue` so we can verify the placeholder→real swap preserves + // the placeholder's position rather than reinserting at the tail. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (taskManagerDetached as any).runtime.clientId = undefined; + + const taskId = "taskId"; + const volunteerP = taskManagerDetached.volunteerForTask(taskId); + containerRuntimeFactory.processAllMessages(); + await volunteerP; + + // Inject fake quorum members so addClientToQueue accepts them. 
+ /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const quorum: { addMember: (id: string, c: unknown) => void } = ( + taskManagerDetached as any + ).runtime.getQuorum(); + quorum.addMember("other-1", {}); + quorum.addMember("other-2", {}); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManagerDetached as any).addClientToQueue(taskId, "other-1"); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManagerDetached as any).addClientToQueue(taskId, "other-2"); + + assert.deepEqual( + getTaskQueueAsArray(taskManagerDetached, taskId), + ["placeholder", "other-1", "other-2"], + "Pre-condition: placeholder is at the head", + ); + + // Trigger the substitution path. 
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (taskManagerDetached as any).runtime.clientId = "real-client"; + quorum.addMember("real-client", {}); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManagerDetached as any).replacePlaceholderInAllQueues(); + + assert.deepEqual( + getTaskQueueAsArray(taskManagerDetached, taskId), + ["real-client", "other-1", "other-2"], + "Real clientId must take the placeholder's slot, not append at the tail", + ); + }); + }); + + describe("reSubmitCore", () => { + it("removes the matching pending op without disturbing siblings", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager = createConnectedTaskManager("tm", containerRuntimeFactory); + const taskId = "taskId"; + + // Submit three volunteer ops back-to-back without processing — this drives + // `latestPendingOps.get(taskId)` to hold three entries, each with a distinct + // messageId. 
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + + // Minimal structural view of the private pending-op list used by this test: + // an iterable of nodes whose `data` exposes the op `type` and `messageId`. + type PendingOpList = Iterable<{ data: { type: string; messageId: number } }>; + /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const pendingOps: PendingOpList = (taskManager as any).latestPendingOps.get(taskId); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + assert.ok(pendingOps !== undefined); + // Record each node's messageId in iteration order before resubmitting. + const messageIds: number[] = []; + for (const node of pendingOps) { + messageIds.push(node.data.messageId); + } + assert.strictEqual(messageIds.length, 3); + const firstId = messageIds[0] as number; + const middleId = messageIds[1] as number; + const lastId = messageIds[2] as number; + + // Resubmit only the middle op — the matching op should be removed and the + // surviving siblings should remain in their original relative order. 
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).reSubmitCore({ type: "volunteer", taskId }, middleId); + + // Iterate the same list object again to capture the post-resubmit messageIds. + const surviving: number[] = []; + for (const node of pendingOps) { + surviving.push(node.data.messageId); + } + // reSubmitCore re-submits a fresh volunteer op when the last pending is not + // an abandon, which appends a new entry at the tail with a messageId greater + // than `lastId`. The leading entries should be the original first/last in + // their original relative order. + assert.strictEqual(surviving.length, 3); + assert.strictEqual(surviving[0], firstId, "First sibling must keep its slot"); + assert.strictEqual( + surviving[1], + lastId, + "Last sibling must keep its slot (relative to surviving siblings)", + ); + assert.ok( + (surviving[2] as number) > lastId, + "Resubmitted volunteer op should be appended at the tail", + ); + }); + }); + + describe("rollback", () => { + it("rolls back the latest pending op (LIFO)", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager = createConnectedTaskManager("tm", containerRuntimeFactory); + const taskId = "taskId"; + + // Submit volunteer, abandon, volunteer back-to-back without processing any of + // them, so all three are pending when the rollbacks below run. 
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitAbandonOp(taskId); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).submitVolunteerOp(taskId); + + type PendingOpList = Iterable<{ data: { type: string; messageId: number } }>; + /* eslint-disable @typescript-eslint/no-unsafe-assignment, 
@typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + const pendingOps: PendingOpList = (taskManager as any).latestPendingOps.get(taskId); + /* eslint-enable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ + assert.ok(pendingOps !== undefined); + const messageIds: number[] = []; + const types: string[] = []; + for (const node of pendingOps) { + messageIds.push(node.data.messageId); + types.push(node.data.type); + } + assert.deepEqual(types, ["volunteer", "abandon", "volunteer"]); + const lastId = messageIds[2]; + + // Rolling back the last submitted op should pop only the tail entry and + // leave the head/middle untouched. + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).rollback({ type: "volunteer", taskId }, lastId); + + // Re-read the same list object to see which entries survived the rollback. + const remainingTypes: string[] = []; + for (const node of pendingOps) { + remainingTypes.push(node.data.type); + } + assert.deepEqual( + remainingTypes, + ["volunteer", "abandon"], + "Rollback must pop only the tail (LIFO)", + ); + + // Rolling back the abandon (now the tail) should similarly only remove it. 
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (taskManager as any).rollback({ type: "abandon", taskId }, messageIds[1]); + + const finalTypes: string[] = []; + for (const node of pendingOps) { + finalTypes.push(node.data.type); + } + assert.deepEqual(finalTypes, ["volunteer"]); + }); + }); + + describe("summarize → loadCore round-trip", () => { + it("preserves queue order across summary/load", async () => { + const containerRuntimeFactory = new MockContainerRuntimeFactory(); + const taskManager1 = createConnectedTaskManager("tm1", containerRuntimeFactory); + const taskManager2 = createConnectedTaskManager("tm2", containerRuntimeFactory); + const taskManager3 = createConnectedTaskManager("tm3", containerRuntimeFactory); + const taskId1 = "taskA"; + const taskId2 = "taskB"; + + // Use subscribe for the followers so we don't leave outstanding volunteer + // promises that never settle (only the head of each queue resolves). + const p1a = taskManager1.volunteerForTask(taskId1); + taskManager2.subscribeToTask(taskId1); + taskManager3.subscribeToTask(taskId1); + const p3b = taskManager3.volunteerForTask(taskId2); + taskManager1.subscribeToTask(taskId2); + containerRuntimeFactory.processAllMessages(); + await Promise.all([p1a, p3b]); + + // Snapshot taskManager1's view of both queues; these are the expected orderings + // the reloaded instance is compared against below. + const expectedA = getTaskQueueAsArray(taskManager1, taskId1); + const expectedB = getTaskQueueAsArray(taskManager1, taskId2); + assert.ok(expectedA?.length === 3); + assert.ok(expectedB?.length === 2); + + // Round-trip the summary into a fresh TaskManager and verify the queues + // come back in the same order. + const summaryResult = await taskManager1.summarize(); + const services = MockSharedObjectServices.createFromSummary(summaryResult.summary); + + // We need all the original clientIds present in the new runtime's quorum + // so scrubClientsNotInQuorum (called from loadCore) doesn't drop them. 
+ const reloadedRuntime = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(reloadedRuntime); + for (const clientId of [...expectedA, ...expectedB]) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call + (reloadedRuntime as any).quorum.addMember(clientId, {}); + } + + // Build a fresh TaskManagerClass on the new runtime and load it from the + // services created from taskManager1's summary. + const reloaded = new TaskManagerClass( + "tm-reloaded", + reloadedRuntime, + TaskManagerFactory.Attributes, + ); + await reloaded.load(services); + + assert.deepEqual( + getTaskQueueAsArray(reloaded, taskId1), + expectedA, + "taskA queue ordering must survive summary/load", + ); + assert.deepEqual( + getTaskQueueAsArray(reloaded, taskId2), + expectedB, + "taskB queue ordering must survive summary/load", + ); + }); + }); + }); });