Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions packages/runtime/datastore/src/dataStoreRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,9 @@ export class FluidDataStoreRuntime
logger: dataStoreContext.baseLogger,
namespace: "FluidDataStoreRuntime",
properties: {
all: { dataStoreId: uuid(), dataStoreVersion: pkgVersion },
error: {
all: {
dataStoreVersion: pkgVersion,
...dataStoreLoadTelemetryProps(dataStoreContext),
inStagingMode: () => this.inStagingMode,
isDirty: () => this.isDirty,
},
Expand Down Expand Up @@ -1574,8 +1575,9 @@ export class FluidDataStoreRuntime
...tagCodeArtifacts({
channelType,
channelId,
fluidDataStoreId: this.id,
fluidDataStorePackagePath: this.dataStoreContext.packagePath.join("/"),
// Properties renamed in 2.103.0 (present via logger common props):
// fluidDataStoreId -> dataStoreId
// fluidDataStorePackagePath -> dataStorePackagePath
}),
stack: generateStack(30),
});
Expand Down
32 changes: 28 additions & 4 deletions packages/runtime/datastore/src/localChannelContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import type {
IRuntimeMessageCollection,
IRuntimeStorageService,
} from "@fluidframework/runtime-definitions/internal";
import { dataStoreLoadTelemetryProps } from "@fluidframework/runtime-utils/internal";
import {
type TelemetryLoggerExt,
DataProcessingError,
createChildLogger,
tagCodeArtifacts,
} from "@fluidframework/telemetry-utils/internal";

import {
Expand Down Expand Up @@ -221,6 +224,19 @@ export class RehydratedLocalChannelContext extends LocalChannelContextBase {
private readonly snapshotTree: ISnapshotTree,
extraBlob?: Map<string, ArrayBufferLike>,
) {
// `channelType` is not known until the LazyPromise body runs `loadChannelFactoryAndAttributes`.
// Pass a getter to `tagCodeArtifacts` so events log the type as soon as it's available.
let channelType: string | undefined;

const subLogger = createChildLogger({
logger,
namespace: "RehydratedLocalChannelContext",
properties: {
all: {
...tagCodeArtifacts({ channelId: id, channelType: () => channelType }),
},
},
});
super(
id,
runtime,
Expand All @@ -241,7 +257,7 @@ export class RehydratedLocalChannelContext extends LocalChannelContextBase {
this.dirtyFn,
() => this.isGloballyVisible,
storageService,
logger,
subLogger,
clonedSnapshotTree,
blobMap,
);
Expand All @@ -254,12 +270,13 @@ export class RehydratedLocalChannelContext extends LocalChannelContextBase {
this.id,
registry,
);
channelType = attributes.type;
const channel = await loadChannel(
runtime,
attributes,
factory,
this.services.value,
logger,
subLogger,
this.id,
);
// Send all pending messages to the channel
Expand All @@ -268,11 +285,18 @@ export class RehydratedLocalChannelContext extends LocalChannelContextBase {
}
return channel;
} catch (error) {
throw DataProcessingError.wrapIfUnrecognized(
const errorWrapped = DataProcessingError.wrapIfUnrecognized(
error,
"rehydratedLocalChannelContextFailedToLoadChannel",
undefined,
);
errorWrapped.addTelemetryProperties({
...dataStoreLoadTelemetryProps(dataStoreContext),
...tagCodeArtifacts({ channelId: id, channelType }),
});

// "Realize" is another name for instantiating the channel for a context
subLogger.sendErrorEvent({ eventName: "RealizeError" }, errorWrapped);
throw errorWrapped;
}
}),
);
Expand Down
99 changes: 65 additions & 34 deletions packages/runtime/datastore/src/remoteChannelContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import type {
IRuntimeMessageCollection,
IRuntimeStorageService,
} from "@fluidframework/runtime-definitions/internal";
import { dataStoreLoadTelemetryProps } from "@fluidframework/runtime-utils/internal";
import {
DataProcessingError,
type TelemetryLoggerExt,
ThresholdCounter,
createChildLogger,
tagCodeArtifacts,
} from "@fluidframework/telemetry-utils/internal";

import {
Expand Down Expand Up @@ -71,9 +74,21 @@ export class RemoteChannelContext implements IChannelContext {
) {
assert(!this.id.includes("/"), 0x310 /* Channel context ID cannot contain slashes */);

// `channelType` is not known until the LazyPromise body runs `loadChannelFactoryAndAttributes`.
// Pass a getter to `tagCodeArtifacts` so events log the type as soon as it's available.
let channelType: string | undefined;

this.subLogger = createChildLogger({
logger: runtime.logger,
namespace: "RemoteChannelContext",
properties: {
all: {
...tagCodeArtifacts({
channelId: this.id,
channelType: () => channelType,
}),
},
},
});

this.services = createChannelServiceEndpoints(
Expand All @@ -88,44 +103,60 @@ export class RemoteChannelContext implements IChannelContext {
);

this.channelP = new LazyPromise<IChannel>(async () => {
const { attributes, factory } = await loadChannelFactoryAndAttributes(
dataStoreContext,
this.services,
this.id,
registry,
attachMessageType,
);
try {
const { attributes, factory } = await loadChannelFactoryAndAttributes(
dataStoreContext,
this.services,
this.id,
registry,
attachMessageType,
);
channelType = attributes.type;

const channel = await loadChannel(
runtime,
attributes,
factory,
this.services,
this.subLogger,
this.id,
);
const channel = await loadChannel(
runtime,
attributes,
factory,
this.services,
this.subLogger,
this.id,
);

assert(
this.pendingMessagesState !== undefined,
0xa6c /* pending messages state is undefined */,
);
for (const messageCollection of this.pendingMessagesState.messageCollections) {
this.services.deltaConnection.processMessages(messageCollection);
}
this.thresholdOpsCounter.send(
"ProcessPendingOps",
this.pendingMessagesState.pendingCount,
);
assert(
this.pendingMessagesState !== undefined,
0xa6c /* pending messages state is undefined */,
);
for (const messageCollection of this.pendingMessagesState.messageCollections) {
this.services.deltaConnection.processMessages(messageCollection);
}
this.thresholdOpsCounter.send(
"ProcessPendingOps",
this.pendingMessagesState.pendingCount,
);

// Commit changes.
this.channel = channel;
this.pendingMessagesState = undefined;
this.isLoaded = true;
// Commit changes.
this.channel = channel;
this.pendingMessagesState = undefined;
this.isLoaded = true;

// Because have some await between we created the service and here, the connection state might have changed
// and we don't propagate the connection state when we are not loaded. So we have to set it again here.
this.services.deltaConnection.setConnectionState(dataStoreContext.connected);
return this.channel;
// Because have some await between we created the service and here, the connection state might have changed
// and we don't propagate the connection state when we are not loaded. So we have to set it again here.
this.services.deltaConnection.setConnectionState(dataStoreContext.connected);
return this.channel;
} catch (error) {
const errorWrapped = DataProcessingError.wrapIfUnrecognized(
error,
"remoteChannelContextFailedToLoadChannel",
);
errorWrapped.addTelemetryProperties({
...dataStoreLoadTelemetryProps(dataStoreContext),
...tagCodeArtifacts({ channelId: id, channelType }),
});

// "Realize" is another name for instantiating the channel for a context
this.subLogger.sendErrorEvent({ eventName: "RealizeError" }, errorWrapped);
throw errorWrapped;
}
});

this.summarizerNode = createSummarizerNode(
Expand Down
65 changes: 64 additions & 1 deletion packages/runtime/datastore/src/test/localChannelContext.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@

import { strict as assert } from "node:assert";

import { ContainerErrorTypes } from "@fluidframework/container-definitions/internal";
import type { IErrorBase } from "@fluidframework/core-interfaces";
import type { IChannel } from "@fluidframework/datastore-definitions/internal";
import type { ISnapshotTree } from "@fluidframework/driver-definitions/internal";
import type { IFluidDataStoreContext } from "@fluidframework/runtime-definitions/internal";
import { extractTelemetryLoggerExt } from "@fluidframework/telemetry-utils/internal";
import {
extractTelemetryLoggerExt,
isFluidError,
MockLogger,
TelemetryDataTag,
} from "@fluidframework/telemetry-utils/internal";
import {
MockFluidDataStoreContext,
validateAssertionError,
Expand Down Expand Up @@ -83,4 +90,60 @@ describe("LocalChannelContext Tests", () => {
"Expected exception was not thrown",
);
});

it("RehydratedLocalChannelContext first await on getChannel() logs ChannelLoadFailure with tagged props when load fails", async () => {
const channelId = "ddsId";
const mockLogger = new MockLogger();
const contextWithMockLogger = new MockFluidDataStoreContext(
"testDataStoreId",
false,
mockLogger.toTelemetryLogger(),
);
contextWithMockLogger.packagePath = ["pkgA", "pkgB"];
const dataStoreRuntime = loadRuntime(contextWithMockLogger, sharedObjectRegistry);

// Registry returns undefined so loadChannelFactoryAndAttributes throws
// inside the LazyPromise body. With an empty snapshot tree and no
// attachMessageType, this throws `channelTypeNotAvailable`.
const failingRegistry: ISharedObjectRegistry = {
get: () => undefined,
};

const rehydratedChannelContext = new RehydratedLocalChannelContext(
channelId,
failingRegistry,
dataStoreRuntime,
contextWithMockLogger,
contextWithMockLogger.storage,
extractTelemetryLoggerExt(contextWithMockLogger.baseLogger),
() => {},
() => {},
{ trees: {}, blobs: {} } as unknown as ISnapshotTree,
);

await assert.rejects(
async () => rehydratedChannelContext.getChannel(),
(error: IErrorBase) => {
assert.strictEqual(
error.errorType,
ContainerErrorTypes.dataProcessingError,
"thrown error should be a DataProcessingError",
);
assert(isFluidError(error), "thrown error should be a Fluid error");
return true;
},
);

mockLogger.assertMatchAny(
[
{
eventName: "RehydratedLocalChannelContext:RealizeError",
fluidDataStoreId: { value: "testDataStoreId", tag: TelemetryDataTag.CodeArtifact },
fullPackageName: { value: "pkgA/pkgB", tag: TelemetryDataTag.CodeArtifact },
channelId: { value: channelId, tag: TelemetryDataTag.CodeArtifact },
},
],
"Expected one RealizeError event with tagged data-store and channel props",
);
});
});
Loading
Loading