From abdaa1879cc7a4a9f0a626bc3afa62fb5aa673c3 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Wed, 3 Jun 2026 09:29:45 -0700 Subject: [PATCH 1/3] canonicalize datastream wire payloads to camelCase via Fern serializers --- src/datastream/datastream-client.ts | 56 ++++++- src/datastream/merge.ts | 76 ++++----- .../unit/datastream/datastream-client.test.ts | 146 +++++++++++++----- tests/unit/datastream/merge.test.ts | 82 +++++----- 4 files changed, 237 insertions(+), 123 deletions(-) diff --git a/src/datastream/datastream-client.ts b/src/datastream/datastream-client.ts index 5d161793..0a5dfb35 100644 --- a/src/datastream/datastream-client.ts +++ b/src/datastream/datastream-client.ts @@ -5,6 +5,7 @@ import { RulesEngineClient } from '../rules-engine'; import { Logger } from '../logger'; import { LazyEmitter } from './emitter'; import { partialCompany, partialUser, deepCopyCompany as deepCopyCompanyFn } from './merge'; +import * as serializers from '../serialization'; // Import cache providers from the cache module import type { CacheProvider } from '../cache/types'; @@ -705,7 +706,18 @@ export class DataStreamClient extends LazyEmitter { return; } } else { - company = message.data as Schematic.RulesengineCompany; + try { + // passthrough (not the Fern default of "fail") so a payload carrying a + // field this SDK's schema doesn't know about yet — e.g. a server that + // ships ahead of the pinned SDK — is canonicalized to camelCase for its + // known fields and kept, rather than dropping the whole entity. + company = serializers.RulesengineCompany.parseOrThrow(message.data, { + unrecognizedObjectKeys: "passthrough", + }); + } catch (error) { + this.logger.warn(`Failed to deserialize company payload: ${error}`); + return; + } } if (!company) { @@ -768,7 +780,14 @@ export class DataStreamClient extends LazyEmitter { return; } } else { - user = message.data as Schematic.RulesengineUser; + try { + user = serializers.RulesengineUser.parseOrThrow(message.data, { + unrecognizedObjectKeys: "passthrough", + }); + } catch (error) { + this.logger.warn(`Failed to deserialize user payload: ${error}`); + return; + } } if (!user) { @@ -808,13 +827,28 @@ export class DataStreamClient extends LazyEmitter { * handleFlagsMessage processes bulk flags messages */ private async handleFlagsMessage(message: DataStreamResp): Promise { - const flags = message.data as Schematic.RulesengineFlag[]; - - if (!Array.isArray(flags)) { + const rawFlags = message.data as unknown[]; + + if (!Array.isArray(rawFlags)) { this.logger.warn('Expected flags array in bulk flags message'); return; } + const flags: Schematic.RulesengineFlag[] = []; + let parseFailureCount = 0; + let firstFailure: unknown = undefined; + for (const raw of rawFlags) { + try { + flags.push(serializers.RulesengineFlag.parseOrThrow(raw, { unrecognizedObjectKeys: "passthrough" })); + } catch (error) { + parseFailureCount++; + if (firstFailure === undefined) firstFailure = error; + } + } + if (parseFailureCount > 0) { + this.logger.warn(`Failed to deserialize ${parseFailureCount} flag(s) in bulk message: ${String(firstFailure)}`); + } + const results = await Promise.allSettled( flags .filter((flag) => flag?.key) @@ -854,8 +888,16 @@ export class DataStreamClient extends LazyEmitter { * handleFlagMessage processes single flag messages */ private async handleFlagMessage(message: DataStreamResp): Promise { - const flag = message.data as Schematic.RulesengineFlag; - + let flag: Schematic.RulesengineFlag; + try { + flag = serializers.RulesengineFlag.parseOrThrow(message.data, { + unrecognizedObjectKeys: "passthrough", + }); + } catch (error) { + this.logger.warn(`Failed to deserialize flag payload: ${error}`); + return; + } + if (!flag?.key) { return; } diff --git a/src/datastream/merge.ts b/src/datastream/merge.ts index e579b4a9..8060456c 100644 --- a/src/datastream/merge.ts +++ b/src/datastream/merge.ts @@ -1,23 +1,9 @@ import type * as Schematic from "../api/types"; -/** - * Helper to read a property that may be in camelCase or snake_case form. - * Wire data from WebSocket uses snake_case; Fern-generated types use camelCase. - */ -function getProp(obj: Record, camel: string, snake: string): unknown { - return obj[camel] ?? obj[snake]; -} - -/** - * Creates a complete deep copy of a Company object. - */ export function deepCopyCompany(c: Schematic.RulesengineCompany): Schematic.RulesengineCompany { return JSON.parse(JSON.stringify(c)); } -/** - * Creates a complete deep copy of a User object. - */ export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.RulesengineUser { return JSON.parse(JSON.stringify(u)); } @@ -36,9 +22,9 @@ export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.Rulesengin * - usage ← metric value matching (event_name, metric_period, month_reset) * Both are skipped when the partial also sends entitlements wholesale. * - * Wire format uses snake_case keys. The existing company from cache - * may have either camelCase or snake_case keys depending on how it - * was stored. + * Partial updates arrive as raw wire payloads (snake_case keys) and are merged + * into an existing camelCase-canonicalized entity; each case writes the + * corresponding camelCase field so the cached entity stays in a single shape. */ export function partialCompany( existing: Schematic.RulesengineCompany, @@ -54,43 +40,55 @@ export function partialCompany( for (const key of Object.keys(partial)) { switch (key) { case "id": + merged.id = partial[key]; + break; case "account_id": + merged.accountId = partial[key]; + break; case "environment_id": - merged[key] = partial[key]; + merged.environmentId = partial[key]; break; case "base_plan_id": - merged[key] = partial[key] ?? null; + merged.basePlanId = partial[key] ?? null; break; case "billing_product_ids": + merged.billingProductIds = partial[key]; + break; case "plan_ids": + merged.planIds = partial[key]; + break; case "plan_version_ids": + merged.planVersionIds = partial[key]; + break; case "entitlements": + merged.entitlements = partial[key]; + break; case "rules": + merged.rules = partial[key]; + break; case "traits": + merged.traits = partial[key]; + break; case "subscription": - merged[key] = partial[key]; + merged.subscription = partial[key]; break; case "keys": { - const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record; + const existingKeys = (merged.keys ?? {}) as Record; const incomingKeys = partial[key] as Record; - merged[key] = { ...existingKeys, ...incomingKeys }; + merged.keys = { ...existingKeys, ...incomingKeys }; break; } case "credit_balances": { - const existingCB = (getProp(merged, "creditBalances", "credit_balances") ?? {}) as Record< - string, - number - >; + const existingCB = (merged.creditBalances ?? {}) as Record; const incomingCB = (partial[key] ?? {}) as Record; - merged[key] = { ...existingCB, ...incomingCB }; + merged.creditBalances = { ...existingCB, ...incomingCB }; updatedBalances = incomingCB; break; } case "metrics": { - const existingMetrics = ((getProp(merged, "metrics", "metrics") as unknown[]) ?? - []) as Schematic.RulesengineCompanyMetric[]; + const existingMetrics = (merged.metrics ?? []) as Schematic.RulesengineCompanyMetric[]; const incomingMetrics = (partial[key] ?? []) as Schematic.RulesengineCompanyMetric[]; - merged[key] = upsertMetrics(existingMetrics, incomingMetrics); + merged.metrics = upsertMetrics(existingMetrics, incomingMetrics); metricsUpdated = true; break; } @@ -118,7 +116,7 @@ function syncEntitlementDerivedFields( updatedBalances: Record | undefined, metricsUpdated: boolean, ): void { - const entitlements = (getProp(merged, "entitlements", "entitlements") ?? []) as Record[]; + const entitlements = (merged.entitlements ?? []) as Record[]; if (entitlements.length === 0) { return; } @@ -127,7 +125,7 @@ function syncEntitlementDerivedFields( // upsert so entitlements can find their matching usage. const metricsLookup = new Map(); if (metricsUpdated) { - const mergedMetrics = (getProp(merged, "metrics", "metrics") ?? []) as Record[]; + const mergedMetrics = (merged.metrics ?? []) as Record[]; for (const m of mergedMetrics) { if (!m) continue; metricsLookup.set(metricKeyString(getMetricKey(m)), (m.value as number) ?? 0); @@ -170,19 +168,25 @@ export function partialUser( for (const key of Object.keys(partial)) { switch (key) { case "id": + merged.id = partial[key]; + break; case "account_id": + merged.accountId = partial[key]; + break; case "environment_id": - merged[key] = partial[key]; + merged.environmentId = partial[key]; break; case "keys": { - const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record; + const existingKeys = (merged.keys ?? {}) as Record; const incomingKeys = partial[key] as Record; - merged[key] = { ...existingKeys, ...incomingKeys }; + merged.keys = { ...existingKeys, ...incomingKeys }; break; } case "traits": + merged.traits = partial[key]; + break; case "rules": - merged[key] = partial[key]; + merged.rules = partial[key]; break; // Ignore unknown keys silently } diff --git a/tests/unit/datastream/datastream-client.test.ts b/tests/unit/datastream/datastream-client.test.ts index c0f180fb..da27a0c0 100644 --- a/tests/unit/datastream/datastream-client.test.ts +++ b/tests/unit/datastream/datastream-client.test.ts @@ -7,6 +7,23 @@ import { DatastreamWSClient } from '../../../src/datastream/websocket-client'; import { DataStreamResp, EntityType, MessageType } from '../../../src/datastream/types'; import { Logger } from '../../../src/logger'; import * as Schematic from '../../../src/api/types'; +import * as serializers from '../../../src/serialization'; + +const PARSE_OPTS = { + allowUnrecognizedEnumValues: true, + allowUnrecognizedUnionMembers: true, + unrecognizedObjectKeys: 'passthrough' as const, +}; +// The SUT runs incoming snake_case wire payloads through Fern's parseOrThrow +// to canonicalize them to camelCase before caching. Mock fixtures are written +// in wire format (snake_case), so we route them through the same serializer +// to compute the expected camelCase shape returned by getCompany/getUser/getFlag. +const asCompany = (c: unknown): Schematic.RulesengineCompany => + serializers.RulesengineCompany.parseOrThrow(c, PARSE_OPTS); +const asUser = (u: unknown): Schematic.RulesengineUser => + serializers.RulesengineUser.parseOrThrow(u, PARSE_OPTS); +const asFlag = (f: unknown): Schematic.RulesengineFlag => + serializers.RulesengineFlag.parseOrThrow(f, PARSE_OPTS); // Mock DatastreamWSClient const mockDatastreamWSClientInstance = { on: jest.fn(), @@ -52,8 +69,8 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; @@ -251,7 +268,45 @@ describe('DataStreamClient', () => { // Verify company is cached and can be retrieved using the correct keys const retrievedCompany = await client.getCompany(mockCompany.keys!); - expect(retrievedCompany).toEqual(mockCompany); + expect(retrievedCompany).toEqual(asCompany(mockCompany)); + }, 10000); + + test('keeps a company carrying an unknown field instead of dropping it (forward-compat)', async () => { + // A server running ahead of this SDK can ship a field the generated schema + // doesn't know yet. parseOrThrow defaults to dropping the whole entity on an + // unrecognized key; the SUT overrides that to "passthrough" so the company + // is still cached (known fields canonicalized, unknown field retained). + await client.start(); + + const DatastreamWSClientMock = DatastreamWSClient as jest.MockedClass; + const messageHandler = DatastreamWSClientMock.mock.calls[0][0].messageHandler; + + const companyWithUnknownField = { + ...(mockCompany as unknown as Record), + id: 'company-future', + keys: { name: 'Future Co' }, + // Field this SDK's schema has never seen: + some_future_field: { nested: 'value' }, + }; + + await messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.FULL, + data: companyWithUnknownField, + } as DataStreamResp); + + // The entity was kept (not dropped), known fields canonicalized to camelCase. + const retrieved = await client.getCompany({ name: 'Future Co' }); + expect(retrieved).toBeDefined(); + expect(retrieved?.id).toBe('company-future'); + // The unknown field survives passthrough. + expect((retrieved as unknown as Record).some_future_field).toEqual({ + nested: 'value', + }); + // And we did not warn about a failed deserialize. + expect(mockLogger.warn).not.toHaveBeenCalledWith( + expect.stringContaining('Failed to deserialize company payload'), + ); }, 10000); test('should handle user messages and update cache', async () => { @@ -273,7 +328,7 @@ describe('DataStreamClient', () => { // Verify user is cached and can be retrieved using the correct keys const retrievedUser = await client.getUser(mockUser.keys!); - expect(retrievedUser).toEqual(mockUser); + expect(retrievedUser).toEqual(asUser(mockUser)); }, 10000); test('should handle flag messages and update cache', async () => { @@ -295,7 +350,7 @@ describe('DataStreamClient', () => { // Verify flag is cached and can be retrieved const retrievedFlag = await client.getFlag(mockFlag.key); - expect(retrievedFlag).toEqual(mockFlag); + expect(retrievedFlag).toEqual(asFlag(mockFlag)); }); test('should handle partial entity message merging', async () => { @@ -311,12 +366,12 @@ describe('DataStreamClient', () => { account_id: 'account-123', environment_id: 'env-123', keys: { name: 'Partial Corp' }, - traits: [{ key: 'tier', value: 'free' }], + traits: [{ value: 'free' }], rules: [], metrics: [], plan_ids: ['plan-1'], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; @@ -328,7 +383,7 @@ describe('DataStreamClient', () => { // Verify the full company is cached const cachedFull = await client.getCompany({ name: 'Partial Corp' }); - expect(cachedFull).toEqual(fullCompany); + expect(cachedFull).toEqual(asCompany(fullCompany)); // Send a PARTIAL company message. Wire shape: data is the partial fields, // entity_id at the top level identifies the cached company to merge into. @@ -338,22 +393,24 @@ describe('DataStreamClient', () => { message_type: MessageType.PARTIAL, data: { keys: { name: 'Partial Corp' }, - traits: [{ key: 'tier', value: 'enterprise' }], + traits: [{ value: 'enterprise' }], plan_ids: ['plan-2'], }, }); // Partial messages are now properly merged: fields in the partial update // the cached entity, while fields not present in the partial are preserved. + // Cached values are camelCase (canonicalized by parseOrThrow on the FULL + // message), and partialCompany writes camelCase keys, so assertions read + // camelCase regardless of whether the field was full-loaded or merged. const cachedAfterPartial = await client.getCompany({ name: 'Partial Corp' }); expect(cachedAfterPartial.id).toBe('company-partial'); - expect((cachedAfterPartial as any).traits).toEqual([{ key: 'tier', value: 'enterprise' }]); - expect((cachedAfterPartial as any).plan_ids).toEqual(['plan-2']); - // Original fields not present in the partial message are preserved + expect((cachedAfterPartial as any).traits).toEqual([{ value: 'enterprise' }]); + expect((cachedAfterPartial as any).planIds).toEqual(['plan-2']); expect((cachedAfterPartial as any).metrics).toEqual([]); expect((cachedAfterPartial as any).rules).toEqual([]); - expect((cachedAfterPartial as any).account_id).toBe('account-123'); - expect((cachedAfterPartial as any).billing_product_ids).toEqual([]); + expect((cachedAfterPartial as any).accountId).toBe('account-123'); + expect((cachedAfterPartial as any).billingProductIds).toEqual([]); }, 10000); test('should skip partial company message when entity is not in cache', async () => { @@ -561,9 +618,9 @@ describe('DataStreamClient', () => { const cachedUser = await client.getUser(mockUser.keys!); const cachedFlag = await client.getFlag(mockFlag.key); - expect(cachedCompany).toEqual(mockCompany); - expect(cachedUser).toEqual(mockUser); - expect(cachedFlag).toEqual(mockFlag); + expect(cachedCompany).toEqual(asCompany(mockCompany)); + expect(cachedUser).toEqual(asUser(mockUser)); + expect(cachedFlag).toEqual(asFlag(mockFlag)); }); test('should handle error type messages from WebSocket', async () => { @@ -793,8 +850,8 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; @@ -827,9 +884,9 @@ describe('DataStreamClient', () => { const bySlug = await client.getCompany({ slug: 'acme-corp' }); const byExtId = await client.getCompany({ external_id: 'ext-1' }); - expect(byName).toEqual(multiKeyCompany); - expect(bySlug).toEqual(multiKeyCompany); - expect(byExtId).toEqual(multiKeyCompany); + expect(byName).toEqual(asCompany(multiKeyCompany)); + expect(bySlug).toEqual(asCompany(multiKeyCompany)); + expect(byExtId).toEqual(asCompany(multiKeyCompany)); }); test('should retrieve user by any of its keys after caching', async () => { @@ -842,8 +899,8 @@ describe('DataStreamClient', () => { const byEmail = await client.getUser({ email: 'alice@example.com' }); const byUserId = await client.getUser({ user_id: 'u-1' }); - expect(byEmail).toEqual(multiKeyUser); - expect(byUserId).toEqual(multiKeyUser); + expect(byEmail).toEqual(asUser(multiKeyUser)); + expect(byUserId).toEqual(asUser(multiKeyUser)); }); test('should remove company from cache on DELETE for all keys', async () => { @@ -859,7 +916,7 @@ describe('DataStreamClient', () => { // Verify it's cached — returns from cache without sending a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); const cached = await client.getCompany({ name: 'acme' }); - expect(cached).toEqual(multiKeyCompany); + expect(cached).toEqual(asCompany(multiKeyCompany)); expect(mockDatastreamWSClientInstance.sendMessage).not.toHaveBeenCalled(); // Send DELETE @@ -904,7 +961,7 @@ describe('DataStreamClient', () => { // Verify it's cached — returns from cache without sending a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); const cached = await client.getUser({ email: 'alice@example.com' }); - expect(cached).toEqual(multiKeyUser); + expect(cached).toEqual(asUser(multiKeyUser)); expect(mockDatastreamWSClientInstance.sendMessage).not.toHaveBeenCalled(); // Send DELETE @@ -945,7 +1002,7 @@ describe('DataStreamClient', () => { // Send updated company with same keys but different data const updatedCompany = { ...multiKeyCompany, - traits: [{ key: 'tier', value: 'enterprise' }], + traits: [{ value: 'enterprise' }], } as unknown as Schematic.RulesengineCompany; await messageHandler({ @@ -958,8 +1015,8 @@ describe('DataStreamClient', () => { const byName = await client.getCompany({ name: 'acme' }); const bySlug = await client.getCompany({ slug: 'acme-corp' }); - expect(byName).toEqual(updatedCompany); - expect(bySlug).toEqual(updatedCompany); + expect(byName).toEqual(asCompany(updatedCompany)); + expect(bySlug).toEqual(asCompany(updatedCompany)); }); test('should handle deep copy to prevent mutation of cached entities', async () => { @@ -972,7 +1029,7 @@ describe('DataStreamClient', () => { // Retrieve the company from cache const firstRetrieval = await client.getCompany({ name: 'acme' }); - expect(firstRetrieval).toEqual(multiKeyCompany); + expect(firstRetrieval).toEqual(asCompany(multiKeyCompany)); // Mutate a field on the returned object (firstRetrieval as any).traits = [{ key: 'mutated', value: 'yes' }]; @@ -999,9 +1056,9 @@ describe('DataStreamClient', () => { }); // Verify all three keys resolve from cache - expect(await client.getCompany({ name: 'acme' })).toEqual(multiKeyCompany); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(multiKeyCompany); - expect(await client.getCompany({ external_id: 'ext-1' })).toEqual(multiKeyCompany); + expect(await client.getCompany({ name: 'acme' })).toEqual(asCompany(multiKeyCompany)); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(multiKeyCompany)); + expect(await client.getCompany({ external_id: 'ext-1' })).toEqual(asCompany(multiKeyCompany)); // Update with only two keys — external_id has been removed const updatedCompany = { @@ -1016,8 +1073,8 @@ describe('DataStreamClient', () => { }); // Remaining keys should still resolve from cache - expect(await client.getCompany({ name: 'acme' })).toEqual(updatedCompany); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(updatedCompany); + expect(await client.getCompany({ name: 'acme' })).toEqual(asCompany(updatedCompany)); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(updatedCompany)); // Removed key should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1045,8 +1102,8 @@ describe('DataStreamClient', () => { }); // Verify both keys resolve from cache - expect(await client.getUser({ email: 'alice@example.com' })).toEqual(multiKeyUser); - expect(await client.getUser({ user_id: 'u-1' })).toEqual(multiKeyUser); + expect(await client.getUser({ email: 'alice@example.com' })).toEqual(asUser(multiKeyUser)); + expect(await client.getUser({ user_id: 'u-1' })).toEqual(asUser(multiKeyUser)); // Update with only email — user_id has been removed const updatedUser = { @@ -1061,7 +1118,7 @@ describe('DataStreamClient', () => { }); // Remaining key should still resolve from cache - expect(await client.getUser({ email: 'alice@example.com' })).toEqual(updatedUser); + expect(await client.getUser({ email: 'alice@example.com' })).toEqual(asUser(updatedUser)); // Removed key should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1088,7 +1145,7 @@ describe('DataStreamClient', () => { data: multiKeyCompany, }); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(multiKeyCompany); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(multiKeyCompany)); // Update: slug value changed from 'acme-corp' to 'acme-inc' const updatedCompany = { @@ -1103,7 +1160,7 @@ describe('DataStreamClient', () => { }); // New slug should resolve from cache - expect(await client.getCompany({ slug: 'acme-inc' })).toEqual(updatedCompany); + expect(await client.getCompany({ slug: 'acme-inc' })).toEqual(asCompany(updatedCompany)); // Old slug value should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1124,7 +1181,16 @@ describe('DataStreamClient', () => { const companyWithMetrics = { ...multiKeyCompany, metrics: [ - { eventSubtype: 'api-call', value: 10 }, + { + account_id: 'account-123', + company_id: 'company-multi', + created_at: '2026-01-01T00:00:00Z', + environment_id: 'env-123', + event_subtype: 'api-call', + month_reset: 'first_of_month', + period: 'all_time', + value: 10, + }, ], } as unknown as Schematic.RulesengineCompany; @@ -1156,8 +1222,8 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; diff --git a/tests/unit/datastream/merge.test.ts b/tests/unit/datastream/merge.test.ts index caec17a4..f8b3cfc8 100644 --- a/tests/unit/datastream/merge.test.ts +++ b/tests/unit/datastream/merge.test.ts @@ -6,23 +6,25 @@ import { deepCopyUser, } from '../../../src/datastream/merge'; -// Helper: base company in snake_case wire format (matches WebSocket data) +// Helper: base company in camelCase (matches the cached, parseOrThrow-normalized +// shape that partialCompany sees in production). Partial payloads arrive in +// snake_case from the wire; the merge function canonicalizes to camelCase. function baseCompany(): Schematic.RulesengineCompany { return { id: 'co-1', - account_id: 'acc-1', - environment_id: 'env-1', - base_plan_id: 'plan-1', - billing_product_ids: ['bp-1'], - credit_balances: { 'credit-1': 100.0 }, + accountId: 'acc-1', + environmentId: 'env-1', + basePlanId: 'plan-1', + billingProductIds: ['bp-1'], + creditBalances: { 'credit-1': 100.0 }, keys: { domain: 'example.com' }, - plan_ids: ['plan-1'], - plan_version_ids: ['pv-1'], + planIds: ['plan-1'], + planVersionIds: ['pv-1'], traits: [ - { value: 'Enterprise', trait_definition: { id: 'plan', comparable_type: 'string', entity_type: 'company' } }, + { value: 'Enterprise', traitDefinition: { id: 'plan', comparableType: 'string', entityType: 'company' } }, ], entitlements: [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'boolean' }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'boolean' }, ], metrics: [], rules: [], @@ -32,11 +34,11 @@ function baseCompany(): Schematic.RulesengineCompany { function baseUser(): Schematic.RulesengineUser { return { id: 'user-1', - account_id: 'acc-1', - environment_id: 'env-1', + accountId: 'acc-1', + environmentId: 'env-1', keys: { email: 'user@example.com' }, traits: [ - { value: 'Premium', trait_definition: { id: 'tier', comparable_type: 'string', entity_type: 'user' } }, + { value: 'Premium', traitDefinition: { id: 'tier', comparableType: 'string', entityType: 'user' } }, ], rules: [], } as unknown as Schematic.RulesengineUser; @@ -73,11 +75,11 @@ describe('partialCompany', () => { expect((m.traits as Record[])[0].value).toBe('Startup'); // Other fields preserved - expect(m.account_id).toBe('acc-1'); - expect(m.environment_id).toBe('env-1'); + expect(m.accountId).toBe('acc-1'); + expect(m.environmentId).toBe('env-1'); expect(m.keys).toEqual({ domain: 'example.com' }); - expect(m.billing_product_ids).toEqual(['bp-1']); - expect(m.base_plan_id).toBe('plan-1'); + expect(m.billingProductIds).toEqual(['bp-1']); + expect(m.basePlanId).toBe('plan-1'); }); test('merges keys - new key added, existing preserved', () => { @@ -98,7 +100,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); }); test('overwrites credit balance', () => { @@ -108,7 +110,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 50.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 50.0 }); }); test('upserts metrics - updates existing, appends new', () => { @@ -278,7 +280,7 @@ describe('partialCompany', () => { const m = merged as unknown as Record; expect(m.entitlements).toEqual([]); - expect(m.account_id).toBe('acc-1'); + expect(m.accountId).toBe('acc-1'); }); test('null base_plan_id sets to null', () => { @@ -288,8 +290,8 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.base_plan_id).toBeNull(); - expect(m.billing_product_ids).toEqual(['bp-1']); + expect(m.basePlanId).toBeNull(); + expect(m.billingProductIds).toEqual(['bp-1']); }); test('tolerates missing id - cache lookup uses envelope entity_id', () => { @@ -302,7 +304,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); expect(m.id).toBe('co-1'); }); @@ -386,13 +388,13 @@ describe('partialCompany', () => { const m = merged as unknown as Record; expect(m.id).toBe('co-1'); - expect(m.account_id).toBe('acc-2'); - expect(m.environment_id).toBe('env-2'); - expect(m.base_plan_id).toBe('plan-99'); - expect(m.billing_product_ids).toEqual(['bp-10', 'bp-20']); + expect(m.accountId).toBe('acc-2'); + expect(m.environmentId).toBe('env-2'); + expect(m.basePlanId).toBe('plan-99'); + expect(m.billingProductIds).toEqual(['bp-10', 'bp-20']); // Credit balances merge: credit-1 overwritten, credit-new added - expect(m.credit_balances).toEqual({ 'credit-1': 999.0, 'credit-new': 50.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 999.0, 'credit-new': 50.0 }); const entitlements = m.entitlements as Record[]; expect(entitlements.length).toBe(2); @@ -410,8 +412,8 @@ describe('partialCompany', () => { expect(metrics[1].event_subtype).toBe('event-new'); expect(metrics[1].value).toBe(7); - expect(m.plan_ids).toEqual(['plan-99', 'plan-100']); - expect(m.plan_version_ids).toEqual(['pv-99']); + expect(m.planIds).toEqual(['plan-99', 'plan-100']); + expect(m.planVersionIds).toEqual(['pv-99']); const rules = m.rules as Record[]; expect(rules.length).toBe(2); @@ -427,8 +429,8 @@ describe('partialCompany', () => { // Original not mutated const orig = existing as unknown as Record; - expect(orig.account_id).toBe('acc-1'); - expect(orig.base_plan_id).toBe('plan-1'); + expect(orig.accountId).toBe('acc-1'); + expect(orig.basePlanId).toBe('plan-1'); expect(orig.keys).toEqual({ domain: 'example.com' }); expect((orig.metrics as Record[])[0].value).toBe(10); }); @@ -531,12 +533,12 @@ describe('deepCopyCompany', () => { const origRaw = orig as unknown as Record; origRaw.metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-1', period: 'all_time', month_reset: 'first_of_month', - value: 42, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-1', period: 'all_time', monthReset: 'first_of_month', + value: 42, createdAt: '2026-01-01T00:00:00Z', }, ]; - origRaw.subscription = { id: 'sub-1', period_start: '2026-01-01T00:00:00Z', period_end: '2027-01-01T00:00:00Z' }; + origRaw.subscription = { id: 'sub-1', periodStart: '2026-01-01T00:00:00Z', periodEnd: '2027-01-01T00:00:00Z' }; const cp = deepCopyCompany(orig); const cpRaw = cp as unknown as Record; @@ -546,8 +548,8 @@ describe('deepCopyCompany', () => { expect((origRaw.keys as Record).domain).toBe('example.com'); // Credit balances are independent - (cpRaw.credit_balances as Record)['credit-1'] = 999; - expect((origRaw.credit_balances as Record)['credit-1']).toBe(100.0); + (cpRaw.creditBalances as Record)['credit-1'] = 999; + expect((origRaw.creditBalances as Record)['credit-1']).toBe(100.0); // Metrics are independent ((cpRaw.metrics as Record[])[0]).value = 999; @@ -569,8 +571,8 @@ describe('deepCopyUser', () => { test('empty fields - user with only required fields', () => { const cp = deepCopyUser({ id: 'u1', - account_id: 'acc-1', - environment_id: 'env-1', + accountId: 'acc-1', + environmentId: 'env-1', keys: {}, traits: [], rules: [], From 9bcb8993d3cc3a9e83a0acf619a92a74ed47dbb4 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Wed, 3 Jun 2026 10:14:34 -0700 Subject: [PATCH 2/3] write creditRemaining in camelCase and canonicalize partial nested objects The WASM rules engine accepts either casing per field via serde aliases, but rejects an object carrying both casings of the same field ("duplicate field"). syncEntitlementDerivedFields wrote snake_case credit_remaining next to the canonicalized creditRemaining, so one partial credit_balances update made every subsequent flag check for that company fall back to its default value. Write the camelCase field instead, and canonicalize incoming partial metrics/entitlements via the Fern serializers so merged entities keep a single shape. Adds wasm-datastream.test.ts, which exercises the full wire -> canonicalize -> merge -> evaluate path against the real engine. --- src/datastream/merge.ts | 43 +++-- tests/unit/datastream/merge.test.ts | 83 +++++----- tests/unit/wasm-datastream.test.ts | 242 ++++++++++++++++++++++++++++ 3 files changed, 323 insertions(+), 45 deletions(-) create mode 100644 tests/unit/wasm-datastream.test.ts diff --git a/src/datastream/merge.ts b/src/datastream/merge.ts index 8060456c..7fd3e670 100644 --- a/src/datastream/merge.ts +++ b/src/datastream/merge.ts @@ -1,4 +1,20 @@ import type * as Schematic from "../api/types"; +import type { Schema } from "../core/schemas"; +import * as serializers from "../serialization"; + +const PARSE_OPTS = { unrecognizedObjectKeys: "passthrough" as const }; + +/** + * Canonicalizes a raw wire object (snake_case) to camelCase via its Fern + * serializer, falling back to the raw object if parsing fails. Used on nested + * objects arriving in partial payloads so the merged entity keeps a single + * shape — the WASM rules engine rejects objects carrying both casings of the + * same field ("duplicate field"), so per-object purity is load-bearing. + */ +function canonicalize(schema: Schema, raw: unknown): Parsed { + const result = schema.parse(raw, PARSE_OPTS); + return result.ok ? result.value : (raw as Parsed); +} export function deepCopyCompany(c: Schematic.RulesengineCompany): Schematic.RulesengineCompany { return JSON.parse(JSON.stringify(c)); @@ -18,8 +34,8 @@ export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.Rulesengin * Partials don't carry refreshed entitlements, so when their derived fields * change in another part of the company we sync them here to match server * behavior: - * - credit_remaining ← credit_balances[credit_id] - * - usage ← metric value matching (event_name, metric_period, month_reset) + * - creditRemaining ← credit_balances[credit_id] + * - usage ← metric value matching (eventName, metricPeriod, monthReset) * Both are skipped when the partial also sends entitlements wholesale. * * Partial updates arrive as raw wire payloads (snake_case keys) and are merged @@ -60,9 +76,13 @@ export function partialCompany( case "plan_version_ids": merged.planVersionIds = partial[key]; break; - case "entitlements": - merged.entitlements = partial[key]; + case "entitlements": { + const incoming = (partial[key] ?? []) as unknown[]; + merged.entitlements = incoming.map((e) => + canonicalize(serializers.RulesengineFeatureEntitlement, e), + ); break; + } case "rules": merged.rules = partial[key]; break; @@ -87,7 +107,9 @@ export function partialCompany( } case "metrics": { const existingMetrics = (merged.metrics ?? []) as Schematic.RulesengineCompanyMetric[]; - const incomingMetrics = (partial[key] ?? []) as Schematic.RulesengineCompanyMetric[]; + const incomingMetrics = ((partial[key] ?? []) as unknown[]).map((m) => + canonicalize(serializers.RulesengineCompanyMetric, m), + ); merged.metrics = upsertMetrics(existingMetrics, incomingMetrics); metricsUpdated = true; break; @@ -107,9 +129,9 @@ export function partialCompany( * Re-derives entitlement fields whose source data changed in a partial that * did not itself carry fresh entitlements. Mutates the entitlement objects on * the already-deep-copied `merged` company in place: - * - credit_remaining ← the incoming balance for the entitlement's credit_id - * - usage ← the merged metric value matching (event_name, metric_period, month_reset), - * defaulting metric_period to "all_time" and month_reset to "first_of_month" + * - creditRemaining ← the incoming balance for the entitlement's creditId + * - usage ← the merged metric value matching (eventName, metricPeriod, monthReset), + * defaulting metricPeriod to "all_time" and monthReset to "first_of_month" */ function syncEntitlementDerivedFields( merged: Record, @@ -135,7 +157,10 @@ function syncEntitlementDerivedFields( for (const ent of entitlements) { const creditId = (ent.creditId ?? ent.credit_id) as string | undefined; if (updatedBalances && creditId && creditId in updatedBalances) { - ent.credit_remaining = updatedBalances[creditId]; + // Write the camelCase field: the cached entity is canonicalized, and + // the WASM engine errors on an object carrying both casings. + ent.creditRemaining = updatedBalances[creditId]; + delete ent.credit_remaining; } // Credit-attached entitlements are intentionally NOT skipped: usage here diff --git a/tests/unit/datastream/merge.test.ts b/tests/unit/datastream/merge.test.ts index f8b3cfc8..017142eb 100644 --- a/tests/unit/datastream/merge.test.ts +++ b/tests/unit/datastream/merge.test.ts @@ -115,16 +115,17 @@ describe('partialCompany', () => { test('upserts metrics - updates existing, appends new', () => { const existing = baseCompany(); + // Cached metrics are camelCase (canonicalized by parseOrThrow on ingest). (existing as unknown as Record).metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-a', period: 'all_time', month_reset: 'first_of_month', - value: 10, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-a', period: 'all_time', monthReset: 'first_of_month', + value: 10, createdAt: '2026-01-01T00:00:00Z', }, { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-b', period: 'current_month', month_reset: 'first_of_month', - value: 5, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-b', period: 'current_month', monthReset: 'first_of_month', + value: 5, createdAt: '2026-01-01T00:00:00Z', }, ]; @@ -148,14 +149,15 @@ describe('partialCompany', () => { const metrics = (merged as unknown as Record).metrics as Record[]; expect(metrics.length).toBe(3); - // event-a updated in place - expect(metrics[0].event_subtype).toBe('event-a'); + // event-a updated in place; incoming wire metric canonicalized to camelCase + expect(metrics[0].eventSubtype).toBe('event-a'); + expect(metrics[0].event_subtype).toBeUndefined(); expect(metrics[0].value).toBe(42); // event-b unchanged - expect(metrics[1].event_subtype).toBe('event-b'); + expect(metrics[1].eventSubtype).toBe('event-b'); expect(metrics[1].value).toBe(5); - // event-c appended - expect(metrics[2].event_subtype).toBe('event-c'); + // event-c appended (canonicalized) + expect(metrics[2].eventSubtype).toBe('event-c'); expect(metrics[2].value).toBe(1); // Original not mutated @@ -163,12 +165,13 @@ describe('partialCompany', () => { expect(origMetrics[0].value).toBe(10); }); - test('credit_balances update re-derives entitlement credit_remaining', () => { + test('credit_balances update re-derives entitlement creditRemaining', () => { const existing = baseCompany(); + // Cached entitlements are camelCase (canonicalized by parseOrThrow on ingest). (existing as unknown as Record).entitlements = [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'credit', credit_id: 'credit-1', credit_remaining: 100.0 }, - { feature_id: 'feat-2', feature_key: 'feature-two', value_type: 'credit', credit_id: 'credit-2', credit_remaining: 0 }, - { feature_id: 'feat-3', feature_key: 'feature-three', value_type: 'boolean' }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'credit', creditId: 'credit-1', creditRemaining: 100.0 }, + { featureId: 'feat-2', featureKey: 'feature-two', valueType: 'credit', creditId: 'credit-2', creditRemaining: 0 }, + { featureId: 'feat-3', featureKey: 'feature-three', valueType: 'boolean' }, ]; // Partial only updates one of the credit balances and carries no entitlements. @@ -178,29 +181,33 @@ describe('partialCompany', () => { const ents = (merged as unknown as Record).entitlements as Record[]; // credit-1 entitlement re-derived from incoming balance - expect(ents[0].credit_remaining).toBe(42.0); + expect(ents[0].creditRemaining).toBe(42.0); + // Regression: the sync must not leave a snake_case twin next to the + // camelCase field — the WASM engine rejects objects carrying both + // casings of the same field ("duplicate field creditRemaining"). + expect(ents[0].credit_remaining).toBeUndefined(); // credit-2 was not in the partial, left untouched - expect(ents[1].credit_remaining).toBe(0); + expect(ents[1].creditRemaining).toBe(0); // non-credit entitlement untouched - expect(ents[2].credit_remaining).toBeUndefined(); + expect(ents[2].creditRemaining).toBeUndefined(); // Original not mutated const origEnts = (existing as unknown as Record).entitlements as Record[]; - expect(origEnts[0].credit_remaining).toBe(100.0); + expect(origEnts[0].creditRemaining).toBe(100.0); }); test('metrics update re-derives entitlement usage', () => { const existing = baseCompany(); (existing as unknown as Record).metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'api-calls', period: 'current_month', month_reset: 'first_of_month', - value: 10, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'api-calls', period: 'current_month', monthReset: 'first_of_month', + value: 10, createdAt: '2026-01-01T00:00:00Z', }, ]; (existing as unknown as Record).entitlements = [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'numeric', event_name: 'api-calls', metric_period: 'current_month', month_reset: 'first_of_month', usage: 10 }, - { feature_id: 'feat-2', feature_key: 'feature-two', value_type: 'numeric', event_name: 'other-event', metric_period: 'current_month', month_reset: 'first_of_month', usage: 3 }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'numeric', eventName: 'api-calls', metricPeriod: 'current_month', monthReset: 'first_of_month', usage: 10 }, + { featureId: 'feat-2', featureKey: 'feature-two', valueType: 'numeric', eventName: 'other-event', metricPeriod: 'current_month', monthReset: 'first_of_month', usage: 3 }, ]; // Partial upserts the api-calls metric and carries no entitlements. @@ -228,8 +235,8 @@ describe('partialCompany', () => { const existing = baseCompany(); (existing as unknown as Record).metrics = []; (existing as unknown as Record).entitlements = [ - // No metric_period / month_reset → should default to all_time / first_of_month - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'numeric', event_name: 'logins', usage: 0 }, + // No metricPeriod / monthReset → should default to all_time / first_of_month + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'numeric', eventName: 'logins', usage: 0 }, ]; const partial = { @@ -252,11 +259,12 @@ describe('partialCompany', () => { test('does not re-derive when partial carries entitlements wholesale', () => { const existing = baseCompany(); (existing as unknown as Record).entitlements = [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'credit', credit_id: 'credit-1', credit_remaining: 100.0 }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'credit', creditId: 'credit-1', creditRemaining: 100.0 }, ]; // Partial includes BOTH credit_balances and entitlements; the supplied // entitlements win and the derived sync is skipped entirely. + // Partial payloads arrive in wire format (snake_case). const partial = { id: 'co-1', credit_balances: { 'credit-1': 42.0 }, @@ -269,7 +277,9 @@ describe('partialCompany', () => { const ents = (merged as unknown as Record).entitlements as Record[]; // Uses the entitlement value from the partial, NOT the balance-derived 42. - expect(ents[0].credit_remaining).toBe(7.0); + // Incoming wire entitlements are canonicalized to camelCase on merge. + expect(ents[0].creditRemaining).toBe(7.0); + expect(ents[0].credit_remaining).toBeUndefined(); }); test('empty entitlements clears existing', () => { @@ -343,9 +353,9 @@ describe('partialCompany', () => { const existing = baseCompany(); (existing as unknown as Record).metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-a', period: 'all_time', month_reset: 'first_of_month', - value: 10, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-a', period: 'all_time', monthReset: 'first_of_month', + value: 10, createdAt: '2026-01-01T00:00:00Z', }, ]; (existing as unknown as Record).rules = [makeRule('rule-1')]; @@ -396,20 +406,21 @@ describe('partialCompany', () => { // Credit balances merge: credit-1 overwritten, credit-new added expect(m.creditBalances).toEqual({ 'credit-1': 999.0, 'credit-new': 50.0 }); + // Incoming wire entitlements are canonicalized to camelCase on merge const entitlements = m.entitlements as Record[]; expect(entitlements.length).toBe(2); - expect(entitlements[0].feature_id).toBe('feat-new'); - expect(entitlements[1].feature_id).toBe('feat-2'); + expect(entitlements[0].featureId).toBe('feat-new'); + expect(entitlements[1].featureId).toBe('feat-2'); // Keys merge: domain overwritten, slug added expect(m.keys).toEqual({ domain: 'new.com', slug: 'new-slug' }); - // Metrics upsert: event-a updated, event-new appended + // Metrics upsert: event-a updated, event-new appended (canonicalized) const metrics = m.metrics as Record[]; expect(metrics.length).toBe(2); - expect(metrics[0].event_subtype).toBe('event-a'); + expect(metrics[0].eventSubtype).toBe('event-a'); expect(metrics[0].value).toBe(42); - expect(metrics[1].event_subtype).toBe('event-new'); + expect(metrics[1].eventSubtype).toBe('event-new'); expect(metrics[1].value).toBe(7); expect(m.planIds).toEqual(['plan-99', 'plan-100']); diff --git a/tests/unit/wasm-datastream.test.ts b/tests/unit/wasm-datastream.test.ts new file mode 100644 index 00000000..bf7d5503 --- /dev/null +++ b/tests/unit/wasm-datastream.test.ts @@ -0,0 +1,242 @@ +/** + * WASM <-> DataStream Integration Tests + * + * Unlike wasm-integration.test.ts (which feeds hand-written fixtures directly + * to the engine), these tests exercise the full datastream path against the + * REAL WASM rules engine: snake_case wire messages are ingested by the + * DataStreamClient (canonicalized to camelCase via the Fern serializers), + * partials are merged, and flag checks evaluate the cached entities in WASM. + * + * This is the seam where casing bugs live: the engine accepts either casing + * per field (serde aliases), but rejects an object carrying BOTH casings of + * the same field ("duplicate field"), so the cache must stay single-shape. + * + * Only the WebSocket transport is mocked; the rules engine is real. + */ +import { DataStreamClient, DataStreamClientOptions } from "../../src/datastream/datastream-client"; +import { DatastreamWSClient } from "../../src/datastream/websocket-client"; +import { DataStreamResp, EntityType, MessageType } from "../../src/datastream/types"; +import { Logger } from "../../src/logger"; + +const mockWS = { + on: jest.fn(), + start: jest.fn(), + close: jest.fn(), + isConnected: jest.fn().mockReturnValue(true), + isReady: jest.fn().mockReturnValue(true), + sendMessage: jest.fn().mockResolvedValue(undefined), +}; +jest.mock("../../src/datastream/websocket-client", () => ({ + DatastreamWSClient: jest.fn().mockImplementation(() => mockWS), +})); + +// A metric-gated flag in wire format: true once the company has >= 5 +// "api_call" events (all_time). Multi-word fields (event_subtype, +// metric_period, metric_value) make evaluation casing-sensitive. +const wireMetricFlag = { + id: "flag-metric", + account_id: "account-123", + environment_id: "env-123", + key: "metric-flag", + default_value: false, + rules: [ + { + id: "rule-metric", + account_id: "account-123", + environment_id: "env-123", + name: "Usage gate", + rule_type: "standard", + priority: 100, + value: true, + conditions: [ + { + id: "cond-metric", + account_id: "account-123", + environment_id: "env-123", + condition_type: "metric", + operator: "gte", + resource_ids: [], + event_subtype: "api_call", + metric_period: "all_time", + metric_value: 5, + trait_value: "", + }, + ], + condition_groups: [], + }, + ], +}; + +// An unconditional standard rule: evaluates true unless the engine errors, +// in which case checkFlag falls back to default_value (false). +const wireAlwaysOnFlag = { + id: "flag-on", + account_id: "account-123", + environment_id: "env-123", + key: "always-on", + default_value: false, + rules: [ + { + id: "rule-on", + account_id: "account-123", + environment_id: "env-123", + name: "Always On", + rule_type: "standard", + priority: 100, + value: true, + conditions: [], + condition_groups: [], + }, + ], +}; + +const wireMetric = (value: number) => ({ + account_id: "account-123", + environment_id: "env-123", + company_id: "company-1", + event_subtype: "api_call", + period: "all_time", + month_reset: "first_of_month", + value, + created_at: "2026-01-01T00:00:00Z", +}); + +const wireCompany = (metricValue: number) => ({ + id: "company-1", + account_id: "account-123", + environment_id: "env-123", + keys: { name: "Wire Corp" }, + base_plan_id: null, + billing_product_ids: [], + plan_ids: [], + plan_version_ids: [], + credit_balances: { "credit-1": 100.0 }, + metrics: [wireMetric(metricValue)], + traits: [], + rules: [], + entitlements: [ + { + feature_id: "feat-1", + feature_key: "feature-one", + value_type: "credit", + credit_id: "credit-1", + credit_total: 200.0, + credit_used: 100.0, + credit_remaining: 100.0, + }, + ], +}); + +describe("WASM <-> DataStream integration (real engine)", () => { + let client: DataStreamClient; + let messageHandler: (message: DataStreamResp) => Promise; + let logger: Logger; + + beforeEach(async () => { + jest.clearAllMocks(); + mockWS.sendMessage.mockResolvedValue(undefined); + logger = { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }; + const options: DataStreamClientOptions = { + apiKey: "test-api-key", + baseURL: "https://api.schematichq.com", + logger, + }; + client = new DataStreamClient(options); + await client.start(); + const WSMock = DatastreamWSClient as jest.MockedClass; + messageHandler = WSMock.mock.calls[0][0].messageHandler; + }); + + afterEach(async () => { + client.removeAllListeners(); + client.close(); + await new Promise((resolve) => setImmediate(resolve)); + }); + + const sendFullCompany = (metricValue: number) => + messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.FULL, + data: wireCompany(metricValue), + } as DataStreamResp); + + const sendFlags = () => + messageHandler({ + entity_type: EntityType.FLAGS, + message_type: MessageType.FULL, + data: [wireMetricFlag, wireAlwaysOnFlag], + } as DataStreamResp); + + const check = (flagKey: string) => client.checkFlag({ company: { name: "Wire Corp" } }, flagKey); + + test("FULL snake_case wire message -> canonicalized cache -> engine evaluates the metric gate", async () => { + await sendFullCompany(10); + await sendFlags(); + + const result = await check("metric-flag"); + expect(result.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(result.reason).not.toBe("RULES_ENGINE_UNAVAILABLE"); + expect(result.value).toBe(true); + + // The cache holds the canonical camelCase shape. + const cached = await client.getCompany({ name: "Wire Corp" }); + expect((cached as any).accountId).toBe("account-123"); + expect((cached as any).account_id).toBeUndefined(); + expect((cached as any).metrics[0].eventSubtype).toBe("api_call"); + }); + + test("PARTIAL metrics update flips the metric gate from false to true", async () => { + // Start below the gate (usage 2 < 5) + await sendFullCompany(2); + await sendFlags(); + + const before = await check("metric-flag"); + expect(before.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(before.value).toBe(false); + + // Partial metrics update arrives in wire format, bumping usage to 10. + await messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.PARTIAL, + entity_id: "company-1", + data: { metrics: [wireMetric(10)] }, + } as DataStreamResp); + + const after = await check("metric-flag"); + expect(after.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(after.value).toBe(true); + + expect(logger.warn).not.toHaveBeenCalledWith(expect.stringContaining("Failed to deserialize")); + expect(logger.error).not.toHaveBeenCalledWith(expect.stringContaining("Rules engine evaluation failed")); + }); + + test("PARTIAL credit_balances update does not break subsequent evaluation", async () => { + // Regression: syncEntitlementDerivedFields used to write snake_case + // credit_remaining next to the canonicalized creditRemaining; the engine + // rejected the duplicate and every flag check for the company fell back + // to its default value. + await sendFullCompany(10); + await sendFlags(); + + const before = await check("always-on"); + expect(before.value).toBe(true); + + await messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.PARTIAL, + entity_id: "company-1", + data: { credit_balances: { "credit-1": 42.0 } }, + } as DataStreamResp); + + // The entitlement keeps a single canonical field with the fresh value. + const cached = await client.getCompany({ name: "Wire Corp" }); + const ent = (cached as any).entitlements[0]; + expect(ent.creditRemaining).toBe(42.0); + expect(ent.credit_remaining).toBeUndefined(); + + const after = await check("always-on"); + expect(after.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(after.value).toBe(true); + expect(logger.error).not.toHaveBeenCalledWith(expect.stringContaining("Rules engine evaluation failed")); + }); +}); From 825b703b8804f6cf52c89f1855313951f1aa7642 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Wed, 3 Jun 2026 11:03:51 -0700 Subject: [PATCH 3/3] chore: fernignore wasm-datastream.test.ts --- .fernignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.fernignore b/.fernignore index a7be3e5b..2f0dc8a2 100644 --- a/.fernignore +++ b/.fernignore @@ -35,6 +35,7 @@ tests/unit/event-capture.test.ts tests/unit/events.test.ts tests/unit/logger.test.ts tests/unit/rules-engine.test.ts +tests/unit/wasm-datastream.test.ts tests/unit/wasm-integration.test.ts tests/unit/webhooks.test.ts tests/unit/wrapper.test.ts