Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 172 additions & 12 deletions apps/ingest/src/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import crypto from 'node:crypto';
import Fastify from 'fastify';
import pg from 'pg';
import { INGEST_EVENT_TYPES, isValidIngestType } from '@testharbor/shared';
import { INGEST_EVENT_TYPES, assertReplayV2ChunkPayload, isValidIngestType } from '@testharbor/shared';

const app = Fastify({ logger: true });
const port = Number(process.env.PORT || 4010);
Expand Down Expand Up @@ -43,7 +43,8 @@ const REQUIRED_FIELDS_BY_TYPE = {
[INGEST_EVENT_TYPES.SPEC_FINISHED]: ['specRunId', 'status'],
[INGEST_EVENT_TYPES.TEST_RESULT]: ['testResultId', 'specRunId', 'status'],
[INGEST_EVENT_TYPES.ARTIFACT_REGISTERED]: ['artifactId', 'runId', 'type', 'storageKey'],
[INGEST_EVENT_TYPES.HEARTBEAT]: ['runId']
[INGEST_EVENT_TYPES.HEARTBEAT]: ['runId'],
[INGEST_EVENT_TYPES.REPLAY_V2_CHUNK]: ['runId', 'streamId', 'seqStart', 'seqEnd', 'events']
};

function missingKeys(obj, keys) {
Expand All @@ -56,12 +57,30 @@ function validatePayloadShape(type, payload) {
if (missing.length) {
throw new ValidationError('payload_missing_required_fields', { type, missing });
}

if (type === INGEST_EVENT_TYPES.REPLAY_V2_CHUNK) {
try {
assertReplayV2ChunkPayload(payload);
} catch (error) {
throw new ValidationError(error.message, error.details || { type });
}
}
}

/**
 * Executes one SQL statement through the module-level `pool`.
 *
 * @param {string} sql - Statement text.
 * @param {Array} [params=[]] - Values bound to the statement's placeholders.
 * @returns {Promise<*>} Whatever `pool.query` resolves with.
 */
async function query(sql, params = []) {
  const outcome = await pool.query(sql, params);
  return outcome;
}

async function withTransaction(handler) {
const client = await pool.connect();
try {
return await client.query(sql, params);
await client.query('begin');
const result = await handler(client);
await client.query('commit');
return result;
} catch (error) {
await client.query('rollback');
throw error;
} finally {
client.release();
}
Expand Down Expand Up @@ -225,13 +244,13 @@ async function enqueueWebhooks({ eventType, workspaceId, runId = null, payload }
}
}

async function resolveTestCaseId(payload) {
async function resolveTestCaseId(payload, db = pool) {
if (payload.testCaseId) return payload.testCaseId;
if (!requireKeys(payload, ['projectId', 'stableTestKey', 'title', 'filePath'])) {
throw new Error('test.result requires testCaseId or projectId+stableTestKey+title+filePath');
}

const row = await query(
const row = await db.query(
`insert into test_cases(project_id, stable_test_key, title, file_path, suite_path)
values($1,$2,$3,$4,$5)
on conflict (project_id, stable_test_key)
Expand All @@ -243,13 +262,13 @@ async function resolveTestCaseId(payload) {
return row.rows[0].id;
}

async function lookupRunContextByRunId(runId) {
const res = await query('select id, workspace_id, project_id from runs where id = $1', [runId]);
/**
 * Looks up the `id`, `workspace_id` and `project_id` columns of a run.
 *
 * @param {*} runId - Value matched against `runs.id`.
 * @param {{ query: Function }} [db=pool] - Object used to run the query; pass a
 *   transaction-bound client to read inside that transaction.
 * @returns {Promise<object|null>} The first matching row, or `null` when no row matches.
 */
async function lookupRunContextByRunId(runId, db = pool) {
  const { rows } = await db.query('select id, workspace_id, project_id from runs where id = $1', [runId]);
  const firstRow = rows[0];
  if (!firstRow) {
    return null;
  }
  return firstRow;
}

async function lookupRunContextBySpecRunId(specRunId) {
const res = await query(
async function lookupRunContextBySpecRunId(specRunId, db = pool) {
const res = await db.query(
`select r.id as run_id, r.workspace_id, r.project_id
from spec_runs s
join runs r on r.id = s.run_id
Expand All @@ -259,7 +278,141 @@ async function lookupRunContextBySpecRunId(specRunId) {
return res.rows[0] || null;
}

async function handleEvent(type, payload) {
/**
 * Bulk-inserts the events of one chunk into `replay_v2_events`.
 *
 * Events are written with multi-row inserts, split into batches so that a single
 * statement never approaches PostgreSQL's limit of 65535 bind parameters (every
 * event binds 10 values). All batches run on the caller's `db`, so they stay part
 * of the caller's transaction.
 *
 * @param {{ query: Function }} db - Transaction-bound client.
 * @param {object} payload - Chunk payload; `runId`, `streamId` and `events` are read.
 * @param {*} chunkId - Id of the owning `replay_v2_chunks` row.
 * @returns {Promise<void>}
 */
async function insertReplayV2Events(db, payload, chunkId) {
  const PARAMS_PER_EVENT = 10;
  const EVENTS_PER_INSERT = 1000;

  for (let start = 0; start < payload.events.length; start += EVENTS_PER_INSERT) {
    const batch = payload.events.slice(start, start + EVENTS_PER_INSERT);
    const eventValues = [];
    const eventPlaceholders = batch.map((event, index) => {
      // Placeholder numbering restarts for every statement.
      const offset = index * PARAMS_PER_EVENT;
      eventValues.push(
        payload.runId,
        payload.streamId,
        event.seq,
        event.kind,
        event.ts,
        event.monotonicMs,
        event.targetId ?? null,
        JSON.stringify(event.selectorBundle ?? null),
        JSON.stringify(event.data ?? null),
        chunkId
      );
      return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}::timestamptz, $${offset + 6}, $${offset + 7}, $${offset + 8}::jsonb, $${offset + 9}::jsonb, $${offset + 10})`;
    });

    await db.query(
      `insert into replay_v2_events (
        run_id, stream_id, seq, kind, ts, monotonic_ms, target_id, selector_bundle, data_json, chunk_id
      )
      values ${eventPlaceholders.join(', ')}`,
      eventValues
    );
  }
}

/**
 * Persists one `replay.v2.chunk` payload inside a single transaction.
 *
 * Steps, all on the transaction client handed out by `withTransaction`:
 *  1. take a transaction-scoped advisory lock derived from `(runId, streamId)`;
 *  2. return early if a chunk with this `idempotencyKey` is already stored;
 *  3. reject the chunk if the run does not exist;
 *  4. upsert the stream row and re-read it `for update`;
 *  5. enforce sequence continuity against the stream's `last_seq`;
 *  6. insert the chunk row, its events, and advance the stream counters.
 *
 * The payload is expected to have passed `assertReplayV2ChunkPayload` already
 * (the `handleEvent` caller does this); in particular `events` is assumed non-empty.
 *
 * @param {object} payload - Chunk payload (`runId`, `streamId`, `seqStart`, `seqEnd`,
 *   `events`, plus optional `schemaVersion`, `startedAt`, `metadata`, `chunkIndex`, `final`).
 * @param {string} [idempotencyKey] - Ingest idempotency key stored with the chunk.
 * @returns {Promise<void>}
 * @throws {ValidationError} `replay_v2_run_not_found` when the run is unknown,
 *   `replay_v2_seq_gap_persisted` when the chunk starts after the expected sequence,
 *   `replay_v2_seq_overlap_conflict` when it starts before the expected sequence but
 *   extends past the persisted tail.
 */
async function persistReplayV2Chunk(payload, idempotencyKey) {
  await withTransaction(async (db) => {
    await db.query('select pg_advisory_xact_lock(hashtext($1), hashtext($2))', [payload.runId, payload.streamId]);

    const existingChunk = await db.query(
      `select run_id, stream_id, seq_start, seq_end
      from replay_v2_chunks
      where idempotency_key = $1`,
      [idempotencyKey]
    );
    if (existingChunk.rows.length) return;

    if (!(await lookupRunContextByRunId(payload.runId, db))) {
      throw new ValidationError('replay_v2_run_not_found', { runId: payload.runId });
    }

    // NOTE(review): an absent `metadata` is bound as the JSON text 'null', which is a
    // non-NULL jsonb value; the coalesce below would then never adopt metadata from a
    // later chunk. Confirm against the table definition whether SQL NULL is intended.
    await db.query(
      `insert into replay_v2_streams (
        run_id, stream_id, schema_version, started_at, metadata_json,
        first_seq, last_seq, chunk_count, event_count, final_received, created_at, updated_at
      )
      values ($1, $2, $3, $4::timestamptz, $5::jsonb, null, null, 0, 0, false, now(), now())
      on conflict (run_id, stream_id)
      do update set
        schema_version = excluded.schema_version,
        started_at = coalesce(replay_v2_streams.started_at, excluded.started_at),
        metadata_json = coalesce(replay_v2_streams.metadata_json, excluded.metadata_json),
        updated_at = now()`,
      [
        payload.runId,
        payload.streamId,
        payload.schemaVersion ?? '2.0',
        payload.startedAt ?? null,
        JSON.stringify(payload.metadata ?? null)
      ]
    );

    const stream = await db.query(
      `select first_seq, last_seq, chunk_count, event_count, final_received
      from replay_v2_streams
      where run_id = $1 and stream_id = $2
      for update`,
      [payload.runId, payload.streamId]
    );

    const streamState = stream.rows[0];
    // node-postgres hands int8/bigint columns back as strings. If `last_seq` is such a
    // column (its type is not visible here), `last_seq + 1` would concatenate ("5" + 1
    // === "51") and break the continuity check, so coerce to a number first.
    const lastSeq = Number(streamState?.last_seq ?? 0);
    const expectedSeqStart = lastSeq + 1;

    if (payload.seqStart > expectedSeqStart) {
      throw new ValidationError('replay_v2_seq_gap_persisted', {
        runId: payload.runId,
        streamId: payload.streamId,
        expectedSeqStart,
        actualSeqStart: payload.seqStart,
        lastSeq
      });
    }

    if (payload.seqStart < expectedSeqStart) {
      // Entirely at or before the persisted tail: a retried chunk, nothing left to store.
      if (payload.seqEnd <= lastSeq) return;
      throw new ValidationError('replay_v2_seq_overlap_conflict', {
        runId: payload.runId,
        streamId: payload.streamId,
        expectedSeqStart,
        actualSeqStart: payload.seqStart,
        seqEnd: payload.seqEnd,
        lastSeq
      });
    }

    const chunkResult = await db.query(
      `insert into replay_v2_chunks (
        run_id, stream_id, idempotency_key, schema_version,
        seq_start, seq_end, event_count, chunk_index, final, started_at, payload_json
      )
      values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::timestamptz, $11::jsonb)
      returning id`,
      [
        payload.runId,
        payload.streamId,
        idempotencyKey,
        payload.schemaVersion ?? '2.0',
        payload.seqStart,
        payload.seqEnd,
        payload.events.length,
        payload.chunkIndex ?? null,
        payload.final === true,
        payload.startedAt ?? null,
        JSON.stringify(payload)
      ]
    );
    const chunkId = chunkResult.rows[0].id;

    await insertReplayV2Events(db, payload, chunkId);

    await db.query(
      `update replay_v2_streams
      set first_seq = coalesce(first_seq, $3),
        last_seq = $4,
        chunk_count = chunk_count + 1,
        event_count = event_count + $5,
        final_received = final_received or $6,
        updated_at = now()
      where run_id = $1 and stream_id = $2`,
      [payload.runId, payload.streamId, payload.seqStart, payload.seqEnd, payload.events.length, payload.final === true]
    );
  });
}

async function handleEvent(type, payload, { idempotencyKey } = {}) {
switch (type) {
case INGEST_EVENT_TYPES.RUN_STARTED: {
if (!requireKeys(payload, ['runId', 'projectId'])) throw new Error('run.started missing required fields');
Expand Down Expand Up @@ -383,6 +536,14 @@ async function handleEvent(type, payload) {
);
return;
}
case INGEST_EVENT_TYPES.REPLAY_V2_CHUNK: {
if (!requireKeys(payload, ['runId', 'streamId', 'seqStart', 'seqEnd', 'events'])) {
throw new Error('replay.v2.chunk missing required fields');
}
assertReplayV2ChunkPayload(payload);
await persistReplayV2Chunk(payload, idempotencyKey);
return;
}
default:
throw new Error(`Unhandled event type: ${type}`);
}
Expand Down Expand Up @@ -417,7 +578,7 @@ app.post('/v1/ingest/events', async (request, reply) => {

try {
const result = await withIdempotency(idempotencyKey, type, payload, async () => {
await handleEvent(type, payload);
await handleEvent(type, payload, { idempotencyKey });
});

if (auth.mode === 'project' && auth.token?.id) {
Expand All @@ -433,7 +594,6 @@ app.post('/v1/ingest/events', async (request, reply) => {
details: error.details
});
}

const msg = String(error?.message || error);
if (msg.includes('missing required fields')) {
return reply.code(400).send({
Expand Down
106 changes: 106 additions & 0 deletions docs/REPLAY_V2_PHASE_A.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Replay V2 Phase A

Replay V2 Phase A defines the reporter-to-ingest wire contract for chunked replay streams. Storage and replay rendering are intentionally out of scope in this phase.

## Chunk Contract

Ingest event type: `replay.v2.chunk`

Required payload fields:

- `runId`: owning run identifier. The ingest service rejects chunks for unknown runs.
- `streamId`: stable identifier for one replay stream within the run.
- `seqStart`: first event sequence number in the chunk.
- `seqEnd`: last event sequence number in the chunk.
- `events`: non-empty ordered array of Replay V2 events.

Optional header fields currently emitted by the reporter:

- `schemaVersion`: current value `2.0`
- `startedAt`: replay stream wall-clock origin timestamp
- `chunkIndex`: zero-based chunk counter
- `final`: marks the terminal flushed chunk

Validation rules:

- `events` must be non-empty.
- Every event must carry the same `runId` and `streamId` as the chunk header.
- Event `seq` values must be strictly contiguous with no gaps.
- `seqStart` must equal the first event sequence.
- `seqEnd` must equal the last event sequence.
- `seqEnd - seqStart + 1` must equal `events.length`.
- Event `monotonicMs` must never move backwards inside a chunk.

## Event Shape

Each event includes:

- `kind`: Replay V2 event kind
- `runId`
- `streamId`
- `seq`
- `monotonicMs`
- `ts`

Optional fields:

- `targetId`
- `selectorBundle`
- `data`

Phase A recognizes these event kinds:

- `session.start`
- `session.end`
- `target.declared`
- `target.rebound`
- `target.orphaned`
- `dom.snapshot`
- `dom.mutation`
- `pointer`
- `keyboard`
- `input`
- `scroll`
- `viewport`
- `navigation`
- `assertion`
- `log`
- `custom`

## Selector Bundles And Stable Target IDs

Selector bundles are normalized before hashing or validation:

- known selector fields are trimmed
- array selectors are deduplicated and sorted
- empty values are removed
- `nth` is preserved only when it is a non-negative integer

Stable target IDs are derived from normalized selector data plus optional target identity hints. This allows the reporter to redeclare the same logical target deterministically across retries or rebinding.

## Target Lifecycle

The Phase A reporter exposes four target lifecycle operations:

1. `declareReplayTarget`
2. `rebindReplayTarget`
3. `markReplayTargetOrphan`
4. `queueReplayEvent` for events that reference an active target

Lifecycle expectations:

- A target is active immediately after declaration.
- Rebinding updates the normalized selector bundle for an existing target and returns it to active state.
- Orphaning marks the target unusable for future non-lifecycle events.
- Events that reference a target must use an active target ID, except for `target.declared` and `target.orphaned` lifecycle events themselves.

## Reporter Behavior

The reporter maintains:

- a monotonic clock anchored to `startedAt`
- a strict event sequence counter beginning at `1`
- a strict chunk continuity tracker across flushes
- a target registry enforcing active versus orphaned usage

When the pending event queue reaches `TESTHARBOR_REPLAY_CHUNK_SIZE` (default `100`), the reporter flushes a validated `replay.v2.chunk` payload to ingest.
26 changes: 26 additions & 0 deletions docs/REPLAY_V2_PHASE_B.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Replay V2 Phase B

Replay V2 Phase B stores validated `replay.v2.chunk` payloads durably in Postgres during ingest.

## What Is Persisted

- `replay_v2_streams` stores one aggregate row per `(run_id, stream_id)` with schema version, stream timestamps, counters, and terminal-state tracking.
- `replay_v2_chunks` stores each accepted chunk header plus the original validated `payload_json`, keyed by ingest `idempotencyKey`.
- `replay_v2_events` stores one ordered row per replay event sequence with normalized selector bundles, event data, and the owning chunk reference.

## Continuity Semantics

- Replay persistence is transactional and serialized per `(run_id, stream_id)`.
- The next accepted chunk must start at `coalesce(last_seq, 0) + 1`.
- If a chunk starts after the expected sequence, ingest rejects it with `replay_v2_seq_gap_persisted`.
- If a chunk starts before the expected sequence and extends past the persisted tail, ingest rejects it with `replay_v2_seq_overlap_conflict`.

## Duplicate Handling

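- If a retried chunk is fully covered by the persisted stream tail (`seqEnd <= last_seq`), ingest treats it as a duplicate replay: no rows are added to `replay_v2_chunks` or `replay_v2_events`, and the stream's sequence and counter columns are left untouched. The stream upsert has already run by the time the duplicate is detected, so the stream row's `schema_version` and `updated_at` may still be refreshed.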
- If the same ingest `idempotencyKey` already exists in `replay_v2_chunks`, persistence is skipped and the existing outer ingest idempotency envelope remains unchanged.
- Accepted chunks still record the normal ingest idempotency outcome in `ingest_events`.

## Phase C

Phase C still needs replay read models and API/web viewer work on top of these persisted tables.
Loading
Loading