From 777cb03840e5b22656e61143ea04d37897a83a1f Mon Sep 17 00:00:00 2001 From: John Patrick Valera Date: Fri, 3 Apr 2026 08:00:09 +0000 Subject: [PATCH 1/2] feat(replay-v2): phase A contract and reporter APIs --- apps/ingest/src/index.js | 31 ++- docs/REPLAY_V2_PHASE_A.md | 106 +++++++++ packages/cypress-reporter/src/index.js | 171 ++++++++++++- packages/shared/src/index.js | 15 +- packages/shared/src/replay-v2.js | 317 +++++++++++++++++++++++++ 5 files changed, 635 insertions(+), 5 deletions(-) create mode 100644 docs/REPLAY_V2_PHASE_A.md create mode 100644 packages/shared/src/replay-v2.js diff --git a/apps/ingest/src/index.js b/apps/ingest/src/index.js index a373c88..7411930 100644 --- a/apps/ingest/src/index.js +++ b/apps/ingest/src/index.js @@ -1,6 +1,6 @@ import Fastify from 'fastify'; import pg from 'pg'; -import { INGEST_EVENT_TYPES, isValidIngestType } from '@testharbor/shared'; +import { INGEST_EVENT_TYPES, assertReplayV2ChunkPayload, isValidIngestType } from '@testharbor/shared'; const app = Fastify({ logger: true }); const port = Number(process.env.PORT || 4010); @@ -43,7 +43,8 @@ const REQUIRED_FIELDS_BY_TYPE = { [INGEST_EVENT_TYPES.SPEC_FINISHED]: ['specRunId', 'status'], [INGEST_EVENT_TYPES.TEST_RESULT]: ['testResultId', 'specRunId', 'status'], [INGEST_EVENT_TYPES.ARTIFACT_REGISTERED]: ['artifactId', 'runId', 'type', 'storageKey'], - [INGEST_EVENT_TYPES.HEARTBEAT]: ['runId'] + [INGEST_EVENT_TYPES.HEARTBEAT]: ['runId'], + [INGEST_EVENT_TYPES.REPLAY_V2_CHUNK]: ['runId', 'streamId', 'seqStart', 'seqEnd', 'events'] }; function missingKeys(obj, keys) { @@ -56,6 +57,14 @@ function validatePayloadShape(type, payload) { if (missing.length) { throw new ValidationError('payload_missing_required_fields', { type, missing }); } + + if (type === INGEST_EVENT_TYPES.REPLAY_V2_CHUNK) { + try { + assertReplayV2ChunkPayload(payload); + } catch (error) { + throw new ValidationError(error.message, error.details || { type }); + } + } } async function query(sql, params = []) { @@ -280,6 +289,17 @@ async function handleEvent(type, payload) { ); return; } + case INGEST_EVENT_TYPES.REPLAY_V2_CHUNK: { + if (!requireKeys(payload, ['runId', 'streamId', 'seqStart', 'seqEnd', 'events'])) { + throw new Error('replay.v2.chunk missing required fields'); + } + assertReplayV2ChunkPayload(payload); + const ctx = await lookupRunContextByRunId(payload.runId); + if (!ctx) { + throw new ValidationError('replay_v2_run_not_found', { runId: payload.runId }); + } + return; + } default: throw new Error(`Unhandled event type: ${type}`); } @@ -316,6 +336,13 @@ app.post('/v1/ingest/events', async (request, reply) => { return reply.code(result.duplicate ? 200 : 202).send({ ok: true, ...result }); } catch (error) { + if (error instanceof ValidationError) { + return reply.code(400).send({ + error: 'validation_error', + message: error.message, + details: error.details + }); + } const msg = String(error?.message || error); if (msg.includes('missing required fields')) { return reply.code(400).send({ diff --git a/docs/REPLAY_V2_PHASE_A.md b/docs/REPLAY_V2_PHASE_A.md new file mode 100644 index 0000000..4372cd0 --- /dev/null +++ b/docs/REPLAY_V2_PHASE_A.md @@ -0,0 +1,106 @@ +# Replay V2 Phase A + +Replay V2 Phase A defines the reporter-to-ingest wire contract for chunked replay streams. Storage and replay rendering are intentionally out of scope in this phase. + +## Chunk Contract + +Ingest event type: `replay.v2.chunk` + +Required payload fields: + +- `runId`: owning run identifier. The ingest service rejects chunks for unknown runs. +- `streamId`: stable identifier for one replay stream within the run. +- `seqStart`: first event sequence number in the chunk. +- `seqEnd`: last event sequence number in the chunk. +- `events`: non-empty ordered array of Replay V2 events. + +Optional header fields currently emitted by the reporter: + +- `schemaVersion`: current value `2.0` +- `startedAt`: replay stream wall-clock origin timestamp +- `chunkIndex`: zero-based chunk counter +- `final`: marks the terminal flushed chunk + +Validation rules: + +- `events` must be non-empty. +- Every event must carry the same `runId` and `streamId` as the chunk header. +- Event `seq` values must be strictly contiguous with no gaps. +- `seqStart` must equal the first event sequence. +- `seqEnd` must equal the last event sequence. +- `seqEnd - seqStart + 1` must equal `events.length`. +- Event `monotonicMs` must never move backwards inside a chunk. + +## Event Shape + +Each event includes: + +- `kind`: Replay V2 event kind +- `runId` +- `streamId` +- `seq` +- `monotonicMs` +- `ts` + +Optional fields: + +- `targetId` +- `selectorBundle` +- `data` + +Phase A recognizes these event kinds: + +- `session.start` +- `session.end` +- `target.declared` +- `target.rebound` +- `target.orphaned` +- `dom.snapshot` +- `dom.mutation` +- `pointer` +- `keyboard` +- `input` +- `scroll` +- `viewport` +- `navigation` +- `assertion` +- `log` +- `custom` + +## Selector Bundles And Stable Target IDs + +Selector bundles are normalized before hashing or validation: + +- known selector fields are trimmed +- array selectors are deduplicated and sorted +- empty values are removed +- `nth` is preserved only when it is a non-negative integer + +Stable target IDs are derived from normalized selector data plus optional target identity hints. This allows the reporter to redeclare the same logical target deterministically across retries or rebinding. + +## Target Lifecycle + +The Phase A reporter exposes four target lifecycle operations: + +1. `declareReplayTarget` +2. `rebindReplayTarget` +3. `markReplayTargetOrphan` +4. `queueReplayEvent` for events that reference an active target + +Lifecycle expectations: + +- A target is active immediately after declaration. +- Rebinding updates the normalized selector bundle for an existing target and returns it to active state. +- Orphaning marks the target unusable for future non-lifecycle events. +- Events that reference a target must use an active target ID, except for `target.declared` and `target.orphaned` lifecycle events themselves. + +## Reporter Behavior + +The reporter maintains: + +- a monotonic clock anchored to `startedAt` +- a strict event sequence counter beginning at `1` +- a strict chunk continuity tracker across flushes +- a target registry enforcing active versus orphaned usage + +When the pending event queue reaches `TESTHARBOR_REPLAY_CHUNK_SIZE` (default `100`), the reporter flushes a validated `replay.v2.chunk` payload to ingest. diff --git a/packages/cypress-reporter/src/index.js b/packages/cypress-reporter/src/index.js index 2762cb9..aef29fe 100644 --- a/packages/cypress-reporter/src/index.js +++ b/packages/cypress-reporter/src/index.js @@ -1,15 +1,28 @@ import crypto from 'node:crypto'; -import { INGEST_EVENT_TYPES } from '@testharbor/shared'; +import { + INGEST_EVENT_TYPES, + REPLAY_V2_EVENT_KINDS, + REPLAY_V2_SCHEMA_VERSION, + assertReplayV2ChunkPayload, + assertReplayV2EventPayload, + createReplayV2MonotonicClock, + createReplayV2SequenceTracker, + createReplayV2TargetRegistry, + getStableReplayV2TargetId, + normalizeReplayV2SelectorBundle +} from '@testharbor/shared'; function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } export class TestHarborReporterClient { - constructor({ ingestUrl, token = null, maxRetries = 3 } = {}) { + constructor({ ingestUrl, token = null, maxRetries = 3, replayChunkSize } = {}) { this.ingestUrl = ingestUrl || process.env.TESTHARBOR_INGEST_URL || 'http://localhost:4010/v1/ingest/events'; this.token = token || process.env.TESTHARBOR_INGEST_TOKEN || null; this.maxRetries = maxRetries; + this.replayChunkSize = Number(process.env.TESTHARBOR_REPLAY_CHUNK_SIZE || replayChunkSize || 100); + this.replayV2 = null; } async send(type, payload) { @@ -38,4 +51,158 @@ export class TestHarborReporterClient { throw new Error('Unreachable'); } + + startReplayV2({ runId, streamId = crypto.randomUUID(), startedAt = new Date().toISOString(), metadata = null } = {}) { + if (this.replayV2) { + throw new Error('Replay V2 session already active'); + } + + this.replayV2 = { + runId, + streamId, + startedAt, + metadata, + clock: createReplayV2MonotonicClock({ startedAt }), + eventSequence: createReplayV2SequenceTracker(), + chunkSequence: createReplayV2SequenceTracker(), + targetRegistry: createReplayV2TargetRegistry(), + pendingEvents: [], + chunkCount: 0 + }; + + return this.queueReplayEvent({ + kind: REPLAY_V2_EVENT_KINDS.SESSION_START, + data: { metadata } + }); + } + + declareReplayTarget({ targetId, selectors = {}, framePath = null, metadata = null, name = null, kind = null } = {}) { + const replay = this.#requireReplayV2Session(); + const resolvedTargetId = targetId || getStableReplayV2TargetId({ selectors, framePath, name, kind }); + const selectorBundle = normalizeReplayV2SelectorBundle({ ...selectors, framePath }); + replay.targetRegistry.declare({ targetId: resolvedTargetId, selectors: selectorBundle, framePath, metadata }); + return this.queueReplayEvent({ + kind: REPLAY_V2_EVENT_KINDS.TARGET_DECLARED, + targetId: resolvedTargetId, + selectorBundle, + data: { framePath, metadata, name, kind } + }); + } + + rebindReplayTarget({ targetId, selectors = {}, framePath = null, metadata = null } = {}) { + const replay = this.#requireReplayV2Session(); + const selectorBundle = normalizeReplayV2SelectorBundle({ ...selectors, framePath }); + replay.targetRegistry.rebind({ targetId, selectors: selectorBundle, framePath, metadata }); + return this.queueReplayEvent({ + kind: REPLAY_V2_EVENT_KINDS.TARGET_REBOUND, + targetId, + selectorBundle, + data: { framePath, metadata } + }); + } + + markReplayTargetOrphan({ targetId, reason = null } = {}) { + const replay = this.#requireReplayV2Session(); + replay.targetRegistry.orphan({ targetId, reason }); + return this.queueReplayEvent({ + kind: REPLAY_V2_EVENT_KINDS.TARGET_ORPHANED, + targetId, + data: { reason } + }); + } + + async queueReplayEvent(event = {}, { flushIfNeeded = true } = {}) { + const replay = this.#requireReplayV2Session(); + const monotonicMs = replay.clock.now(); + const seq = replay.eventSequence.assign(); + const ts = new Date(Date.parse(replay.startedAt) + monotonicMs).toISOString(); + const normalizedEvent = { + schemaVersion: REPLAY_V2_SCHEMA_VERSION, + runId: replay.runId, + streamId: replay.streamId, + seq, + monotonicMs, + ts, + ...event + }; + + if (normalizedEvent.selectorBundle != null) { + normalizedEvent.selectorBundle = normalizeReplayV2SelectorBundle(normalizedEvent.selectorBundle); + } + if ( + normalizedEvent.targetId && + normalizedEvent.kind !== REPLAY_V2_EVENT_KINDS.TARGET_DECLARED && + normalizedEvent.kind !== REPLAY_V2_EVENT_KINDS.TARGET_ORPHANED + ) { + replay.targetRegistry.assertUsable(normalizedEvent.targetId); + } + + assertReplayV2EventPayload(normalizedEvent); + replay.pendingEvents.push(normalizedEvent); + + if (flushIfNeeded && replay.pendingEvents.length >= this.#getReplayChunkSize()) { + return this.flushReplayV2Chunk(); + } + + return normalizedEvent; + } + + async flushReplayV2Chunk({ final = false } = {}) { + const replay = this.#requireReplayV2Session(); + if (replay.pendingEvents.length === 0) return null; + + const seqStart = replay.pendingEvents[0].seq; + const seqEnd = replay.pendingEvents[replay.pendingEvents.length - 1].seq; + const previousChunkSeq = replay.chunkSequence.last(); + replay.chunkSequence.assertChunkRange(seqStart, seqEnd, replay.pendingEvents.length); + + const payload = { + schemaVersion: REPLAY_V2_SCHEMA_VERSION, + runId: replay.runId, + streamId: replay.streamId, + seqStart, + seqEnd, + final, + chunkIndex: replay.chunkCount, + startedAt: replay.startedAt, + events: replay.pendingEvents + }; + + assertReplayV2ChunkPayload(payload); + let result; + try { + result = await this.send(INGEST_EVENT_TYPES.REPLAY_V2_CHUNK, payload); + } catch (error) { + replay.chunkSequence = createReplayV2SequenceTracker({ + initialSeq: previousChunkSeq + 1, + previousSeq: previousChunkSeq + }); + throw error; + } + replay.pendingEvents = []; + replay.chunkCount += 1; + return result; + } + + async endReplayV2({ status = 'completed', metadata = null } = {}) { + const replay = this.#requireReplayV2Session(); + await this.queueReplayEvent({ + kind: REPLAY_V2_EVENT_KINDS.SESSION_END, + data: { status, metadata } + }, { flushIfNeeded: false }); + const result = await this.flushReplayV2Chunk({ final: true }); + this.replayV2 = null; + return result; + } + + #requireReplayV2Session() { + if (!this.replayV2) { + throw new Error('Replay V2 session not started'); + } + return this.replayV2; + } + + #getReplayChunkSize() { + return Number.isInteger(this.replayChunkSize) && this.replayChunkSize > 0 ? this.replayChunkSize : 100; + } } diff --git a/packages/shared/src/index.js b/packages/shared/src/index.js index 62af6c7..62a4460 100644 --- a/packages/shared/src/index.js +++ b/packages/shared/src/index.js @@ -5,9 +5,22 @@ export const INGEST_EVENT_TYPES = { SPEC_FINISHED: 'spec.finished', TEST_RESULT: 'test.result', ARTIFACT_REGISTERED: 'artifact.registered', - HEARTBEAT: 'heartbeat' + HEARTBEAT: 'heartbeat', + REPLAY_V2_CHUNK: 'replay.v2.chunk' }; export function isValidIngestType(type) { return Object.values(INGEST_EVENT_TYPES).includes(type); } + +export { + REPLAY_V2_SCHEMA_VERSION, + REPLAY_V2_EVENT_KINDS, + normalizeReplayV2SelectorBundle, + getStableReplayV2TargetId, + createReplayV2MonotonicClock, + createReplayV2SequenceTracker, + createReplayV2TargetRegistry, + assertReplayV2EventPayload, + assertReplayV2ChunkPayload +} from './replay-v2.js'; diff --git a/packages/shared/src/replay-v2.js b/packages/shared/src/replay-v2.js new file mode 100644 index 0000000..f9279ba --- /dev/null +++ b/packages/shared/src/replay-v2.js @@ -0,0 +1,317 @@ +import crypto from 'node:crypto'; +import { performance } from 'node:perf_hooks'; + +export const REPLAY_V2_SCHEMA_VERSION = '2.0'; + +export const REPLAY_V2_EVENT_KINDS = { + SESSION_START: 'session.start', + SESSION_END: 'session.end', + TARGET_DECLARED: 'target.declared', + TARGET_REBOUND: 'target.rebound', + TARGET_ORPHANED: 'target.orphaned', + DOM_SNAPSHOT: 'dom.snapshot', + DOM_MUTATION: 'dom.mutation', + POINTER: 'pointer', + KEYBOARD: 'keyboard', + INPUT: 'input', + SCROLL: 'scroll', + VIEWPORT: 'viewport', + NAVIGATION: 'navigation', + ASSERTION: 'assertion', + LOG: 'log', + CUSTOM: 'custom' +}; + +const REPLAY_V2_EVENT_KIND_SET = new Set(Object.values(REPLAY_V2_EVENT_KINDS)); +const REPLAY_V2_SELECTOR_KEYS = [ + 'css', + 'xpath', + 'text', + 'testId', + 'role', + 'label', + 'placeholder', + 'altText', + 'title', + 'name', + 'value', + 'framePath' +]; + +function isPlainObject(value) { + return Boolean(value) && typeof value === 'object' && !Array.isArray(value); +} + +function assert(condition, message, details = {}) { + if (!condition) { + const error = new Error(message); + error.details = details; + throw error; + } +} + +function canonicalizeJson(value) { + if (Array.isArray(value)) { + return value.map((item) => canonicalizeJson(item)); + } + + if (isPlainObject(value)) { + return Object.keys(value) + .sort() + .reduce((acc, key) => { + acc[key] = canonicalizeJson(value[key]); + return acc; + }, {}); + } + + return value; +} + +function normalizeScalarSelectorValue(value) { + if (typeof value === 'number') return String(value); + if (typeof value !== 'string') return null; + const normalized = value.trim(); + return normalized ? normalized : null; +} + +function normalizeSelectorValue(value) { + if (Array.isArray(value)) { + const normalizedValues = [...new Set(value.map((item) => normalizeScalarSelectorValue(item)).filter(Boolean))].sort(); + return normalizedValues.length ? normalizedValues : null; + } + + return normalizeScalarSelectorValue(value); +} + +export function normalizeReplayV2SelectorBundle(bundle = {}) { + assert(isPlainObject(bundle), 'replay_v2_selector_bundle_invalid', { bundle }); + + const normalized = {}; + for (const key of REPLAY_V2_SELECTOR_KEYS) { + const value = normalizeSelectorValue(bundle[key]); + if (value !== null) normalized[key] = value; + } + + if (Number.isInteger(bundle.nth) && bundle.nth >= 0) { + normalized.nth = bundle.nth; + } + + return canonicalizeJson(normalized); +} + +export function getStableReplayV2TargetId(input = {}) { + const normalizedSelectors = normalizeReplayV2SelectorBundle(input.selectors || input.selectorBundle || {}); + const normalizedIdentity = canonicalizeJson({ + framePath: normalizeSelectorValue(input.framePath) ?? null, + kind: normalizeScalarSelectorValue(input.kind) ?? null, + name: normalizeScalarSelectorValue(input.name) ?? null, + selectors: normalizedSelectors + }); + const digest = crypto.createHash('sha256').update(JSON.stringify(normalizedIdentity)).digest('hex'); + return `rv2_tgt_${digest.slice(0, 20)}`; +} + +export function createReplayV2MonotonicClock({ startedAt = new Date().toISOString() } = {}) { + const startedAtMs = Date.parse(startedAt); + assert(Number.isFinite(startedAtMs), 'replay_v2_started_at_invalid', { startedAt }); + + const origin = performance.now(); + let lastMs = 0; + + return { + startedAt, + now() { + const elapsed = Math.max(0, Math.round(performance.now() - origin)); + lastMs = Math.max(lastMs, elapsed); + return lastMs; + } + }; +} + +export function createReplayV2SequenceTracker({ initialSeq = 1, previousSeq = 0 } = {}) { + assert(Number.isInteger(initialSeq) && initialSeq >= 1, 'replay_v2_initial_seq_invalid', { initialSeq }); + assert(Number.isInteger(previousSeq) && previousSeq >= 0, 'replay_v2_previous_seq_invalid', { previousSeq }); + + let nextSeq = initialSeq; + let lastSeq = previousSeq; + + return { + peek() { + return nextSeq; + }, + last() { + return lastSeq; + }, + assign() { + assert(nextSeq === lastSeq + 1, 'replay_v2_sequence_gap', { expected: lastSeq + 1, actual: nextSeq }); + const seq = nextSeq; + lastSeq = seq; + nextSeq += 1; + return seq; + }, + assertChunkRange(seqStart, seqEnd, eventCount) { + assert(Number.isInteger(seqStart) && Number.isInteger(seqEnd), 'replay_v2_chunk_seq_invalid', { seqStart, seqEnd }); + assert(seqStart >= 1 && seqEnd >= seqStart, 'replay_v2_chunk_seq_range_invalid', { seqStart, seqEnd }); + assert(seqEnd - seqStart + 1 === eventCount, 'replay_v2_chunk_sequence_count_mismatch', { + seqStart, + seqEnd, + eventCount + }); + if (lastSeq > 0) { + assert(seqStart === lastSeq + 1, 'replay_v2_chunk_sequence_discontinuity', { expected: lastSeq + 1, seqStart }); + } + lastSeq = seqEnd; + nextSeq = seqEnd + 1; + } + }; +} + +export function createReplayV2TargetRegistry() { + const targets = new Map(); + + function requireTarget(targetId) { + const target = targets.get(targetId); + assert(target, 'replay_v2_target_unknown', { targetId }); + return target; + } + + return { + declare({ targetId, selectors = {}, framePath = null, metadata = null } = {}) { + assert(typeof targetId === 'string' && targetId.length > 0, 'replay_v2_target_id_invalid', { targetId }); + const normalizedSelectors = normalizeReplayV2SelectorBundle(selectors); + const record = { + targetId, + selectors: normalizedSelectors, + framePath: normalizeSelectorValue(framePath), + metadata: metadata ?? null, + state: 'active' + }; + targets.set(targetId, record); + return record; + }, + rebind({ targetId, selectors = {}, framePath = null, metadata = null } = {}) { + const current = requireTarget(targetId); + const updated = { + ...current, + selectors: normalizeReplayV2SelectorBundle(selectors), + framePath: normalizeSelectorValue(framePath), + metadata: metadata ?? current.metadata ?? null, + state: 'active' + }; + targets.set(targetId, updated); + return updated; + }, + orphan({ targetId, reason = null } = {}) { + const current = requireTarget(targetId); + const updated = { + ...current, + state: 'orphaned', + orphanedReason: normalizeScalarSelectorValue(reason) + }; + targets.set(targetId, updated); + return updated; + }, + assertUsable(targetId) { + const current = requireTarget(targetId); + assert(current.state === 'active', 'replay_v2_target_orphaned', { targetId }); + return current; + }, + get(targetId) { + return targets.get(targetId) || null; + } + }; +} + +function assertString(value, message, details) { + assert(typeof value === 'string' && value.length > 0, message, details); +} + +function assertOptionalString(value, message, details) { + if (value == null) return; + assert(typeof value === 'string' && value.length > 0, message, details); +} + +export function assertReplayV2EventPayload(event) { + assert(isPlainObject(event), 'replay_v2_event_invalid', { event }); + assertString(event.kind, 'replay_v2_event_kind_invalid', { kind: event.kind }); + assert(REPLAY_V2_EVENT_KIND_SET.has(event.kind), 'replay_v2_event_kind_unsupported', { kind: event.kind }); + assertString(event.runId, 'replay_v2_event_run_id_missing', { event }); + assertString(event.streamId, 'replay_v2_event_stream_id_missing', { event }); + assert(Number.isInteger(event.seq) && event.seq >= 1, 'replay_v2_event_seq_invalid', { seq: event.seq }); + assert(Number.isInteger(event.monotonicMs) && event.monotonicMs >= 0, 'replay_v2_event_monotonic_invalid', { + monotonicMs: event.monotonicMs + }); + assertString(event.ts, 'replay_v2_event_ts_invalid', { ts: event.ts }); + assert(Number.isFinite(Date.parse(event.ts)), 'replay_v2_event_ts_unparseable', { ts: event.ts }); + assertOptionalString(event.targetId, 'replay_v2_event_target_id_invalid', { targetId: event.targetId }); + if (event.selectorBundle != null) { + event.selectorBundle = normalizeReplayV2SelectorBundle(event.selectorBundle); + } + if (event.data != null) { + assert(isPlainObject(event.data) || Array.isArray(event.data), 'replay_v2_event_data_invalid', { data: event.data }); + } + return event; +} + +export function assertReplayV2ChunkPayload(payload) { + assert(isPlainObject(payload), 'replay_v2_chunk_invalid', { payload }); + assertString(payload.runId, 'replay_v2_chunk_run_id_missing', { payload }); + assertString(payload.streamId, 'replay_v2_chunk_stream_id_missing', { payload }); + assert(Number.isInteger(payload.seqStart) && payload.seqStart >= 1, 'replay_v2_chunk_seq_start_invalid', { + seqStart: payload.seqStart + }); + assert(Number.isInteger(payload.seqEnd) && payload.seqEnd >= payload.seqStart, 'replay_v2_chunk_seq_end_invalid', { + seqEnd: payload.seqEnd, + seqStart: payload.seqStart + }); + assert(Array.isArray(payload.events) && payload.events.length > 0, 'replay_v2_chunk_events_invalid', { + events: payload.events + }); + + if (payload.schemaVersion != null) { + assert(payload.schemaVersion === REPLAY_V2_SCHEMA_VERSION, 'replay_v2_chunk_schema_version_invalid', { + schemaVersion: payload.schemaVersion + }); + } + + let expectedSeq = payload.seqStart; + let previousMonotonicMs = -1; + + for (const event of payload.events) { + assertReplayV2EventPayload(event); + assert(event.runId === payload.runId, 'replay_v2_chunk_run_id_parity_error', { + chunkRunId: payload.runId, + eventRunId: event.runId + }); + assert(event.streamId === payload.streamId, 'replay_v2_chunk_stream_id_parity_error', { + chunkStreamId: payload.streamId, + eventStreamId: event.streamId + }); + assert(event.seq === expectedSeq, 'replay_v2_chunk_sequence_discontinuity', { + expectedSeq, + actualSeq: event.seq + }); + assert(event.monotonicMs >= previousMonotonicMs, 'replay_v2_chunk_monotonic_regression', { + previousMonotonicMs, + monotonicMs: event.monotonicMs + }); + previousMonotonicMs = event.monotonicMs; + expectedSeq += 1; + } + + assert(payload.events[0].seq === payload.seqStart, 'replay_v2_chunk_seq_start_parity_error', { + seqStart: payload.seqStart, + firstEventSeq: payload.events[0].seq + }); + assert(payload.events[payload.events.length - 1].seq === payload.seqEnd, 'replay_v2_chunk_seq_end_parity_error', { + seqEnd: payload.seqEnd, + lastEventSeq: payload.events[payload.events.length - 1].seq + }); + assert(payload.seqEnd - payload.seqStart + 1 === payload.events.length, 'replay_v2_chunk_event_count_mismatch', { + seqStart: payload.seqStart, + seqEnd: payload.seqEnd, + count: payload.events.length + }); + + return payload; +} From ed9623b53e8472eb788bd27ebfa2bc3eca9b2c72 Mon Sep 17 00:00:00 2001 From: John Patrick Valera Date: Fri, 3 Apr 2026 08:19:37 +0000 Subject: [PATCH 2/2] feat(replay-v2): phase B durable chunk persistence --- apps/ingest/src/index.js | 167 ++++++++++++++++-- docs/REPLAY_V2_PHASE_B.md | 26 +++ infra/db/migrations/008_replay_v2_storage.sql | 66 +++++++ scripts/db-migrate-container.mjs | 3 +- scripts/migrate-in-container.sh | 4 +- scripts/run-migrations.sh | 1 + 6 files changed, 252 insertions(+), 15 deletions(-) create mode 100644 docs/REPLAY_V2_PHASE_B.md create mode 100644 infra/db/migrations/008_replay_v2_storage.sql diff --git a/apps/ingest/src/index.js b/apps/ingest/src/index.js index 7411930..a0d2ee3 100644 --- a/apps/ingest/src/index.js +++ b/apps/ingest/src/index.js @@ -68,9 +68,19 @@ function validatePayloadShape(type, payload) { } async function query(sql, params = []) { + return pool.query(sql, params); +} + +async function withTransaction(handler) { const client = await pool.connect(); try { - return await client.query(sql, params); + await client.query('begin'); + const result = await handler(client); + await client.query('commit'); + return result; + } catch (error) { + await client.query('rollback'); + throw error; } finally { client.release(); } @@ -134,13 +144,13 @@ async function enqueueWebhooks({ eventType, workspaceId, runId = null, payload } } } -async function resolveTestCaseId(payload) { +async function resolveTestCaseId(payload, db = pool) { if (payload.testCaseId) return payload.testCaseId; if (!requireKeys(payload, ['projectId', 'stableTestKey', 'title', 'filePath'])) { throw new Error('test.result requires testCaseId or projectId+stableTestKey+title+filePath'); } - const row = await query( + const row = await db.query( `insert into test_cases(project_id, stable_test_key, title, file_path, suite_path) values($1,$2,$3,$4,$5) on conflict (project_id, stable_test_key) @@ -152,13 +162,13 @@ async function resolveTestCaseId(payload) { return row.rows[0].id; } -async function lookupRunContextByRunId(runId) { - const res = await query('select id, workspace_id, project_id from runs where id = $1', [runId]); +async function lookupRunContextByRunId(runId, db = pool) { + const res = await db.query('select id, workspace_id, project_id from runs where id = $1', [runId]); return res.rows[0] || null; } -async function lookupRunContextBySpecRunId(specRunId) { - const res = await query( +async function lookupRunContextBySpecRunId(specRunId, db = pool) { + const res = await db.query( `select r.id as run_id, r.workspace_id, r.project_id from spec_runs s join runs r on r.id = s.run_id @@ -168,7 +178,141 @@ async function lookupRunContextBySpecRunId(specRunId) { return res.rows[0] || null; } -async function handleEvent(type, payload) { +async function persistReplayV2Chunk(payload, idempotencyKey) { + await withTransaction(async (db) => { + await db.query('select pg_advisory_xact_lock(hashtext($1), hashtext($2))', [payload.runId, payload.streamId]); + + const existingChunk = await db.query( + `select run_id, stream_id, seq_start, seq_end + from replay_v2_chunks + where idempotency_key = $1`, + [idempotencyKey] + ); + if (existingChunk.rows.length) return; + + if (!(await lookupRunContextByRunId(payload.runId, db))) { + throw new ValidationError('replay_v2_run_not_found', { runId: payload.runId }); + } + + await db.query( + `insert into replay_v2_streams ( + run_id, stream_id, schema_version, started_at, metadata_json, + first_seq, last_seq, chunk_count, event_count, final_received, created_at, updated_at + ) + values ($1, $2, $3, $4::timestamptz, $5::jsonb, null, null, 0, 0, false, now(), now()) + on conflict (run_id, stream_id) + do update set + schema_version = excluded.schema_version, + started_at = coalesce(replay_v2_streams.started_at, excluded.started_at), + metadata_json = coalesce(replay_v2_streams.metadata_json, excluded.metadata_json), + updated_at = now()`, + [ + payload.runId, + payload.streamId, + payload.schemaVersion ?? '2.0', + payload.startedAt ?? null, + JSON.stringify(payload.metadata ?? null) + ] + ); + + const stream = await db.query( + `select first_seq, last_seq, chunk_count, event_count, final_received + from replay_v2_streams + where run_id = $1 and stream_id = $2 + for update`, + [payload.runId, payload.streamId] + ); + + const streamState = stream.rows[0]; + const lastSeq = streamState?.last_seq ?? 0; + const expectedSeqStart = lastSeq + 1; + + if (payload.seqStart > expectedSeqStart) { + throw new ValidationError('replay_v2_seq_gap_persisted', { + runId: payload.runId, + streamId: payload.streamId, + expectedSeqStart, + actualSeqStart: payload.seqStart, + lastSeq + }); + } + + if (payload.seqStart < expectedSeqStart) { + if (payload.seqEnd <= lastSeq) return; + throw new ValidationError('replay_v2_seq_overlap_conflict', { + runId: payload.runId, + streamId: payload.streamId, + expectedSeqStart, + actualSeqStart: payload.seqStart, + seqEnd: payload.seqEnd, + lastSeq + }); + } + + const chunkResult = await db.query( + `insert into replay_v2_chunks ( + run_id, stream_id, idempotency_key, schema_version, + seq_start, seq_end, event_count, chunk_index, final, started_at, payload_json + ) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::timestamptz, $11::jsonb) + returning id`, + [ + payload.runId, + payload.streamId, + idempotencyKey, + payload.schemaVersion ?? '2.0', + payload.seqStart, + payload.seqEnd, + payload.events.length, + payload.chunkIndex ?? null, + payload.final === true, + payload.startedAt ?? null, + JSON.stringify(payload) + ] + ); + const chunkId = chunkResult.rows[0].id; + + const eventValues = []; + const eventPlaceholders = payload.events.map((event, index) => { + const offset = index * 10; + eventValues.push( + payload.runId, + payload.streamId, + event.seq, + event.kind, + event.ts, + event.monotonicMs, + event.targetId ?? null, + JSON.stringify(event.selectorBundle ?? null), + JSON.stringify(event.data ?? null), + chunkId + ); + return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}::timestamptz, $${offset + 6}, $${offset + 7}, $${offset + 8}::jsonb, $${offset + 9}::jsonb, $${offset + 10})`; + }); + + await db.query( + `insert into replay_v2_events ( + run_id, stream_id, seq, kind, ts, monotonic_ms, target_id, selector_bundle, data_json, chunk_id + ) + values ${eventPlaceholders.join(', ')}`, + eventValues + ); + + await db.query( + `update replay_v2_streams + set first_seq = coalesce(first_seq, $3), + last_seq = $4, + chunk_count = chunk_count + 1, + event_count = event_count + $5, + final_received = final_received or $6, + updated_at = now() + where run_id = $1 and stream_id = $2`, + [payload.runId, payload.streamId, payload.seqStart, payload.seqEnd, payload.events.length, payload.final === true] + ); + }); +} + +async function handleEvent(type, payload, { idempotencyKey } = {}) { switch (type) { case INGEST_EVENT_TYPES.RUN_STARTED: { if (!requireKeys(payload, ['runId', 'workspaceId', 'projectId'])) throw new Error('run.started missing required fields'); @@ -294,10 +438,7 @@ async function handleEvent(type, payload) { throw new Error('replay.v2.chunk missing required fields'); } assertReplayV2ChunkPayload(payload); - const ctx = await lookupRunContextByRunId(payload.runId); - if (!ctx) { - throw new ValidationError('replay_v2_run_not_found', { runId: payload.runId }); - } + await persistReplayV2Chunk(payload, idempotencyKey); return; } default: @@ -331,7 +472,7 @@ app.post('/v1/ingest/events', async (request, reply) => { try { const result = await withIdempotency(idempotencyKey, type, payload, async () => { - await handleEvent(type, payload); + await handleEvent(type, payload, { idempotencyKey }); }); return reply.code(result.duplicate ? 200 : 202).send({ ok: true, ...result }); diff --git a/docs/REPLAY_V2_PHASE_B.md b/docs/REPLAY_V2_PHASE_B.md new file mode 100644 index 0000000..ab55d33 --- /dev/null +++ b/docs/REPLAY_V2_PHASE_B.md @@ -0,0 +1,26 @@ +# Replay V2 Phase B + +Replay V2 Phase B stores validated `replay.v2.chunk` payloads durably in Postgres during ingest. + +## What Is Persisted + +- `replay_v2_streams` stores one aggregate row per `(run_id, stream_id)` with schema version, stream timestamps, counters, and terminal-state tracking. +- `replay_v2_chunks` stores each accepted chunk header plus the original validated `payload_json`, keyed by ingest `idempotencyKey`. +- `replay_v2_events` stores one ordered row per replay event sequence with normalized selector bundles, event data, and the owning chunk reference. + +## Continuity Semantics + +- Replay persistence is transactional and serialized per `(run_id, stream_id)`. +- The next accepted chunk must start at `coalesce(last_seq, 0) + 1`. +- If a chunk starts after the expected sequence, ingest rejects it with `replay_v2_seq_gap_persisted`. +- If a chunk starts before the expected sequence and extends past the persisted tail, ingest rejects it with `replay_v2_seq_overlap_conflict`. + +## Duplicate Handling + +- If a retried chunk is fully covered by the persisted stream tail (`seqEnd <= last_seq`), ingest treats it as a duplicate replay and leaves the replay tables unchanged. +- If the same ingest `idempotencyKey` already exists in `replay_v2_chunks`, persistence is skipped and the existing outer ingest idempotency envelope remains unchanged. +- Accepted chunks still record the normal ingest idempotency outcome in `ingest_events`. + +## Phase C + +Phase C still needs replay read models and API/web viewer work on top of these persisted tables. diff --git a/infra/db/migrations/008_replay_v2_storage.sql b/infra/db/migrations/008_replay_v2_storage.sql new file mode 100644 index 0000000..979e8cd --- /dev/null +++ b/infra/db/migrations/008_replay_v2_storage.sql @@ -0,0 +1,66 @@ +CREATE TABLE IF NOT EXISTS replay_v2_streams ( + run_id UUID NOT NULL REFERENCES runs(id) ON DELETE CASCADE, + stream_id TEXT NOT NULL, + schema_version TEXT NOT NULL DEFAULT '2.0', + started_at TIMESTAMPTZ, + metadata_json JSONB, + first_seq INT, + last_seq INT, + chunk_count INT NOT NULL DEFAULT 0, + event_count INT NOT NULL DEFAULT 0, + final_received BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (run_id, stream_id) +); + +CREATE TABLE IF NOT EXISTS replay_v2_chunks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + run_id UUID NOT NULL, + stream_id TEXT NOT NULL, + idempotency_key TEXT UNIQUE NOT NULL, + schema_version TEXT NOT NULL DEFAULT '2.0', + seq_start INT NOT NULL, + seq_end INT NOT NULL, + event_count INT NOT NULL, + chunk_index INT, + final BOOLEAN NOT NULL DEFAULT false, + started_at TIMESTAMPTZ, + payload_json JSONB NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT now(), + FOREIGN KEY (run_id, stream_id) REFERENCES replay_v2_streams(run_id, stream_id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS replay_v2_events ( + run_id UUID NOT NULL, + stream_id TEXT NOT NULL, + seq INT NOT NULL, + kind TEXT NOT NULL, + ts TIMESTAMPTZ NOT NULL, + monotonic_ms INT NOT NULL, + target_id TEXT, + selector_bundle JSONB, + data_json JSONB, + chunk_id UUID REFERENCES replay_v2_chunks(id) ON DELETE SET NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (run_id, stream_id, seq), + FOREIGN KEY (run_id, stream_id) REFERENCES replay_v2_streams(run_id, stream_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_replay_v2_streams_run + ON replay_v2_streams(run_id, updated_at DESC); + +CREATE INDEX IF NOT EXISTS idx_replay_v2_chunks_run_received + ON replay_v2_chunks(run_id, received_at DESC); + +CREATE INDEX IF NOT EXISTS idx_replay_v2_chunks_stream_seq + ON replay_v2_chunks(run_id, stream_id, seq_start, seq_end); + +CREATE INDEX IF NOT EXISTS idx_replay_v2_events_run_ts + ON replay_v2_events(run_id, ts, seq); + +CREATE INDEX IF NOT EXISTS idx_replay_v2_events_stream_ts + ON replay_v2_events(run_id, stream_id, ts, seq); + +CREATE INDEX IF NOT EXISTS idx_replay_v2_events_chunk + ON replay_v2_events(chunk_id); diff --git a/scripts/db-migrate-container.mjs b/scripts/db-migrate-container.mjs index e2c1582..6cf8d1a 100644 --- a/scripts/db-migrate-container.mjs +++ b/scripts/db-migrate-container.mjs @@ -9,7 +9,8 @@ const files = [ "infra/db/migrations/003_ingest_idempotency.sql", "infra/db/migrations/004_webhook_deliveries.sql", "infra/db/migrations/005_batches_11_18.sql", - "infra/db/migrations/006_batches_19_26.sql" + "infra/db/migrations/006_batches_19_26.sql", + "infra/db/migrations/008_replay_v2_storage.sql" ]; for (const file of files) { diff --git a/scripts/migrate-in-container.sh b/scripts/migrate-in-container.sh index 45659cd..43ca08f 100755 --- a/scripts/migrate-in-container.sh +++ b/scripts/migrate-in-container.sh @@ -9,7 +9,9 @@ for file in \ infra/db/migrations/002_core_extensions.sql \ infra/db/migrations/003_ingest_idempotency.sql \ infra/db/migrations/004_webhook_deliveries.sql \ - infra/db/migrations/005_batches_11_18.sql + infra/db/migrations/005_batches_11_18.sql \ + infra/db/migrations/006_batches_19_26.sql \ + infra/db/migrations/008_replay_v2_storage.sql do echo "Applying $file ..." diff --git a/scripts/run-migrations.sh b/scripts/run-migrations.sh index 26fe66d..d6204c0 100755 --- a/scripts/run-migrations.sh +++ b/scripts/run-migrations.sh @@ -6,3 +6,4 @@ psql "${DATABASE_URL}" -f infra/db/migrations/003_ingest_idempotency.sql psql "${DATABASE_URL}" -f infra/db/migrations/004_webhook_deliveries.sql psql "${DATABASE_URL}" -f infra/db/migrations/005_batches_11_18.sql psql "${DATABASE_URL}" -f infra/db/migrations/006_batches_19_26.sql +psql "${DATABASE_URL}" -f infra/db/migrations/008_replay_v2_storage.sql