diff --git a/.fernignore b/.fernignore index a7be3e5b..2f0dc8a2 100644 --- a/.fernignore +++ b/.fernignore @@ -35,6 +35,7 @@ tests/unit/event-capture.test.ts tests/unit/events.test.ts tests/unit/logger.test.ts tests/unit/rules-engine.test.ts +tests/unit/wasm-datastream.test.ts tests/unit/wasm-integration.test.ts tests/unit/webhooks.test.ts tests/unit/wrapper.test.ts diff --git a/src/datastream/datastream-client.ts b/src/datastream/datastream-client.ts index 5d161793..0a5dfb35 100644 --- a/src/datastream/datastream-client.ts +++ b/src/datastream/datastream-client.ts @@ -5,6 +5,7 @@ import { RulesEngineClient } from '../rules-engine'; import { Logger } from '../logger'; import { LazyEmitter } from './emitter'; import { partialCompany, partialUser, deepCopyCompany as deepCopyCompanyFn } from './merge'; +import * as serializers from '../serialization'; // Import cache providers from the cache module import type { CacheProvider } from '../cache/types'; @@ -705,7 +706,18 @@ export class DataStreamClient extends LazyEmitter { return; } } else { - company = message.data as Schematic.RulesengineCompany; + try { + // passthrough (not the Fern default of "fail") so a payload carrying a + // field this SDK's schema doesn't know about yet — e.g. a server that + // ships ahead of the pinned SDK — is canonicalized to camelCase for its + // known fields and kept, rather than dropping the whole entity. + company = serializers.RulesengineCompany.parseOrThrow(message.data, { + unrecognizedObjectKeys: "passthrough", + }); + } catch (error) { + this.logger.warn(`Failed to deserialize company payload: ${error}`); + return; + } } if (!company) { @@ -768,7 +780,14 @@ export class DataStreamClient extends LazyEmitter { return; } } else { - user = message.data as Schematic.RulesengineUser; + try { + user = serializers.RulesengineUser.parseOrThrow(message.data, { + unrecognizedObjectKeys: "passthrough", + }); + } catch (error) { + this.logger.warn(`Failed to deserialize user payload: ${error}`); + return; + } } if (!user) { @@ -808,13 +827,28 @@ export class DataStreamClient extends LazyEmitter { * handleFlagsMessage processes bulk flags messages */ private async handleFlagsMessage(message: DataStreamResp): Promise { - const flags = message.data as Schematic.RulesengineFlag[]; - - if (!Array.isArray(flags)) { + const rawFlags = message.data as unknown[]; + + if (!Array.isArray(rawFlags)) { this.logger.warn('Expected flags array in bulk flags message'); return; } + const flags: Schematic.RulesengineFlag[] = []; + let parseFailureCount = 0; + let firstFailure: unknown = undefined; + for (const raw of rawFlags) { + try { + flags.push(serializers.RulesengineFlag.parseOrThrow(raw, { unrecognizedObjectKeys: "passthrough" })); + } catch (error) { + parseFailureCount++; + if (firstFailure === undefined) firstFailure = error; + } + } + if (parseFailureCount > 0) { + this.logger.warn(`Failed to deserialize ${parseFailureCount} flag(s) in bulk message: ${String(firstFailure)}`); + } + const results = await Promise.allSettled( flags .filter((flag) => flag?.key) @@ -854,8 +888,16 @@ export class DataStreamClient extends LazyEmitter { * handleFlagMessage processes single flag messages */ private async handleFlagMessage(message: DataStreamResp): Promise { - const flag = message.data as Schematic.RulesengineFlag; - + let flag: Schematic.RulesengineFlag; + try { + flag = serializers.RulesengineFlag.parseOrThrow(message.data, { + unrecognizedObjectKeys: "passthrough", + }); + } catch (error) { + this.logger.warn(`Failed to deserialize flag payload: ${error}`); + return; + } + if (!flag?.key) { return; } diff --git a/src/datastream/merge.ts b/src/datastream/merge.ts index e579b4a9..7fd3e670 100644 --- a/src/datastream/merge.ts +++ b/src/datastream/merge.ts @@ -1,23 +1,25 @@ import type * as Schematic from "../api/types"; +import type { Schema } from "../core/schemas"; +import * as serializers from "../serialization"; + +const PARSE_OPTS = { unrecognizedObjectKeys: "passthrough" as const }; /** - * Helper to read a property that may be in camelCase or snake_case form. - * Wire data from WebSocket uses snake_case; Fern-generated types use camelCase. + * Canonicalizes a raw wire object (snake_case) to camelCase via its Fern + * serializer, falling back to the raw object if parsing fails. Used on nested + * objects arriving in partial payloads so the merged entity keeps a single + * shape — the WASM rules engine rejects objects carrying both casings of the + * same field ("duplicate field"), so per-object purity is load-bearing. */ -function getProp(obj: Record, camel: string, snake: string): unknown { - return obj[camel] ?? obj[snake]; +function canonicalize(schema: Schema, raw: unknown): Parsed { + const result = schema.parse(raw, PARSE_OPTS); + return result.ok ? result.value : (raw as Parsed); } -/** - * Creates a complete deep copy of a Company object. - */ export function deepCopyCompany(c: Schematic.RulesengineCompany): Schematic.RulesengineCompany { return JSON.parse(JSON.stringify(c)); } -/** - * Creates a complete deep copy of a User object. - */ export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.RulesengineUser { return JSON.parse(JSON.stringify(u)); } @@ -32,13 +34,13 @@ export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.Rulesengin * Partials don't carry refreshed entitlements, so when their derived fields * change in another part of the company we sync them here to match server * behavior: - * - credit_remaining ← credit_balances[credit_id] - * - usage ← metric value matching (event_name, metric_period, month_reset) + * - creditRemaining ← credit_balances[credit_id] + * - usage ← metric value matching (eventName, metricPeriod, monthReset) * Both are skipped when the partial also sends entitlements wholesale. * - * Wire format uses snake_case keys. The existing company from cache - * may have either camelCase or snake_case keys depending on how it - * was stored. + * Partial updates arrive as raw wire payloads (snake_case keys) and are merged + * into an existing camelCase-canonicalized entity; each case writes the + * corresponding camelCase field so the cached entity stays in a single shape. */ export function partialCompany( existing: Schematic.RulesengineCompany, @@ -54,43 +56,61 @@ export function partialCompany( for (const key of Object.keys(partial)) { switch (key) { case "id": + merged.id = partial[key]; + break; case "account_id": + merged.accountId = partial[key]; + break; case "environment_id": - merged[key] = partial[key]; + merged.environmentId = partial[key]; break; case "base_plan_id": - merged[key] = partial[key] ?? null; + merged.basePlanId = partial[key] ?? null; break; case "billing_product_ids": + merged.billingProductIds = partial[key]; + break; case "plan_ids": + merged.planIds = partial[key]; + break; case "plan_version_ids": - case "entitlements": + merged.planVersionIds = partial[key]; + break; + case "entitlements": { + const incoming = (partial[key] ?? []) as unknown[]; + merged.entitlements = incoming.map((e) => + canonicalize(serializers.RulesengineFeatureEntitlement, e), + ); + break; + } case "rules": + merged.rules = partial[key]; + break; case "traits": + merged.traits = partial[key]; + break; case "subscription": - merged[key] = partial[key]; + merged.subscription = partial[key]; break; case "keys": { - const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record; + const existingKeys = (merged.keys ?? {}) as Record; const incomingKeys = partial[key] as Record; - merged[key] = { ...existingKeys, ...incomingKeys }; + merged.keys = { ...existingKeys, ...incomingKeys }; break; } case "credit_balances": { - const existingCB = (getProp(merged, "creditBalances", "credit_balances") ?? {}) as Record< - string, - number - >; + const existingCB = (merged.creditBalances ?? {}) as Record; const incomingCB = (partial[key] ?? {}) as Record; - merged[key] = { ...existingCB, ...incomingCB }; + merged.creditBalances = { ...existingCB, ...incomingCB }; updatedBalances = incomingCB; break; } case "metrics": { - const existingMetrics = ((getProp(merged, "metrics", "metrics") as unknown[]) ?? - []) as Schematic.RulesengineCompanyMetric[]; - const incomingMetrics = (partial[key] ?? []) as Schematic.RulesengineCompanyMetric[]; - merged[key] = upsertMetrics(existingMetrics, incomingMetrics); + const existingMetrics = (merged.metrics ?? []) as Schematic.RulesengineCompanyMetric[]; + const incomingMetrics = ((partial[key] ?? []) as unknown[]).map((m) => + canonicalize(serializers.RulesengineCompanyMetric, m), + ); + merged.metrics = upsertMetrics(existingMetrics, incomingMetrics); metricsUpdated = true; break; } @@ -109,16 +129,16 @@ export function partialCompany( * Re-derives entitlement fields whose source data changed in a partial that * did not itself carry fresh entitlements. Mutates the entitlement objects on * the already-deep-copied `merged` company in place: - * - credit_remaining ← the incoming balance for the entitlement's credit_id - * - usage ← the merged metric value matching (event_name, metric_period, month_reset), - * defaulting metric_period to "all_time" and month_reset to "first_of_month" + * - creditRemaining ← the incoming balance for the entitlement's creditId + * - usage ← the merged metric value matching (eventName, metricPeriod, monthReset), + * defaulting metricPeriod to "all_time" and monthReset to "first_of_month" */ function syncEntitlementDerivedFields( merged: Record, updatedBalances: Record | undefined, metricsUpdated: boolean, ): void { - const entitlements = (getProp(merged, "entitlements", "entitlements") ?? []) as Record[]; + const entitlements = (merged.entitlements ?? []) as Record[]; if (entitlements.length === 0) { return; } @@ -127,7 +147,7 @@ function syncEntitlementDerivedFields( // upsert so entitlements can find their matching usage. const metricsLookup = new Map(); if (metricsUpdated) { - const mergedMetrics = (getProp(merged, "metrics", "metrics") ?? []) as Record[]; + const mergedMetrics = (merged.metrics ?? []) as Record[]; for (const m of mergedMetrics) { if (!m) continue; metricsLookup.set(metricKeyString(getMetricKey(m)), (m.value as number) ?? 0); @@ -137,7 +157,10 @@ function syncEntitlementDerivedFields( for (const ent of entitlements) { const creditId = (ent.creditId ?? ent.credit_id) as string | undefined; if (updatedBalances && creditId && creditId in updatedBalances) { - ent.credit_remaining = updatedBalances[creditId]; + // Write the camelCase field: the cached entity is canonicalized, and + // the WASM engine errors on an object carrying both casings. + ent.creditRemaining = updatedBalances[creditId]; + delete ent.credit_remaining; } // Credit-attached entitlements are intentionally NOT skipped: usage here @@ -170,19 +193,25 @@ export function partialUser( for (const key of Object.keys(partial)) { switch (key) { case "id": + merged.id = partial[key]; + break; case "account_id": + merged.accountId = partial[key]; + break; case "environment_id": - merged[key] = partial[key]; + merged.environmentId = partial[key]; break; case "keys": { - const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record; + const existingKeys = (merged.keys ?? {}) as Record; const incomingKeys = partial[key] as Record; - merged[key] = { ...existingKeys, ...incomingKeys }; + merged.keys = { ...existingKeys, ...incomingKeys }; break; } case "traits": + merged.traits = partial[key]; + break; case "rules": - merged[key] = partial[key]; + merged.rules = partial[key]; break; // Ignore unknown keys silently } diff --git a/tests/unit/datastream/datastream-client.test.ts b/tests/unit/datastream/datastream-client.test.ts index c0f180fb..da27a0c0 100644 --- a/tests/unit/datastream/datastream-client.test.ts +++ b/tests/unit/datastream/datastream-client.test.ts @@ -7,6 +7,23 @@ import { DatastreamWSClient } from '../../../src/datastream/websocket-client'; import { DataStreamResp, EntityType, MessageType } from '../../../src/datastream/types'; import { Logger } from '../../../src/logger'; import * as Schematic from '../../../src/api/types'; +import * as serializers from '../../../src/serialization'; + +const PARSE_OPTS = { + allowUnrecognizedEnumValues: true, + allowUnrecognizedUnionMembers: true, + unrecognizedObjectKeys: 'passthrough' as const, +}; +// The SUT runs incoming snake_case wire payloads through Fern's parseOrThrow +// to canonicalize them to camelCase before caching. Mock fixtures are written +// in wire format (snake_case), so we route them through the same serializer +// to compute the expected camelCase shape returned by getCompany/getUser/getFlag. +const asCompany = (c: unknown): Schematic.RulesengineCompany => + serializers.RulesengineCompany.parseOrThrow(c, PARSE_OPTS); +const asUser = (u: unknown): Schematic.RulesengineUser => + serializers.RulesengineUser.parseOrThrow(u, PARSE_OPTS); +const asFlag = (f: unknown): Schematic.RulesengineFlag => + serializers.RulesengineFlag.parseOrThrow(f, PARSE_OPTS); // Mock DatastreamWSClient const mockDatastreamWSClientInstance = { on: jest.fn(), @@ -52,8 +69,8 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; @@ -251,7 +268,45 @@ describe('DataStreamClient', () => { // Verify company is cached and can be retrieved using the correct keys const retrievedCompany = await client.getCompany(mockCompany.keys!); - expect(retrievedCompany).toEqual(mockCompany); + expect(retrievedCompany).toEqual(asCompany(mockCompany)); + }, 10000); + + test('keeps a company carrying an unknown field instead of dropping it (forward-compat)', async () => { + // A server running ahead of this SDK can ship a field the generated schema + // doesn't know yet. parseOrThrow defaults to dropping the whole entity on an + // unrecognized key; the SUT overrides that to "passthrough" so the company + // is still cached (known fields canonicalized, unknown field retained). + await client.start(); + + const DatastreamWSClientMock = DatastreamWSClient as jest.MockedClass; + const messageHandler = DatastreamWSClientMock.mock.calls[0][0].messageHandler; + + const companyWithUnknownField = { + ...(mockCompany as unknown as Record), + id: 'company-future', + keys: { name: 'Future Co' }, + // Field this SDK's schema has never seen: + some_future_field: { nested: 'value' }, + }; + + await messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.FULL, + data: companyWithUnknownField, + } as DataStreamResp); + + // The entity was kept (not dropped), known fields canonicalized to camelCase. + const retrieved = await client.getCompany({ name: 'Future Co' }); + expect(retrieved).toBeDefined(); + expect(retrieved?.id).toBe('company-future'); + // The unknown field survives passthrough. + expect((retrieved as unknown as Record).some_future_field).toEqual({ + nested: 'value', + }); + // And we did not warn about a failed deserialize. + expect(mockLogger.warn).not.toHaveBeenCalledWith( + expect.stringContaining('Failed to deserialize company payload'), + ); }, 10000); test('should handle user messages and update cache', async () => { @@ -273,7 +328,7 @@ describe('DataStreamClient', () => { // Verify user is cached and can be retrieved using the correct keys const retrievedUser = await client.getUser(mockUser.keys!); - expect(retrievedUser).toEqual(mockUser); + expect(retrievedUser).toEqual(asUser(mockUser)); }, 10000); test('should handle flag messages and update cache', async () => { @@ -295,7 +350,7 @@ describe('DataStreamClient', () => { // Verify flag is cached and can be retrieved const retrievedFlag = await client.getFlag(mockFlag.key); - expect(retrievedFlag).toEqual(mockFlag); + expect(retrievedFlag).toEqual(asFlag(mockFlag)); }); test('should handle partial entity message merging', async () => { @@ -311,12 +366,12 @@ describe('DataStreamClient', () => { account_id: 'account-123', environment_id: 'env-123', keys: { name: 'Partial Corp' }, - traits: [{ key: 'tier', value: 'free' }], + traits: [{ value: 'free' }], rules: [], metrics: [], plan_ids: ['plan-1'], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; @@ -328,7 +383,7 @@ describe('DataStreamClient', () => { // Verify the full company is cached const cachedFull = await client.getCompany({ name: 'Partial Corp' }); - expect(cachedFull).toEqual(fullCompany); + expect(cachedFull).toEqual(asCompany(fullCompany)); // Send a PARTIAL company message. Wire shape: data is the partial fields, // entity_id at the top level identifies the cached company to merge into. @@ -338,22 +393,24 @@ describe('DataStreamClient', () => { message_type: MessageType.PARTIAL, data: { keys: { name: 'Partial Corp' }, - traits: [{ key: 'tier', value: 'enterprise' }], + traits: [{ value: 'enterprise' }], plan_ids: ['plan-2'], }, }); // Partial messages are now properly merged: fields in the partial update // the cached entity, while fields not present in the partial are preserved. + // Cached values are camelCase (canonicalized by parseOrThrow on the FULL + // message), and partialCompany writes camelCase keys, so assertions read + // camelCase regardless of whether the field was full-loaded or merged. const cachedAfterPartial = await client.getCompany({ name: 'Partial Corp' }); expect(cachedAfterPartial.id).toBe('company-partial'); - expect((cachedAfterPartial as any).traits).toEqual([{ key: 'tier', value: 'enterprise' }]); - expect((cachedAfterPartial as any).plan_ids).toEqual(['plan-2']); - // Original fields not present in the partial message are preserved + expect((cachedAfterPartial as any).traits).toEqual([{ value: 'enterprise' }]); + expect((cachedAfterPartial as any).planIds).toEqual(['plan-2']); expect((cachedAfterPartial as any).metrics).toEqual([]); expect((cachedAfterPartial as any).rules).toEqual([]); - expect((cachedAfterPartial as any).account_id).toBe('account-123'); - expect((cachedAfterPartial as any).billing_product_ids).toEqual([]); + expect((cachedAfterPartial as any).accountId).toBe('account-123'); + expect((cachedAfterPartial as any).billingProductIds).toEqual([]); }, 10000); test('should skip partial company message when entity is not in cache', async () => { @@ -561,9 +618,9 @@ describe('DataStreamClient', () => { const cachedUser = await client.getUser(mockUser.keys!); const cachedFlag = await client.getFlag(mockFlag.key); - expect(cachedCompany).toEqual(mockCompany); - expect(cachedUser).toEqual(mockUser); - expect(cachedFlag).toEqual(mockFlag); + expect(cachedCompany).toEqual(asCompany(mockCompany)); + expect(cachedUser).toEqual(asUser(mockUser)); + expect(cachedFlag).toEqual(asFlag(mockFlag)); }); test('should handle error type messages from WebSocket', async () => { @@ -793,8 +850,8 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; @@ -827,9 +884,9 @@ describe('DataStreamClient', () => { const bySlug = await client.getCompany({ slug: 'acme-corp' }); const byExtId = await client.getCompany({ external_id: 'ext-1' }); - expect(byName).toEqual(multiKeyCompany); - expect(bySlug).toEqual(multiKeyCompany); - expect(byExtId).toEqual(multiKeyCompany); + expect(byName).toEqual(asCompany(multiKeyCompany)); + expect(bySlug).toEqual(asCompany(multiKeyCompany)); + expect(byExtId).toEqual(asCompany(multiKeyCompany)); }); test('should retrieve user by any of its keys after caching', async () => { @@ -842,8 +899,8 @@ describe('DataStreamClient', () => { const byEmail = await client.getUser({ email: 'alice@example.com' }); const byUserId = await client.getUser({ user_id: 'u-1' }); - expect(byEmail).toEqual(multiKeyUser); - expect(byUserId).toEqual(multiKeyUser); + expect(byEmail).toEqual(asUser(multiKeyUser)); + expect(byUserId).toEqual(asUser(multiKeyUser)); }); test('should remove company from cache on DELETE for all keys', async () => { @@ -859,7 +916,7 @@ describe('DataStreamClient', () => { // Verify it's cached — returns from cache without sending a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); const cached = await client.getCompany({ name: 'acme' }); - expect(cached).toEqual(multiKeyCompany); + expect(cached).toEqual(asCompany(multiKeyCompany)); expect(mockDatastreamWSClientInstance.sendMessage).not.toHaveBeenCalled(); // Send DELETE @@ -904,7 +961,7 @@ describe('DataStreamClient', () => { // Verify it's cached — returns from cache without sending a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); const cached = await client.getUser({ email: 'alice@example.com' }); - expect(cached).toEqual(multiKeyUser); + expect(cached).toEqual(asUser(multiKeyUser)); expect(mockDatastreamWSClientInstance.sendMessage).not.toHaveBeenCalled(); // Send DELETE @@ -945,7 +1002,7 @@ describe('DataStreamClient', () => { // Send updated company with same keys but different data const updatedCompany = { ...multiKeyCompany, - traits: [{ key: 'tier', value: 'enterprise' }], + traits: [{ value: 'enterprise' }], } as unknown as Schematic.RulesengineCompany; await messageHandler({ @@ -958,8 +1015,8 @@ describe('DataStreamClient', () => { const byName = await client.getCompany({ name: 'acme' }); const bySlug = await client.getCompany({ slug: 'acme-corp' }); - expect(byName).toEqual(updatedCompany); - expect(bySlug).toEqual(updatedCompany); + expect(byName).toEqual(asCompany(updatedCompany)); + expect(bySlug).toEqual(asCompany(updatedCompany)); }); test('should handle deep copy to prevent mutation of cached entities', async () => { @@ -972,7 +1029,7 @@ describe('DataStreamClient', () => { // Retrieve the company from cache const firstRetrieval = await client.getCompany({ name: 'acme' }); - expect(firstRetrieval).toEqual(multiKeyCompany); + expect(firstRetrieval).toEqual(asCompany(multiKeyCompany)); // Mutate a field on the returned object (firstRetrieval as any).traits = [{ key: 'mutated', value: 'yes' }]; @@ -999,9 +1056,9 @@ describe('DataStreamClient', () => { }); // Verify all three keys resolve from cache - expect(await client.getCompany({ name: 'acme' })).toEqual(multiKeyCompany); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(multiKeyCompany); - expect(await client.getCompany({ external_id: 'ext-1' })).toEqual(multiKeyCompany); + expect(await client.getCompany({ name: 'acme' })).toEqual(asCompany(multiKeyCompany)); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(multiKeyCompany)); + expect(await client.getCompany({ external_id: 'ext-1' })).toEqual(asCompany(multiKeyCompany)); // Update with only two keys — external_id has been removed const updatedCompany = { @@ -1016,8 +1073,8 @@ describe('DataStreamClient', () => { }); // Remaining keys should still resolve from cache - expect(await client.getCompany({ name: 'acme' })).toEqual(updatedCompany); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(updatedCompany); + expect(await client.getCompany({ name: 'acme' })).toEqual(asCompany(updatedCompany)); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(updatedCompany)); // Removed key should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1045,8 +1102,8 @@ describe('DataStreamClient', () => { }); // Verify both keys resolve from cache - expect(await client.getUser({ email: 'alice@example.com' })).toEqual(multiKeyUser); - expect(await client.getUser({ user_id: 'u-1' })).toEqual(multiKeyUser); + expect(await client.getUser({ email: 'alice@example.com' })).toEqual(asUser(multiKeyUser)); + expect(await client.getUser({ user_id: 'u-1' })).toEqual(asUser(multiKeyUser)); // Update with only email — user_id has been removed const updatedUser = { @@ -1061,7 +1118,7 @@ describe('DataStreamClient', () => { }); // Remaining key should still resolve from cache - expect(await client.getUser({ email: 'alice@example.com' })).toEqual(updatedUser); + expect(await client.getUser({ email: 'alice@example.com' })).toEqual(asUser(updatedUser)); // Removed key should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1088,7 +1145,7 @@ describe('DataStreamClient', () => { data: multiKeyCompany, }); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(multiKeyCompany); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(multiKeyCompany)); // Update: slug value changed from 'acme-corp' to 'acme-inc' const updatedCompany = { @@ -1103,7 +1160,7 @@ describe('DataStreamClient', () => { }); // New slug should resolve from cache - expect(await client.getCompany({ slug: 'acme-inc' })).toEqual(updatedCompany); + expect(await client.getCompany({ slug: 'acme-inc' })).toEqual(asCompany(updatedCompany)); // Old slug value should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1124,7 +1181,16 @@ describe('DataStreamClient', () => { const companyWithMetrics = { ...multiKeyCompany, metrics: [ - { eventSubtype: 'api-call', value: 10 }, + { + account_id: 'account-123', + company_id: 'company-multi', + created_at: '2026-01-01T00:00:00Z', + environment_id: 'env-123', + event_subtype: 'api-call', + month_reset: 'first_of_month', + period: 'all_time', + value: 10, + }, ], } as unknown as Schematic.RulesengineCompany; @@ -1156,8 +1222,8 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], - crm_product_ids: [], credit_balances: {}, } as unknown as Schematic.RulesengineCompany; diff --git a/tests/unit/datastream/merge.test.ts b/tests/unit/datastream/merge.test.ts index caec17a4..017142eb 100644 --- a/tests/unit/datastream/merge.test.ts +++ b/tests/unit/datastream/merge.test.ts @@ -6,23 +6,25 @@ import { deepCopyUser, } from '../../../src/datastream/merge'; -// Helper: base company in snake_case wire format (matches WebSocket data) +// Helper: base company in camelCase (matches the cached, parseOrThrow-normalized +// shape that partialCompany sees in production). Partial payloads arrive in +// snake_case from the wire; the merge function canonicalizes to camelCase. function baseCompany(): Schematic.RulesengineCompany { return { id: 'co-1', - account_id: 'acc-1', - environment_id: 'env-1', - base_plan_id: 'plan-1', - billing_product_ids: ['bp-1'], - credit_balances: { 'credit-1': 100.0 }, + accountId: 'acc-1', + environmentId: 'env-1', + basePlanId: 'plan-1', + billingProductIds: ['bp-1'], + creditBalances: { 'credit-1': 100.0 }, keys: { domain: 'example.com' }, - plan_ids: ['plan-1'], - plan_version_ids: ['pv-1'], + planIds: ['plan-1'], + planVersionIds: ['pv-1'], traits: [ - { value: 'Enterprise', trait_definition: { id: 'plan', comparable_type: 'string', entity_type: 'company' } }, + { value: 'Enterprise', traitDefinition: { id: 'plan', comparableType: 'string', entityType: 'company' } }, ], entitlements: [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'boolean' }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'boolean' }, ], metrics: [], rules: [], @@ -32,11 +34,11 @@ function baseCompany(): Schematic.RulesengineCompany { function baseUser(): Schematic.RulesengineUser { return { id: 'user-1', - account_id: 'acc-1', - environment_id: 'env-1', + accountId: 'acc-1', + environmentId: 'env-1', keys: { email: 'user@example.com' }, traits: [ - { value: 'Premium', trait_definition: { id: 'tier', comparable_type: 'string', entity_type: 'user' } }, + { value: 'Premium', traitDefinition: { id: 'tier', comparableType: 'string', entityType: 'user' } }, ], rules: [], } as unknown as Schematic.RulesengineUser; @@ -73,11 +75,11 @@ describe('partialCompany', () => { expect((m.traits as Record[])[0].value).toBe('Startup'); // Other fields preserved - expect(m.account_id).toBe('acc-1'); - expect(m.environment_id).toBe('env-1'); + expect(m.accountId).toBe('acc-1'); + expect(m.environmentId).toBe('env-1'); expect(m.keys).toEqual({ domain: 'example.com' }); - expect(m.billing_product_ids).toEqual(['bp-1']); - expect(m.base_plan_id).toBe('plan-1'); + expect(m.billingProductIds).toEqual(['bp-1']); + expect(m.basePlanId).toBe('plan-1'); }); test('merges keys - new key added, existing preserved', () => { @@ -98,7 +100,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); }); test('overwrites credit balance', () => { @@ -108,21 +110,22 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 50.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 50.0 }); }); test('upserts metrics - updates existing, appends new', () => { const existing = baseCompany(); + // Cached metrics are camelCase (canonicalized by parseOrThrow on ingest). (existing as unknown as Record).metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-a', period: 'all_time', month_reset: 'first_of_month', - value: 10, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-a', period: 'all_time', monthReset: 'first_of_month', + value: 10, createdAt: '2026-01-01T00:00:00Z', }, { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-b', period: 'current_month', month_reset: 'first_of_month', - value: 5, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-b', period: 'current_month', monthReset: 'first_of_month', + value: 5, createdAt: '2026-01-01T00:00:00Z', }, ]; @@ -146,14 +149,15 @@ describe('partialCompany', () => { const metrics = (merged as unknown as Record).metrics as Record[]; expect(metrics.length).toBe(3); - // event-a updated in place - expect(metrics[0].event_subtype).toBe('event-a'); + // event-a updated in place; incoming wire metric canonicalized to camelCase + expect(metrics[0].eventSubtype).toBe('event-a'); + expect(metrics[0].event_subtype).toBeUndefined(); expect(metrics[0].value).toBe(42); // event-b unchanged - expect(metrics[1].event_subtype).toBe('event-b'); + expect(metrics[1].eventSubtype).toBe('event-b'); expect(metrics[1].value).toBe(5); - // event-c appended - expect(metrics[2].event_subtype).toBe('event-c'); + // event-c appended (canonicalized) + expect(metrics[2].eventSubtype).toBe('event-c'); expect(metrics[2].value).toBe(1); // Original not mutated @@ -161,12 +165,13 @@ describe('partialCompany', () => { expect(origMetrics[0].value).toBe(10); }); - test('credit_balances update re-derives entitlement credit_remaining', () => { + test('credit_balances update re-derives entitlement creditRemaining', () => { const existing = baseCompany(); + // Cached entitlements are camelCase (canonicalized by parseOrThrow on ingest). (existing as unknown as Record).entitlements = [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'credit', credit_id: 'credit-1', credit_remaining: 100.0 }, - { feature_id: 'feat-2', feature_key: 'feature-two', value_type: 'credit', credit_id: 'credit-2', credit_remaining: 0 }, - { feature_id: 'feat-3', feature_key: 'feature-three', value_type: 'boolean' }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'credit', creditId: 'credit-1', creditRemaining: 100.0 }, + { featureId: 'feat-2', featureKey: 'feature-two', valueType: 'credit', creditId: 'credit-2', creditRemaining: 0 }, + { featureId: 'feat-3', featureKey: 'feature-three', valueType: 'boolean' }, ]; // Partial only updates one of the credit balances and carries no entitlements. @@ -176,29 +181,33 @@ describe('partialCompany', () => { const ents = (merged as unknown as Record).entitlements as Record[]; // credit-1 entitlement re-derived from incoming balance - expect(ents[0].credit_remaining).toBe(42.0); + expect(ents[0].creditRemaining).toBe(42.0); + // Regression: the sync must not leave a snake_case twin next to the + // camelCase field — the WASM engine rejects objects carrying both + // casings of the same field ("duplicate field creditRemaining"). + expect(ents[0].credit_remaining).toBeUndefined(); // credit-2 was not in the partial, left untouched - expect(ents[1].credit_remaining).toBe(0); + expect(ents[1].creditRemaining).toBe(0); // non-credit entitlement untouched - expect(ents[2].credit_remaining).toBeUndefined(); + expect(ents[2].creditRemaining).toBeUndefined(); // Original not mutated const origEnts = (existing as unknown as Record).entitlements as Record[]; - expect(origEnts[0].credit_remaining).toBe(100.0); + expect(origEnts[0].creditRemaining).toBe(100.0); }); test('metrics update re-derives entitlement usage', () => { const existing = baseCompany(); (existing as unknown as Record).metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'api-calls', period: 'current_month', month_reset: 'first_of_month', - value: 10, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'api-calls', period: 'current_month', monthReset: 'first_of_month', + value: 10, createdAt: '2026-01-01T00:00:00Z', }, ]; (existing as unknown as Record).entitlements = [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'numeric', event_name: 'api-calls', metric_period: 'current_month', month_reset: 'first_of_month', usage: 10 }, - { feature_id: 'feat-2', feature_key: 'feature-two', value_type: 'numeric', event_name: 'other-event', metric_period: 'current_month', month_reset: 'first_of_month', usage: 3 }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'numeric', eventName: 'api-calls', metricPeriod: 'current_month', monthReset: 'first_of_month', usage: 10 }, + { featureId: 'feat-2', featureKey: 'feature-two', valueType: 'numeric', eventName: 'other-event', metricPeriod: 'current_month', monthReset: 'first_of_month', usage: 3 }, ]; // Partial upserts the api-calls metric and carries no entitlements. @@ -226,8 +235,8 @@ describe('partialCompany', () => { const existing = baseCompany(); (existing as unknown as Record).metrics = []; (existing as unknown as Record).entitlements = [ - // No metric_period / month_reset → should default to all_time / first_of_month - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'numeric', event_name: 'logins', usage: 0 }, + // No metricPeriod / monthReset → should default to all_time / first_of_month + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'numeric', eventName: 'logins', usage: 0 }, ]; const partial = { @@ -250,11 +259,12 @@ describe('partialCompany', () => { test('does not re-derive when partial carries entitlements wholesale', () => { const existing = baseCompany(); (existing as unknown as Record).entitlements = [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'credit', credit_id: 'credit-1', credit_remaining: 100.0 }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'credit', creditId: 'credit-1', creditRemaining: 100.0 }, ]; // Partial includes BOTH credit_balances and entitlements; the supplied // entitlements win and the derived sync is skipped entirely. + // Partial payloads arrive in wire format (snake_case). const partial = { id: 'co-1', credit_balances: { 'credit-1': 42.0 }, @@ -267,7 +277,9 @@ describe('partialCompany', () => { const ents = (merged as unknown as Record).entitlements as Record[]; // Uses the entitlement value from the partial, NOT the balance-derived 42. - expect(ents[0].credit_remaining).toBe(7.0); + // Incoming wire entitlements are canonicalized to camelCase on merge. + expect(ents[0].creditRemaining).toBe(7.0); + expect(ents[0].credit_remaining).toBeUndefined(); }); test('empty entitlements clears existing', () => { @@ -278,7 +290,7 @@ describe('partialCompany', () => { const m = merged as unknown as Record; expect(m.entitlements).toEqual([]); - expect(m.account_id).toBe('acc-1'); + expect(m.accountId).toBe('acc-1'); }); test('null base_plan_id sets to null', () => { @@ -288,8 +300,8 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.base_plan_id).toBeNull(); - expect(m.billing_product_ids).toEqual(['bp-1']); + expect(m.basePlanId).toBeNull(); + expect(m.billingProductIds).toEqual(['bp-1']); }); test('tolerates missing id - cache lookup uses envelope entity_id', () => { @@ -302,7 +314,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); expect(m.id).toBe('co-1'); }); @@ -341,9 +353,9 @@ describe('partialCompany', () => { const existing = baseCompany(); (existing as unknown as Record).metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-a', period: 'all_time', month_reset: 'first_of_month', - value: 10, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-a', period: 'all_time', monthReset: 'first_of_month', + value: 10, createdAt: '2026-01-01T00:00:00Z', }, ]; (existing as unknown as Record).rules = [makeRule('rule-1')]; @@ -386,32 +398,33 @@ describe('partialCompany', () => { const m = merged as unknown as Record; expect(m.id).toBe('co-1'); - expect(m.account_id).toBe('acc-2'); - expect(m.environment_id).toBe('env-2'); - expect(m.base_plan_id).toBe('plan-99'); - expect(m.billing_product_ids).toEqual(['bp-10', 'bp-20']); + expect(m.accountId).toBe('acc-2'); + expect(m.environmentId).toBe('env-2'); + expect(m.basePlanId).toBe('plan-99'); + expect(m.billingProductIds).toEqual(['bp-10', 'bp-20']); // Credit balances merge: credit-1 overwritten, credit-new added - expect(m.credit_balances).toEqual({ 'credit-1': 999.0, 'credit-new': 50.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 999.0, 'credit-new': 50.0 }); + // Incoming wire entitlements are canonicalized to camelCase on merge const entitlements = m.entitlements as Record[]; expect(entitlements.length).toBe(2); - expect(entitlements[0].feature_id).toBe('feat-new'); - expect(entitlements[1].feature_id).toBe('feat-2'); + expect(entitlements[0].featureId).toBe('feat-new'); + expect(entitlements[1].featureId).toBe('feat-2'); // Keys merge: domain overwritten, slug added expect(m.keys).toEqual({ domain: 'new.com', slug: 'new-slug' }); - // Metrics upsert: event-a updated, event-new appended + // Metrics upsert: event-a updated, event-new appended (canonicalized) const metrics = m.metrics as Record[]; expect(metrics.length).toBe(2); - expect(metrics[0].event_subtype).toBe('event-a'); + expect(metrics[0].eventSubtype).toBe('event-a'); expect(metrics[0].value).toBe(42); - expect(metrics[1].event_subtype).toBe('event-new'); + expect(metrics[1].eventSubtype).toBe('event-new'); expect(metrics[1].value).toBe(7); - expect(m.plan_ids).toEqual(['plan-99', 'plan-100']); - expect(m.plan_version_ids).toEqual(['pv-99']); + expect(m.planIds).toEqual(['plan-99', 'plan-100']); + expect(m.planVersionIds).toEqual(['pv-99']); const rules = m.rules as Record[]; expect(rules.length).toBe(2); @@ -427,8 +440,8 @@ describe('partialCompany', () => { // Original not mutated const orig = existing as unknown as Record; - expect(orig.account_id).toBe('acc-1'); - expect(orig.base_plan_id).toBe('plan-1'); + expect(orig.accountId).toBe('acc-1'); + expect(orig.basePlanId).toBe('plan-1'); expect(orig.keys).toEqual({ domain: 'example.com' }); expect((orig.metrics as Record[])[0].value).toBe(10); }); @@ -531,12 +544,12 @@ describe('deepCopyCompany', () => { const origRaw = orig as unknown as Record; origRaw.metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-1', period: 'all_time', month_reset: 'first_of_month', - value: 42, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-1', period: 'all_time', monthReset: 'first_of_month', + value: 42, createdAt: '2026-01-01T00:00:00Z', }, ]; - origRaw.subscription = { id: 'sub-1', period_start: '2026-01-01T00:00:00Z', period_end: '2027-01-01T00:00:00Z' }; + origRaw.subscription = { id: 'sub-1', periodStart: '2026-01-01T00:00:00Z', periodEnd: '2027-01-01T00:00:00Z' }; const cp = deepCopyCompany(orig); const cpRaw = cp as unknown as Record; @@ -546,8 +559,8 @@ describe('deepCopyCompany', () => { expect((origRaw.keys as Record).domain).toBe('example.com'); // Credit balances are independent - (cpRaw.credit_balances as Record)['credit-1'] = 999; - expect((origRaw.credit_balances as Record)['credit-1']).toBe(100.0); + (cpRaw.creditBalances as Record)['credit-1'] = 999; + expect((origRaw.creditBalances as Record)['credit-1']).toBe(100.0); // Metrics are independent ((cpRaw.metrics as Record[])[0]).value = 999; @@ -569,8 +582,8 @@ describe('deepCopyUser', () => { test('empty fields - user with only required fields', () => { const cp = deepCopyUser({ id: 'u1', - account_id: 'acc-1', - environment_id: 'env-1', + accountId: 'acc-1', + environmentId: 'env-1', keys: {}, traits: [], rules: [], diff --git a/tests/unit/wasm-datastream.test.ts b/tests/unit/wasm-datastream.test.ts new file mode 100644 index 00000000..bf7d5503 --- /dev/null +++ b/tests/unit/wasm-datastream.test.ts @@ -0,0 +1,242 @@ +/** + * WASM <-> DataStream Integration Tests + * + * Unlike wasm-integration.test.ts (which feeds hand-written fixtures directly + * to the engine), these tests exercise the full datastream path against the + * REAL WASM rules engine: snake_case wire messages are ingested by the + * DataStreamClient (canonicalized to camelCase via the Fern serializers), + * partials are merged, and flag checks evaluate the cached entities in WASM. + * + * This is the seam where casing bugs live: the engine accepts either casing + * per field (serde aliases), but rejects an object carrying BOTH casings of + * the same field ("duplicate field"), so the cache must stay single-shape. + * + * Only the WebSocket transport is mocked; the rules engine is real. + */ +import { DataStreamClient, DataStreamClientOptions } from "../../src/datastream/datastream-client"; +import { DatastreamWSClient } from "../../src/datastream/websocket-client"; +import { DataStreamResp, EntityType, MessageType } from "../../src/datastream/types"; +import { Logger } from "../../src/logger"; + +const mockWS = { + on: jest.fn(), + start: jest.fn(), + close: jest.fn(), + isConnected: jest.fn().mockReturnValue(true), + isReady: jest.fn().mockReturnValue(true), + sendMessage: jest.fn().mockResolvedValue(undefined), +}; +jest.mock("../../src/datastream/websocket-client", () => ({ + DatastreamWSClient: jest.fn().mockImplementation(() => mockWS), +})); + +// A metric-gated flag in wire format: true once the company has >= 5 +// "api_call" events (all_time). Multi-word fields (event_subtype, +// metric_period, metric_value) make evaluation casing-sensitive. +const wireMetricFlag = { + id: "flag-metric", + account_id: "account-123", + environment_id: "env-123", + key: "metric-flag", + default_value: false, + rules: [ + { + id: "rule-metric", + account_id: "account-123", + environment_id: "env-123", + name: "Usage gate", + rule_type: "standard", + priority: 100, + value: true, + conditions: [ + { + id: "cond-metric", + account_id: "account-123", + environment_id: "env-123", + condition_type: "metric", + operator: "gte", + resource_ids: [], + event_subtype: "api_call", + metric_period: "all_time", + metric_value: 5, + trait_value: "", + }, + ], + condition_groups: [], + }, + ], +}; + +// An unconditional standard rule: evaluates true unless the engine errors, +// in which case checkFlag falls back to default_value (false). +const wireAlwaysOnFlag = { + id: "flag-on", + account_id: "account-123", + environment_id: "env-123", + key: "always-on", + default_value: false, + rules: [ + { + id: "rule-on", + account_id: "account-123", + environment_id: "env-123", + name: "Always On", + rule_type: "standard", + priority: 100, + value: true, + conditions: [], + condition_groups: [], + }, + ], +}; + +const wireMetric = (value: number) => ({ + account_id: "account-123", + environment_id: "env-123", + company_id: "company-1", + event_subtype: "api_call", + period: "all_time", + month_reset: "first_of_month", + value, + created_at: "2026-01-01T00:00:00Z", +}); + +const wireCompany = (metricValue: number) => ({ + id: "company-1", + account_id: "account-123", + environment_id: "env-123", + keys: { name: "Wire Corp" }, + base_plan_id: null, + billing_product_ids: [], + plan_ids: [], + plan_version_ids: [], + credit_balances: { "credit-1": 100.0 }, + metrics: [wireMetric(metricValue)], + traits: [], + rules: [], + entitlements: [ + { + feature_id: "feat-1", + feature_key: "feature-one", + value_type: "credit", + credit_id: "credit-1", + credit_total: 200.0, + credit_used: 100.0, + credit_remaining: 100.0, + }, + ], +}); + +describe("WASM <-> DataStream integration (real engine)", () => { + let client: DataStreamClient; + let messageHandler: (message: DataStreamResp) => Promise; + let logger: Logger; + + beforeEach(async () => { + jest.clearAllMocks(); + mockWS.sendMessage.mockResolvedValue(undefined); + logger = { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }; + const options: DataStreamClientOptions = { + apiKey: "test-api-key", + baseURL: "https://api.schematichq.com", + logger, + }; + client = new DataStreamClient(options); + await client.start(); + const WSMock = DatastreamWSClient as jest.MockedClass; + messageHandler = WSMock.mock.calls[0][0].messageHandler; + }); + + afterEach(async () => { + client.removeAllListeners(); + client.close(); + await new Promise((resolve) => setImmediate(resolve)); + }); + + const sendFullCompany = (metricValue: number) => + messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.FULL, + data: wireCompany(metricValue), + } as DataStreamResp); + + const sendFlags = () => + messageHandler({ + entity_type: EntityType.FLAGS, + message_type: MessageType.FULL, + data: [wireMetricFlag, wireAlwaysOnFlag], + } as DataStreamResp); + + const check = (flagKey: string) => client.checkFlag({ company: { name: "Wire Corp" } }, flagKey); + + test("FULL snake_case wire message -> canonicalized cache -> engine evaluates the metric gate", async () => { + await sendFullCompany(10); + await sendFlags(); + + const result = await check("metric-flag"); + expect(result.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(result.reason).not.toBe("RULES_ENGINE_UNAVAILABLE"); + expect(result.value).toBe(true); + + // The cache holds the canonical camelCase shape. + const cached = await client.getCompany({ name: "Wire Corp" }); + expect((cached as any).accountId).toBe("account-123"); + expect((cached as any).account_id).toBeUndefined(); + expect((cached as any).metrics[0].eventSubtype).toBe("api_call"); + }); + + test("PARTIAL metrics update flips the metric gate from false to true", async () => { + // Start below the gate (usage 2 < 5) + await sendFullCompany(2); + await sendFlags(); + + const before = await check("metric-flag"); + expect(before.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(before.value).toBe(false); + + // Partial metrics update arrives in wire format, bumping usage to 10. + await messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.PARTIAL, + entity_id: "company-1", + data: { metrics: [wireMetric(10)] }, + } as DataStreamResp); + + const after = await check("metric-flag"); + expect(after.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(after.value).toBe(true); + + expect(logger.warn).not.toHaveBeenCalledWith(expect.stringContaining("Failed to deserialize")); + expect(logger.error).not.toHaveBeenCalledWith(expect.stringContaining("Rules engine evaluation failed")); + }); + + test("PARTIAL credit_balances update does not break subsequent evaluation", async () => { + // Regression: syncEntitlementDerivedFields used to write snake_case + // credit_remaining next to the canonicalized creditRemaining; the engine + // rejected the duplicate and every flag check for the company fell back + // to its default value. + await sendFullCompany(10); + await sendFlags(); + + const before = await check("always-on"); + expect(before.value).toBe(true); + + await messageHandler({ + entity_type: EntityType.COMPANY, + message_type: MessageType.PARTIAL, + entity_id: "company-1", + data: { credit_balances: { "credit-1": 42.0 } }, + } as DataStreamResp); + + // The entitlement keeps a single canonical field with the fresh value. + const cached = await client.getCompany({ name: "Wire Corp" }); + const ent = (cached as any).entitlements[0]; + expect(ent.creditRemaining).toBe(42.0); + expect(ent.credit_remaining).toBeUndefined(); + + const after = await check("always-on"); + expect(after.reason).not.toBe("RULES_ENGINE_ERROR"); + expect(after.value).toBe(true); + expect(logger.error).not.toHaveBeenCalledWith(expect.stringContaining("Rules engine evaluation failed")); + }); +});