From 8a6667f93f6a959afe7cd846d4bc8b6a7fc1ad09 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 16:14:25 -0700 Subject: [PATCH 1/2] feat(container-runtime): bunch reSubmit dispatch across same-DDS runs Extend the bunched dispatch pattern from inbound processMessages to the outbound reSubmit path. ContainerRuntime.reSubmitBatch now collects runs of FluidDataStoreOp entries and dispatches them via a new ChannelCollection.reSubmitContainerMessages, which bunches by (address, ddsType) exactly like processChannelMessages. Adds @legacy @beta IRuntimeResubmitMessageCollection and optional reSubmitMessages methods on IFluidDataStoreChannel and IDeltaHandler. ChannelDeltaConnection fans out stashed-op metadata across bunch entries before dispatch. SharedObject provides a default reSubmitMessages that loops over existing reSubmitCore / reSubmitSquashed, so DDSes work unchanged; SharedTree / MergeTree can override in follow-up PRs. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/bunched-resubmit-dispatch.md | 18 +++ .../shared-object-base/src/sharedObject.ts | 10 ++ .../src/channelCollection.ts | 58 ++++++++ .../container-runtime/src/containerRuntime.ts | 33 ++++- .../container-runtime/src/dataStoreContext.ts | 14 ++ .../src/test/containerRuntime.spec.ts | 129 ++++++++++++++++-- .../datastore-definitions.legacy.alpha.api.md | 1 + .../datastore-definitions.legacy.beta.api.md | 1 + .../datastore-definitions/src/channel.ts | 14 ++ .../api-report/datastore.legacy.beta.api.md | 1 + packages/runtime/datastore/package.json | 6 +- .../runtime/datastore/src/channelContext.ts | 7 + .../datastore/src/channelDeltaConnection.ts | 35 +++++ .../runtime/datastore/src/dataStoreRuntime.ts | 70 ++++++++++ .../datastore/src/localChannelContext.ts | 10 ++ .../datastore/src/remoteChannelContext.ts | 6 + .../validateDatastorePrevious.generated.ts | 1 + .../runtime-definitions.legacy.alpha.api.md | 13 ++ .../runtime-definitions.legacy.beta.api.md | 13 ++ 
.../src/dataStoreContext.ts | 16 +++ .../runtime/runtime-definitions/src/index.ts | 2 + .../runtime-definitions/src/protocol.ts | 38 ++++++ 22 files changed, 484 insertions(+), 12 deletions(-) create mode 100644 .changeset/bunched-resubmit-dispatch.md diff --git a/.changeset/bunched-resubmit-dispatch.md b/.changeset/bunched-resubmit-dispatch.md new file mode 100644 index 000000000000..3b4c309add9d --- /dev/null +++ b/.changeset/bunched-resubmit-dispatch.md @@ -0,0 +1,18 @@ +--- +"@fluidframework/runtime-definitions": minor +"@fluidframework/datastore-definitions": minor +"@fluidframework/container-runtime": minor +"@fluidframework/datastore": minor +"__section": feature +--- +Extend bunched dispatch from `processMessages` to `reSubmit` + +The container runtime now bunches contiguous same-DDS resubmit entries when replaying a pending batch, mirroring the existing bunched dispatch for inbound `processMessages`. A batch of N consecutive ops targeting the same channel now makes one round trip through `ChannelCollection → FluidDataStoreContext → FluidDataStoreRuntime → IChannelContext → IDeltaHandler` rather than N. + +New API surface on `@legacy @beta`: + +- `IRuntimeResubmitMessage` and `IRuntimeResubmitMessageCollection` (`@fluidframework/runtime-definitions`) — the bunched envelope, with a shared `squash` flag. +- Optional `IFluidDataStoreChannel.reSubmitMessages(type, collection)` (`@fluidframework/runtime-definitions`) — opt-in bunched form alongside the existing `reSubmit`. +- Optional `IDeltaHandler.reSubmitMessages(collection)` (`@fluidframework/datastore-definitions`) — opt-in bunched form alongside the existing `reSubmit`. + +DDSes that do not implement `reSubmitMessages` automatically fall back to per-message `reSubmit` calls. `SharedObject`-derived DDSes get a default implementation that loops over the existing `reSubmitCore` / `reSubmitSquashed` paths; they may override to take advantage of seeing the full run together. 
Non-`FluidDataStoreOp` runtime message types (Attach, Alias, GC, etc.) continue to use the existing single-op `reSubmit` path. diff --git a/packages/dds/shared-object-base/src/sharedObject.ts b/packages/dds/shared-object-base/src/sharedObject.ts index 5fa7295ae60e..0bc2e5480217 100644 --- a/packages/dds/shared-object-base/src/sharedObject.ts +++ b/packages/dds/shared-object-base/src/sharedObject.ts @@ -35,6 +35,7 @@ import { totalBlobSizePropertyName, type IRuntimeMessageCollection, type IRuntimeMessagesContent, + type IRuntimeResubmitMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { toDeltaManagerInternal, @@ -550,6 +551,15 @@ export abstract class SharedObjectCore< reSubmit: (content: unknown, localOpMetadata: unknown, squash: boolean) => { this.reSubmit(content, localOpMetadata, squash); }, + reSubmitMessages: (collection: IRuntimeResubmitMessageCollection) => { + // Default implementation iterates the bunch and dispatches each entry through the + // existing single-op reSubmit dispatcher (reSubmitCore vs reSubmitSquashed). DDSes + // that can take advantage of seeing the contiguous run together may override the + // IDeltaHandler attached here. 
+ for (const { contents, localOpMetadata } of collection.messages) { + this.reSubmit(contents, localOpMetadata, collection.squash); + } + }, applyStashedOp: (content: unknown): void => { this.applyStashedOp(parseHandles(content, this.serializer)); }, diff --git a/packages/runtime/container-runtime/src/channelCollection.ts b/packages/runtime/container-runtime/src/channelCollection.ts index a0154267469f..9649090ab054 100644 --- a/packages/runtime/container-runtime/src/channelCollection.ts +++ b/packages/runtime/container-runtime/src/channelCollection.ts @@ -44,6 +44,7 @@ import type { InboundAttachMessage, IRuntimeMessageCollection, IRuntimeMessagesContent, + IRuntimeResubmitMessage, ISummarizeResult, ISummaryTreeWithStats, ITelemetryContext, @@ -830,6 +831,63 @@ export class ChannelCollection context.reSubmit(envelope.contents, localOpMetadata, squash); }; + /** + * Resubmit a contiguous run of FluidDataStoreOp entries. Entries are bunched by + * `(address, FluidDataStoreMessage.type)` and forwarded to each data store context in a single + * {@link FluidDataStoreContext.reSubmitMessages} call per bunch — mirroring the inbound + * {@link ChannelCollection.processChannelMessages} bunching pattern. 
+ */ + public readonly reSubmitContainerMessages = ( + entries: readonly { + envelope: IEnvelope; + localOpMetadata: unknown; + }[], + squash: boolean, + ): void => { + let currentAddress: string | undefined; + let currentType: string | undefined; + let currentMessages: IRuntimeResubmitMessage[] = []; + + const flushCurrent = (): void => { + if ( + currentAddress === undefined || + currentType === undefined || + currentMessages.length === 0 + ) { + return; + } + const context = this.contexts.get(currentAddress); + if ( + this.checkAndLogIfDeleted( + currentAddress, + context, + "Changed", + "reSubmitContainerMessages", + ) + ) { + throw new DataCorruptionError("Context is deleted!", { + callSite: "reSubmitContainerMessages", + ...tagCodeArtifacts({ id: currentAddress }), + }); + } + assert(!!context, "There should be a store context for the op"); + context.reSubmitMessages(currentType, { squash, messages: currentMessages }); + currentMessages = []; + }; + + for (const { envelope, localOpMetadata } of entries) { + const { address } = envelope; + const { type: ddsType, content } = envelope.contents; + if (currentAddress !== address || currentType !== ddsType) { + flushCurrent(); + currentAddress = address; + currentType = ddsType; + } + currentMessages.push({ contents: content, localOpMetadata }); + } + flushCurrent(); + }; + public readonly rollbackDataStoreOp = ( envelope: IEnvelope, localOpMetadata: unknown, diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 3c13325457e4..584ff62a0905 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -118,9 +118,11 @@ import type { IGarbageCollectionData, CreateChildSummarizerNodeParam, IDataStore, + IEnvelope, IFluidDataStoreContextDetached, IFluidDataStoreRegistry, IFluidParentContext, + FluidDataStoreMessage, ISummarizeInternalResult, 
InboundAttachMessage, NamedFluidDataStoreRegistryEntries, @@ -5030,9 +5032,36 @@ export class ContainerRuntime : this.reSubmit.bind(this); this.batchRunner.run(() => { - for (const message of batch) { - resubmitFn(message); + // Collect contiguous runs of FluidDataStoreOp entries and dispatch each run through + // the bunched path so the channel collection can group them by (address, ddsType) for + // efficient single-call resubmit at the DDS layer. Non-FluidDataStoreOp entries flush + // the current run and use the existing single-op path (squash-aware via resubmitFn). + let currentBunch: { + envelope: IEnvelope; + localOpMetadata: unknown; + }[] = []; + + const flushBunch = (): void => { + if (currentBunch.length === 0) { + return; + } + this.channelCollection.reSubmitContainerMessages(currentBunch, squash); + currentBunch = []; + }; + + for (const data of batch) { + const message = data.runtimeOp; + if (message.type === ContainerMessageType.FluidDataStoreOp) { + currentBunch.push({ + envelope: message.contents, + localOpMetadata: data.localOpMetadata, + }); + } else { + flushBunch(); + resubmitFn(data); + } } + flushBunch(); }, resubmitInfo); this.flush(resubmitInfo); diff --git a/packages/runtime/container-runtime/src/dataStoreContext.ts b/packages/runtime/container-runtime/src/dataStoreContext.ts index 6b46a9ac9537..af34b5b7548d 100644 --- a/packages/runtime/container-runtime/src/dataStoreContext.ts +++ b/packages/runtime/container-runtime/src/dataStoreContext.ts @@ -64,6 +64,7 @@ import type { IInboundSignalMessage, IPendingMessagesState, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IFluidDataStoreFactory, PackagePath, IRuntimeStorageService, @@ -1101,6 +1102,19 @@ export abstract class FluidDataStoreContext this.channel.reSubmit(message.type, message.content, localOpMetadata, squash); } + public reSubmitMessages(type: string, collection: IRuntimeResubmitMessageCollection): void { + assert(!!this.channel, "Channel must exist when 
resubmitting ops"); + if (this.channel.reSubmitMessages !== undefined) { + this.channel.reSubmitMessages(type, collection); + return; + } + + // Fallback for channels that haven't opted in to the bunched form. + for (const { contents, localOpMetadata } of collection.messages) { + this.channel.reSubmit(type, contents, localOpMetadata, collection.squash); + } + } + public rollback(message: FluidDataStoreMessage, localOpMetadata: unknown): void { if (!this.channel) { throw new Error("Channel must exist when rolling back ops"); diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 39f5e2af7aad..812a835b226d 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -190,6 +190,21 @@ function stubChannelCollection( .stub, void>() .callsFake(containerRuntime.submitMessage.bind(containerRuntime)); + // Bunched form: replay each entry as a singleton FluidDataStoreOp through submitMessage. + const reSubmitBunchedFake = sandbox + .stub, void>() + .callsFake((entries, squash) => { + for (const { envelope, localOpMetadata } of entries) { + containerRuntime.submitMessage( + { + type: ContainerMessageType.FluidDataStoreOp, + contents: envelope, + }, + localOpMetadata, + ); + } + }); + const stub = Sinon.createStubInstance( // createSubInstance does not work with property methods (which are // used for stricter typing); so, override via a subclass here. 
@@ -199,6 +214,10 @@ function stubChannelCollection( ..._args: Parameters ): void {} // @ts-expect-error -- redefine as instance method for stubbing + public reSubmitContainerMessages( + ..._args: Parameters + ): void {} + // @ts-expect-error -- redefine as instance method for stubbing public rollbackDataStoreOp( ..._args: Parameters ) {} @@ -206,6 +225,7 @@ function stubChannelCollection( { setConnectionState: sandbox.stub(), reSubmitContainerMessage: reSubmitFake, + reSubmitContainerMessages: reSubmitBunchedFake, rollbackDataStoreOp: sandbox.stub(), notifyStagingMode: sandbox.stub(), dispose: sandbox.stub(), @@ -1540,6 +1560,18 @@ describe("Runtime", () => { setConnectionState: (_connected: boolean, _clientId?: string) => {}, // Pass data store op right back to ContainerRuntime reSubmitContainerMessage: containerRuntime.submitMessage.bind(containerRuntime), + // Bunched form: replay each entry as a singleton through submitMessage. + reSubmitContainerMessages: (entries, _squash) => { + for (const { envelope, localOpMetadata } of entries) { + containerRuntime.submitMessage( + { + type: ContainerMessageType.FluidDataStoreOp, + contents: envelope, + }, + localOpMetadata, + ); + } + }, } satisfies Partial; return patched; @@ -4487,19 +4519,20 @@ describe("Runtime", () => { controls.commitChanges(); assert( - channelCollectionStub.reSubmitContainerMessage.calledOnce, - "Expected reSubmit to be called once. Prestaging op should not be resubmitted", + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected reSubmitContainerMessages to be called once. 
Prestaging op should not be resubmitted", ); assert( - channelCollectionStub.reSubmitContainerMessage.calledWithExactly( - { - type: ContainerMessageType.FluidDataStoreOp, - contents: { address: "2", contents: stagedOpContents }, - }, - "LOCAL_OP_METADATA", + channelCollectionStub.reSubmitContainerMessages.calledWithExactly( + [ + { + envelope: { address: "2", contents: stagedOpContents }, + localOpMetadata: "LOCAL_OP_METADATA", + }, + ], /* squash: */ false, // False by default on commitChanges ), - "Unexpected args for reSubmit", + "Unexpected args for reSubmitContainerMessages", ); assert( channelCollectionStub.notifyStagingMode.getCall(1)?.calledWithExactly(false), @@ -4515,6 +4548,84 @@ describe("Runtime", () => { assert.equal(submittedOps[1].contents.address, "2", "Unexpected staged op address"); }); + it("commitChanges bunches contiguous same-data-store ops into one reSubmitContainerMessages call", () => { + const channelCollectionStub = stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + // Three contiguous ops targeting data store "ds1" with the same DDS op type. 
+ submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("a"), "md-a"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("b"), "md-b"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("c"), "md-c"); + containerRuntime.flush(); + + controls.commitChanges(); + + assert( + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected exactly one bunched reSubmitContainerMessages call for the contiguous run", + ); + const [entries, squash] = + channelCollectionStub.reSubmitContainerMessages.firstCall.args; + assert.strictEqual(entries.length, 3, "Expected all three entries in one bunch"); + assert.strictEqual(squash, false, "commitChanges default should not squash"); + assert.deepStrictEqual( + entries.map((e) => e.envelope.address), + ["ds1", "ds1", "ds1"], + "All entries should target the same data store address", + ); + assert.deepStrictEqual( + entries.map((e) => e.localOpMetadata), + ["md-a", "md-b", "md-c"], + "Each entry's localOpMetadata should be preserved", + ); + }); + + it("commitChanges splits bunches when the data store address changes", () => { + const channelCollectionStub = stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + // Two ops to ds1, then one to ds2, then two more to ds1 — should produce 3 bunches. + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("a"), "md-a"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("b"), "md-b"); + submitDataStoreOp(containerRuntime, "ds2", genTestDataStoreMessage("c"), "md-c"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("d"), "md-d"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("e"), "md-e"); + containerRuntime.flush(); + + controls.commitChanges(); + + // All five entries arrive in a single contiguous reSubmitContainerMessages call; + // the per-address grouping happens inside ChannelCollection. 
+ assert( + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected single reSubmitContainerMessages call carrying the contiguous run", + ); + const [entries] = channelCollectionStub.reSubmitContainerMessages.firstCall.args; + assert.deepStrictEqual( + entries.map((e) => e.envelope.address), + ["ds1", "ds1", "ds2", "ds1", "ds1"], + "Entries should preserve submission order across address changes", + ); + }); + + it("commitChanges with squash propagates the flag on the bunched call", () => { + const channelCollectionStub = stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("a"), "md-a"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("b"), "md-b"); + containerRuntime.flush(); + + controls.commitChanges({ squash: true }); + + assert( + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected one bunched reSubmitContainerMessages call", + ); + const [, squash] = channelCollectionStub.reSubmitContainerMessages.firstCall.args; + assert.strictEqual(squash, true, "squash flag should propagate to the bunched call"); + }); + it("discardChanges drops staged ops", () => { const channelCollectionStub = stubChannelCollection(containerRuntime); diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md index fbf624137081..f1a23571a73f 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md @@ -61,6 +61,7 @@ export interface IDeltaHandler { applyStashedOp(message: any): void; processMessages: (messageCollection: IRuntimeMessageCollection) => void; reSubmit(message: any, localOpMetadata: unknown, squash?: boolean): void; + 
reSubmitMessages?(collection: IRuntimeResubmitMessageCollection): void; rollback?(message: any, localOpMetadata: unknown): void; setConnectionState(connected: boolean): void; } diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md index 174ec80ca5a6..cfda94593c5c 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md @@ -61,6 +61,7 @@ export interface IDeltaHandler { applyStashedOp(message: any): void; processMessages: (messageCollection: IRuntimeMessageCollection) => void; reSubmit(message: any, localOpMetadata: unknown, squash?: boolean): void; + reSubmitMessages?(collection: IRuntimeResubmitMessageCollection): void; rollback?(message: any, localOpMetadata: unknown): void; setConnectionState(connected: boolean): void; } diff --git a/packages/runtime/datastore-definitions/src/channel.ts b/packages/runtime/datastore-definitions/src/channel.ts index 527af53e1138..e9de71698221 100644 --- a/packages/runtime/datastore-definitions/src/channel.ts +++ b/packages/runtime/datastore-definitions/src/channel.ts @@ -9,6 +9,7 @@ import type { IExperimentalIncrementalSummaryContext, IGarbageCollectionData, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, ISummaryTreeWithStats, ITelemetryContext, } from "@fluidframework/runtime-definitions/internal"; @@ -167,6 +168,19 @@ export interface IDeltaHandler { // eslint-disable-next-line @typescript-eslint/no-explicit-any reSubmit(message: any, localOpMetadata: unknown, squash?: boolean): void; + /** + * Called when the runtime asks the client to resubmit a bunch of contiguous messages. 
+ * The bunched form of {@link IDeltaHandler.reSubmit}, mirroring the inbound + * {@link IDeltaHandler.processMessages} shape so DDSes that benefit from processing a + * contiguous run together can do so on resubmit. + * + * Optional: if not implemented, the runtime falls back to iterating the collection and + * invoking {@link IDeltaHandler.reSubmit} per message. + * + * @param collection - The bunch of messages to resubmit, with a shared `squash` flag. + */ + reSubmitMessages?(collection: IRuntimeResubmitMessageCollection): void; + /** * Apply changes from an op just as if a local client has made the change, * including submitting the op. Used when rehydrating an attached container diff --git a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md index 1ee912869d4b..49cc1e22112c 100644 --- a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md +++ b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md @@ -88,6 +88,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; reSubmit(type: DataStoreMessageType, content: any, localOpMetadata: unknown, squash: boolean): void; + reSubmitMessages(type: DataStoreMessageType, collection: IRuntimeResubmitMessageCollection): void; rollback?(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void; // (undocumented) get rootRoutingContext(): this; diff --git a/packages/runtime/datastore/package.json b/packages/runtime/datastore/package.json index 565c0f23dd5c..bfa7f3373363 100644 --- a/packages/runtime/datastore/package.json +++ b/packages/runtime/datastore/package.json @@ -157,7 +157,11 @@ "typescript": "~5.4.5" }, "typeValidation": { - "broken": {}, + "broken": { + "Class_FluidDataStoreRuntime": { + "forwardCompat": false + } + }, "entrypoint": "legacy" } } diff --git a/packages/runtime/datastore/src/channelContext.ts b/packages/runtime/datastore/src/channelContext.ts index 
900e00f1ef25..2a1ca2c3bf12 100644 --- a/packages/runtime/datastore/src/channelContext.ts +++ b/packages/runtime/datastore/src/channelContext.ts @@ -19,6 +19,7 @@ import type { IFluidDataStoreContext, ISummarizeResult, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "@fluidframework/runtime-definitions/internal"; import { addBlobToSummary } from "@fluidframework/runtime-utils/internal"; @@ -53,6 +54,12 @@ export interface IChannelContext { reSubmit(content: unknown, localOpMetadata: unknown, squash: boolean): void; + /** + * Resubmit a bunch of contiguous messages for this channel context in one call. + * @param collection - The bunch of messages to resubmit, with a shared `squash` flag. + */ + reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void; + applyStashedOp(content: unknown): unknown; rollback(message: unknown, localOpMetadata: unknown): void; diff --git a/packages/runtime/datastore/src/channelDeltaConnection.ts b/packages/runtime/datastore/src/channelDeltaConnection.ts index 433f613af8ec..2debcb539611 100644 --- a/packages/runtime/datastore/src/channelDeltaConnection.ts +++ b/packages/runtime/datastore/src/channelDeltaConnection.ts @@ -11,6 +11,8 @@ import type { import type { IRuntimeMessageCollection, IRuntimeMessagesContent, + IRuntimeResubmitMessage, + IRuntimeResubmitMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { DataProcessingError } from "@fluidframework/telemetry-utils/internal"; @@ -123,6 +125,39 @@ export class ChannelDeltaConnection implements IDeltaConnection { ); } + public reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void { + // Fan out any stashed-op metadata pairs into individual entries before dispatching the bunch, + // mirroring how processMessages expands messagesContent. 
+ const flattened: IRuntimeResubmitMessage[] = []; + for (const { contents, localOpMetadata } of collection.messages) { + processWithStashedOpMetadataHandling( + contents, + localOpMetadata, + (expandedContents, expandedMetadata) => { + flattened.push({ + contents: expandedContents, + localOpMetadata: expandedMetadata, + }); + }, + ); + } + + const expandedCollection: IRuntimeResubmitMessageCollection = { + squash: collection.squash, + messages: flattened, + }; + + if (this.handler.reSubmitMessages !== undefined) { + this.handler.reSubmitMessages(expandedCollection); + return; + } + + // Fallback for handlers that haven't opted in to the bunched form. + for (const { contents, localOpMetadata } of expandedCollection.messages) { + this.handler.reSubmit(contents, localOpMetadata, expandedCollection.squash); + } + } + public rollback(content: unknown, localOpMetadata: unknown): void { if (this.handler.rollback === undefined) { throw new Error("Handler doesn't support rollback"); diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index 0c34a36c6f3e..0026705629f4 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -59,6 +59,8 @@ import { type IInboundSignalMessage, type IRuntimeMessageCollection, type IRuntimeMessagesContent, + type IRuntimeResubmitMessage, + type IRuntimeResubmitMessageCollection, notifiesReadOnlyState, encodeHandlesInContainerRuntime, type IFluidDataStorePolicies, @@ -1412,6 +1414,74 @@ export class FluidDataStoreRuntime } } + /** + * Resubmit a bunch of messages of the same type. For ChannelOp bunches, contiguous entries + * targeting the same channel address are forwarded to that channel's context as a single + * bunched resubmit; address changes within the bunch flush the current sub-bunch. 
+ * + * @privateRemarks + * `type` parameter's type of `DataStoreMessageType` is a covariance exception over `string` + * that `IFluidDataStoreChannel` specifies. See the existing {@link FluidDataStoreRuntime.reSubmit} + * for context. + */ + public reSubmitMessages( + type: DataStoreMessageType, + collection: IRuntimeResubmitMessageCollection, + ): void { + this.verifyNotClosed(); + + const { squash, messages } = collection; + // The ops being resubmitted will not be submitted as-is, so decrement the count. The + // downstream resubmit calls below may resubmit ops (which will re-increment) or not. + this.pendingOpCount.value -= messages.length; + + switch (type) { + case DataStoreMessageType.ChannelOp: { + // Bunch contiguous entries by channel address and dispatch each sub-bunch in one call. + let currentAddress: string | undefined; + let currentMessages: IRuntimeResubmitMessage[] = []; + + const flushCurrent = (): void => { + if (currentAddress === undefined || currentMessages.length === 0) { + return; + } + const channelContext = this.contexts.get(currentAddress); + assert(!!channelContext, "There should be a channel context for the op"); + channelContext.reSubmitMessages({ squash, messages: currentMessages }); + currentMessages = []; + }; + + for (const message of messages) { + const envelope = message.contents as IEnvelope; + if (currentAddress !== envelope.address) { + flushCurrent(); + currentAddress = envelope.address; + } + currentMessages.push({ + contents: envelope.contents, + localOpMetadata: message.localOpMetadata, + }); + } + flushCurrent(); + break; + } + case DataStoreMessageType.Attach: { + // Attach messages aren't meaningfully bunchable — resubmit each individually. 
+ for (const message of messages) { + this.submit( + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment -- mirrors `reSubmit`'s any-typed content + { type, content: message.contents as IAttachMessage }, + message.localOpMetadata, + ); + } + break; + } + default: { + unreachableCase(type); + } + } + } + /** * Revert a local op. * @param content - The content of the original message. diff --git a/packages/runtime/datastore/src/localChannelContext.ts b/packages/runtime/datastore/src/localChannelContext.ts index 641a96b49a65..9b322afb72a7 100644 --- a/packages/runtime/datastore/src/localChannelContext.ts +++ b/packages/runtime/datastore/src/localChannelContext.ts @@ -17,6 +17,7 @@ import type { ISummarizeResult, IPendingMessagesState, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "@fluidframework/runtime-definitions/internal"; import { @@ -115,6 +116,15 @@ export abstract class LocalChannelContextBase implements IChannelContext { ); this.services.value.deltaConnection.reSubmit(content, localOpMetadata, squash); } + + public reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void { + assert(this.isLoaded, "Channel should be loaded to resubmit ops"); + assert( + this.globallyVisible, + "Local channel must be globally visible when resubmitting op", + ); + this.services.value.deltaConnection.reSubmitMessages(collection); + } public rollback(content: unknown, localOpMetadata: unknown): void { assert(this.isLoaded, 0x2ee /* "Channel should be loaded to rollback ops" */); assert( diff --git a/packages/runtime/datastore/src/remoteChannelContext.ts b/packages/runtime/datastore/src/remoteChannelContext.ts index 3b4c1931c82f..5a3948d710c1 100644 --- a/packages/runtime/datastore/src/remoteChannelContext.ts +++ b/packages/runtime/datastore/src/remoteChannelContext.ts @@ -21,6 +21,7 @@ import type { ISummarizerNodeWithGC, IPendingMessagesState, IRuntimeMessageCollection, + 
IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "@fluidframework/runtime-definitions/internal"; import { @@ -203,6 +204,11 @@ export class RemoteChannelContext implements IChannelContext { this.services.deltaConnection.reSubmit(content, localOpMetadata, squash); } + public reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void { + assert(this.isLoaded, "Remote channel must be loaded when resubmitting ops"); + this.services.deltaConnection.reSubmitMessages(collection); + } + public rollback(content: unknown, localOpMetadata: unknown): void { assert(this.isLoaded, 0x2f0 /* "Remote channel must be loaded when rolling back op" */); diff --git a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts index b0e5b7c27b93..a71b284584f7 100644 --- a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts +++ b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts @@ -24,6 +24,7 @@ declare type MakeUnusedImportErrorsGoAway = TypeOnly | MinimalType | Fu * typeValidation.broken: * "Class_FluidDataStoreRuntime": {"forwardCompat": false} */ +// @ts-expect-error compatibility expected to be broken declare type old_as_current_for_Class_FluidDataStoreRuntime = requireAssignableTo, TypeOnly> /* diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md index 9f18e360b28c..bd9179ce73f0 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md @@ -146,6 +146,7 @@ export interface IFluidDataStoreChannel extends IDisposable { // (undocumented) request(request: IRequest): Promise; reSubmit(type: string, content: any, localOpMetadata: 
unknown, squash: boolean): void; + reSubmitMessages?(type: string, collection: IRuntimeResubmitMessageCollection): void; rollback?(type: string, content: any, localOpMetadata: unknown): void; // (undocumented) setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void; @@ -297,6 +298,18 @@ export interface IRuntimeMessagesContent { readonly localOpMetadata: unknown; } +// @beta @sealed @legacy +export interface IRuntimeResubmitMessage { + readonly contents: unknown; + readonly localOpMetadata: unknown; +} + +// @beta @sealed @legacy +export interface IRuntimeResubmitMessageCollection { + readonly messages: readonly IRuntimeResubmitMessage[]; + readonly squash: boolean; +} + // @beta @legacy export interface IRuntimeStorageService { readBlob(id: string): Promise; diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md index 75f077ccfa54..42d63f894569 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md @@ -139,6 +139,7 @@ export interface IFluidDataStoreChannel extends IDisposable { // (undocumented) request(request: IRequest): Promise; reSubmit(type: string, content: any, localOpMetadata: unknown, squash: boolean): void; + reSubmitMessages?(type: string, collection: IRuntimeResubmitMessageCollection): void; rollback?(type: string, content: any, localOpMetadata: unknown): void; // (undocumented) setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void; @@ -290,6 +291,18 @@ export interface IRuntimeMessagesContent { readonly localOpMetadata: unknown; } +// @beta @sealed @legacy +export interface IRuntimeResubmitMessage { + readonly contents: unknown; + readonly localOpMetadata: unknown; +} + +// @beta @sealed @legacy +export interface 
IRuntimeResubmitMessageCollection { + readonly messages: readonly IRuntimeResubmitMessage[]; + readonly squash: boolean; +} + // @beta @legacy export interface IRuntimeStorageService { readBlob(id: string): Promise<ArrayBufferLike>; diff --git a/packages/runtime/runtime-definitions/src/dataStoreContext.ts b/packages/runtime/runtime-definitions/src/dataStoreContext.ts index 4e514b16b38d..87bf119f379a 100644 --- a/packages/runtime/runtime-definitions/src/dataStoreContext.ts +++ b/packages/runtime/runtime-definitions/src/dataStoreContext.ts @@ -41,6 +41,7 @@ import type { import type { IInboundSignalMessage, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "./protocol.js"; import type { @@ -480,6 +481,21 @@ export interface IFluidDataStoreChannel extends IDisposable { // eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change reSubmit(type: string, content: any, localOpMetadata: unknown, squash: boolean): void; + /** + * Ask the DDS to resubmit a bunch of contiguous messages of the same type. + * @remarks + * The bunched form of {@link IFluidDataStoreChannel.reSubmit}, mirroring the inbound + * {@link IFluidDataStoreChannel.processMessages} shape so DDSes that benefit from + * processing a contiguous run together can do so on resubmit. + * + * The default implementation provided by the runtime simply loops over + * `collection.messages` calling the single-op {@link IFluidDataStoreChannel.reSubmit} + * path; implementers may override to take advantage of the bunch. + * @param type - The type shared by all messages in the collection. + * @param collection - The bunch of messages to resubmit, with a shared `squash` flag.
+ */ + reSubmitMessages?(type: string, collection: IRuntimeResubmitMessageCollection): void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change applyStashedOp(content: any): Promise<unknown>; diff --git a/packages/runtime/runtime-definitions/src/index.ts b/packages/runtime/runtime-definitions/src/index.ts index c5a50343eff5..bbc062a3cb2c 100644 --- a/packages/runtime/runtime-definitions/src/index.ts +++ b/packages/runtime/runtime-definitions/src/index.ts @@ -64,6 +64,8 @@ export type { InboundAttachMessage, IRuntimeMessageCollection, IRuntimeMessagesContent, + IRuntimeResubmitMessage, + IRuntimeResubmitMessageCollection, ISequencedMessageEnvelope, IRuntimeStorageService, } from "./protocol.js"; diff --git a/packages/runtime/runtime-definitions/src/protocol.ts b/packages/runtime/runtime-definitions/src/protocol.ts index d2159b428d2f..615044d3f5d3 100644 --- a/packages/runtime/runtime-definitions/src/protocol.ts +++ b/packages/runtime/runtime-definitions/src/protocol.ts @@ -141,6 +141,44 @@ export interface IRuntimeMessageCollection { readonly messagesContent: readonly IRuntimeMessagesContent[]; } +/** + * A single message to resubmit, as part of an {@link IRuntimeResubmitMessageCollection}. + * @legacy @beta + * @sealed + */ +export interface IRuntimeResubmitMessage { + /** + * The contents of the original message that was submitted. + */ + readonly contents: unknown; + /** + * The local metadata associated with the original message that was submitted. + */ + readonly localOpMetadata: unknown; +} + +/** + * A collection of messages to be resubmitted together as a "bunch". + * @remarks + * All messages in a resubmit collection share the same target — that is, they are + * for the same DDS — and share a single `squash` setting. This mirrors the inbound + * "bunch" shape of {@link IRuntimeMessageCollection} for the outbound resubmit path, + * allowing DDSes to handle a contiguous run of resubmits in one call.
+ * @legacy @beta + * @sealed + */ +export interface IRuntimeResubmitMessageCollection { + /** + * If true, the DDS should avoid resubmitting any "unnecessary intermediate state" created + * by these messages. Applies uniformly to every message in the collection. + */ + readonly squash: boolean; + /** + * The messages to resubmit, in original submission order. + */ + readonly messages: readonly IRuntimeResubmitMessage[]; +} + /** * Outgoing {@link IFluidDataStoreChannel} message structures. * @internal From fa31a158ed884be59ecca79c4d05fde9a40686cd Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 27 May 2026 16:31:44 -0700 Subject: [PATCH 2/2] refactor(runtime-utils): extract forEachContiguousBunch helper Three call sites (processChannelMessages, reSubmitContainerMessages on ChannelCollection, and FluidDataStoreRuntime.reSubmitMessages) had near-identical "run-length encode contiguous same-key items, flush each run to a sink" loops. Extract the bunching skeleton into a single @internal helper in runtime-utils that takes keyOf / valueOf / onBunch and an optional keysEqual predicate (defaulting to Object.is). Per-message side effects (delete checks, GC bookkeeping, detectOutboundReferences) stay at the call site and run before items enter the helper, preserving the existing semantics. Unit tests cover empty input, singletons, contiguous runs, alternating keys, structured keys, and the default Object.is behavior. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/channelCollection.ts | 142 ++++++++---------- .../runtime/datastore/src/dataStoreRuntime.ts | 37 ++--- .../runtime/runtime-utils/src/bunching.ts | 57 +++++++ packages/runtime/runtime-utils/src/index.ts | 1 + .../runtime-utils/src/test/bunching.spec.ts | 142 ++++++++++++++++++ 5 files changed, 274 insertions(+), 105 deletions(-) create mode 100644 packages/runtime/runtime-utils/src/bunching.ts create mode 100644 packages/runtime/runtime-utils/src/test/bunching.spec.ts diff --git a/packages/runtime/container-runtime/src/channelCollection.ts b/packages/runtime/container-runtime/src/channelCollection.ts index 9649090ab054..8269547d0ccd 100644 --- a/packages/runtime/container-runtime/src/channelCollection.ts +++ b/packages/runtime/container-runtime/src/channelCollection.ts @@ -44,7 +44,6 @@ import type { InboundAttachMessage, IRuntimeMessageCollection, IRuntimeMessagesContent, - IRuntimeResubmitMessage, ISummarizeResult, ISummaryTreeWithStats, ITelemetryContext, @@ -67,6 +66,7 @@ import { create404Response, createResponseError, encodeCompactIdToString, + forEachContiguousBunch, isSerializedHandle, processAttachMessageGCData, responseToException, @@ -844,48 +844,33 @@ export class ChannelCollection }[], squash: boolean, ): void => { - let currentAddress: string | undefined; - let currentType: string | undefined; - let currentMessages: IRuntimeResubmitMessage[] = []; - - const flushCurrent = (): void => { - if ( - currentAddress === undefined || - currentType === undefined || - currentMessages.length === 0 - ) { - return; - } - const context = this.contexts.get(currentAddress); - if ( - this.checkAndLogIfDeleted( - currentAddress, - context, - "Changed", - "reSubmitContainerMessages", - ) - ) { - throw new DataCorruptionError("Context is deleted!", { - callSite: "reSubmitContainerMessages", - ...tagCodeArtifacts({ id: currentAddress }), - }); - } - assert(!!context, "There should be a store context for the op"); - 
context.reSubmitMessages(currentType, { squash, messages: currentMessages }); - currentMessages = []; - }; - - for (const { envelope, localOpMetadata } of entries) { - const { address } = envelope; - const { type: ddsType, content } = envelope.contents; - if (currentAddress !== address || currentType !== ddsType) { - flushCurrent(); - currentAddress = address; - currentType = ddsType; - } - currentMessages.push({ contents: content, localOpMetadata }); - } - flushCurrent(); + forEachContiguousBunch( + entries, + (e) => ({ address: e.envelope.address, type: e.envelope.contents.type }), + (e) => ({ + contents: e.envelope.contents.content, + localOpMetadata: e.localOpMetadata, + }), + (key, messages) => { + const context = this.contexts.get(key.address); + if ( + this.checkAndLogIfDeleted( + key.address, + context, + "Changed", + "reSubmitContainerMessages", + ) + ) { + throw new DataCorruptionError("Context is deleted!", { + callSite: "reSubmitContainerMessages", + ...tagCodeArtifacts({ id: key.address }), + }); + } + assert(!!context, "There should be a store context for the op"); + context.reSubmitMessages(key.type, { squash, messages }); + }, + (a, b) => a.address === b.address && a.type === b.type, + ); }; public readonly rollbackDataStoreOp = ( @@ -1013,31 +998,17 @@ export class ChannelCollection */ private processChannelMessages(messageCollection: IRuntimeMessageCollection): void { const { envelope, messagesContent, local } = messageCollection; - let currentMessageState: { address: string; type: string } | undefined; - let currentMessagesContent: IRuntimeMessagesContent[] = []; - - // Helper that sends the current bunch of messages to the data store. It validates that the data stores exists. - const sendBunchedMessages = (): void => { - // Current message state will be undefined for the first message in the list. 
- if (currentMessageState === undefined) { - return; - } - const currentContext = this.contexts.get(currentMessageState.address); - assert(!!currentContext, 0xa66 /* Context not found */); - currentContext.processMessages({ - envelope: { ...envelope, type: currentMessageState.type }, - messagesContent: currentMessagesContent, - local, - }); - currentMessagesContent = []; - }; + // First pass: per-message validation, GC bookkeeping, and shape transform. Deleted-context + // messages are dropped here so they never reach a bunch. Non-deleted messages are collected + // with their (address, ddsType) bunch key for the second pass. + interface ChannelMessageItem { + address: string; + type: string; + content: IRuntimeMessagesContent; + } + const items: ChannelMessageItem[] = []; - /** - * Bunch contiguous messages for the same data store and send them together. - * This is an optimization mainly for DDSes, where it can process a bunch of ops together. DDSes - * like merge tree or shared tree can process ops more efficiently when they are bunched together. - */ for (const { contents, ...restOfMessagesContent } of messagesContent) { const contentsEnvelope = contents as IEnvelope; const address = contentsEnvelope.address; @@ -1067,18 +1038,6 @@ export class ChannelCollection } const { type: contextType, content: contextContents } = contentsEnvelope.contents; - // If the address or type of the message changes while processing the message, send the current bunch. - if ( - currentMessageState?.address !== address || - currentMessageState?.type !== contextType - ) { - sendBunchedMessages(); - } - currentMessagesContent.push({ - contents: contextContents, - ...restOfMessagesContent, - }); - currentMessageState = { address, type: contextType }; // Notify that a GC node for the data store changed. This is used to detect if a deleted data store is // being used. 
@@ -1092,11 +1051,32 @@ export class ChannelCollection detectOutboundReferences(address, contextContents, (fromPath: string, toPath: string) => this.parentContext.addedGCOutboundRoute(fromPath, toPath, envelope.timestamp), ); + + items.push({ + address, + type: contextType, + content: { contents: contextContents, ...restOfMessagesContent }, + }); } - // Process the last bunch of messages, if any. Note that there may not be any messages in case all of them are - // ignored because the data store is deleted. - sendBunchedMessages(); + // Bunch contiguous messages for the same (address, ddsType) and send them together. This is an + // optimization mainly for DDSes, where merge-tree / shared-tree can process a bunch of ops together + // more efficiently than one at a time. + forEachContiguousBunch( + items, + (item) => ({ address: item.address, type: item.type }), + (item) => item.content, + (key, bunch) => { + const context = this.contexts.get(key.address); + assert(!!context, 0xa66 /* Context not found */); + context.processMessages({ + envelope: { ...envelope, type: key.type }, + messagesContent: bunch, + local, + }); + }, + (a, b) => a.address === b.address && a.type === b.type, + ); } private async getDataStore( diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index 0026705629f4..1e3e94e8af16 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -78,6 +78,7 @@ import { create404Response, createResponseError, exceptionToResponse, + forEachContiguousBunch, generateHandleContextPath, processAttachMessageGCData, dataStoreLoadTelemetryProps, @@ -1438,31 +1439,19 @@ export class FluidDataStoreRuntime switch (type) { case DataStoreMessageType.ChannelOp: { // Bunch contiguous entries by channel address and dispatch each sub-bunch in one call. 
- let currentAddress: string | undefined; - let currentMessages: IRuntimeResubmitMessage[] = []; - - const flushCurrent = (): void => { - if (currentAddress === undefined || currentMessages.length === 0) { - return; - } - const channelContext = this.contexts.get(currentAddress); - assert(!!channelContext, "There should be a channel context for the op"); - channelContext.reSubmitMessages({ squash, messages: currentMessages }); - currentMessages = []; - }; - - for (const message of messages) { - const envelope = message.contents as IEnvelope; - if (currentAddress !== envelope.address) { - flushCurrent(); - currentAddress = envelope.address; - } - currentMessages.push({ - contents: envelope.contents, + forEachContiguousBunch( + messages, + (message) => (message.contents as IEnvelope).address, + (message): IRuntimeResubmitMessage => ({ + contents: (message.contents as IEnvelope).contents, localOpMetadata: message.localOpMetadata, - }); - } - flushCurrent(); + }), + (address, bunch) => { + const channelContext = this.contexts.get(address); + assert(!!channelContext, "There should be a channel context for the op"); + channelContext.reSubmitMessages({ squash, messages: bunch }); + }, + ); break; } case DataStoreMessageType.Attach: { diff --git a/packages/runtime/runtime-utils/src/bunching.ts b/packages/runtime/runtime-utils/src/bunching.ts new file mode 100644 index 000000000000..342163ca413c --- /dev/null +++ b/packages/runtime/runtime-utils/src/bunching.ts @@ -0,0 +1,57 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +/** + * Walk a sequence of items, grouping each maximal run of contiguous items with the same key + * into a "bunch" and invoking `onBunch` once per run. 
+ * + * @remarks + * This is the core pattern used by the Fluid runtime to dispatch operations to a data store + * or DDS in bunches: inbound op processing (`processMessages`) and outbound resubmit + * (`reSubmitMessages`) both walk a contiguous sequence of operations and dispatch each + * maximal same-target run as a single call to the lower layer. + * + * The helper preserves input order. Each call to `onBunch` receives the key it was bunched + * by along with the list of transformed bunch items in original order. Side effects that need + * to happen per source item (e.g. delete checks, GC node updates) should be performed by the + * caller around the call to this helper, not inside `valueOf` — `valueOf` is purely a shape + * transformation from source item to bunch item. + * + * @param items - The source items to walk. + * @param keyOf - Extracts the bunching key from a source item. Items with equal keys + * (according to `keysEqual`) that appear contiguously are bunched together. + * @param valueOf - Transforms a source item into its bunch-element form. + * @param onBunch - Invoked once per bunch with the key and the bunch items. + * @param keysEqual - Equality predicate for keys. Defaults to `Object.is`. Provide a custom + * predicate to bunch by structured / composite keys. 
+ * + * @internal + */ +export function forEachContiguousBunch<TItem, TKey, TBunchItem>( + items: Iterable<TItem>, + keyOf: (item: TItem) => TKey, + valueOf: (item: TItem) => TBunchItem, + onBunch: (key: TKey, bunch: TBunchItem[]) => void, + keysEqual: (a: TKey, b: TKey) => boolean = Object.is, +): void { + let currentKey: TKey | undefined; + let hasCurrentKey = false; + let bunch: TBunchItem[] = []; + + for (const item of items) { + const key = keyOf(item); + if (hasCurrentKey && !keysEqual(currentKey as TKey, key)) { + onBunch(currentKey as TKey, bunch); + bunch = []; + } + currentKey = key; + hasCurrentKey = true; + bunch.push(valueOf(item)); + } + + if (hasCurrentKey && bunch.length > 0) { + onBunch(currentKey as TKey, bunch); + } +} diff --git a/packages/runtime/runtime-utils/src/index.ts b/packages/runtime/runtime-utils/src/index.ts index 289587c98aa3..2d6571fc8f37 100644 --- a/packages/runtime/runtime-utils/src/index.ts +++ b/packages/runtime/runtime-utils/src/index.ts @@ -3,6 +3,7 @@ * Licensed under the MIT License. */ +export { forEachContiguousBunch } from "./bunching.js"; export { generateHandleContextPath } from "./dataStoreHandleContextUtils.js"; export { create404Response, diff --git a/packages/runtime/runtime-utils/src/test/bunching.spec.ts b/packages/runtime/runtime-utils/src/test/bunching.spec.ts new file mode 100644 index 000000000000..b40b8887d7fc --- /dev/null +++ b/packages/runtime/runtime-utils/src/test/bunching.spec.ts @@ -0,0 +1,142 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License.
+ */ + +import { strict as assert } from "node:assert"; + +import { forEachContiguousBunch } from "../bunching.js"; + +describe("forEachContiguousBunch", () => { + it("emits nothing for an empty iterable", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch<{ k: string; v: number }, string, number>( + [], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, []); + }); + + it("emits a single bunch for a single item", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [{ k: "a", v: 1 }], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [{ key: "a", bunch: [1] }]); + }); + + it("collapses contiguous same-key items into one bunch", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "a", v: 2 }, + { k: "a", v: 3 }, + ], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [{ key: "a", bunch: [1, 2, 3] }]); + }); + + it("splits when the key changes and preserves order", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "a", v: 2 }, + { k: "b", v: 3 }, + { k: "a", v: 4 }, + { k: "a", v: 5 }, + ], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [ + { key: "a", bunch: [1, 2] }, + { key: "b", bunch: [3] }, + { key: "a", bunch: [4, 5] }, + ]); + }); + + it("alternating keys produce singleton bunches", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "b", v: 2 }, + { k: "a", v: 3 }, + { k: "b", v: 4 }, + ], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [ + { 
key: "a", bunch: [1] }, + { key: "b", bunch: [2] }, + { key: "a", bunch: [3] }, + { key: "b", bunch: [4] }, + ]); + }); + + it("uses a structured-key equality predicate when provided", () => { + const bunches: { key: { x: string; y: string }; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { x: "a", y: "1", v: 10 }, + { x: "a", y: "1", v: 11 }, + { x: "a", y: "2", v: 12 }, + { x: "a", y: "2", v: 13 }, + { x: "a", y: "1", v: 14 }, + ], + (i) => ({ x: i.x, y: i.y }), + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + (a, b) => a.x === b.x && a.y === b.y, + ); + assert.deepStrictEqual(bunches, [ + { key: { x: "a", y: "1" }, bunch: [10, 11] }, + { key: { x: "a", y: "2" }, bunch: [12, 13] }, + { key: { x: "a", y: "1" }, bunch: [14] }, + ]); + }); + + it("treats every item as a distinct key with reference-equality on object keys", () => { + // Default keysEqual is Object.is — two distinct object instances never match, + // so each item becomes its own bunch even when the keys are structurally identical. + const bunches: number[][] = []; + forEachContiguousBunch( + [ + { k: { a: 1 }, v: 1 }, + { k: { a: 1 }, v: 2 }, + ], + (i) => i.k, + (i) => i.v, + (_key, bunch) => bunches.push(bunch), + ); + assert.deepStrictEqual(bunches, [[1], [2]]); + }); + + it("supports valueOf that transforms items", () => { + const bunches: string[][] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "a", v: 2 }, + { k: "b", v: 3 }, + ], + (i) => i.k, + (i) => `v${i.v}`, + (_key, bunch) => bunches.push(bunch), + ); + assert.deepStrictEqual(bunches, [["v1", "v2"], ["v3"]]); + }); +});