Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 265 additions & 0 deletions apps/api/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ function buildPageInfo(total, page, limit) {
};
}

function normalizeOptionalInt(value, { min = null, max = null } = {}) {
if (value === undefined || value === null || value === '') return null;
const parsed = Number(value);
if (!Number.isFinite(parsed)) return null;
const normalized = Math.floor(parsed);
if (min !== null && normalized < min) return null;
if (max !== null && normalized > max) return null;
return normalized;
}

function deterministicUnit(seed, key) {
const hex = hashText(`${seed}:${key}`).slice(0, 12);
return Number.parseInt(hex, 16) / 0xffffffffffff;
Expand Down Expand Up @@ -1795,6 +1805,261 @@ app.get('/v1/runs/:id/summary', { preHandler: workspaceGuard({ role: 'viewer', r
return runBundle;
});

app.get('/v1/runs/:id/replay-v2/streams', { preHandler: workspaceGuard({ role: 'viewer', resolveWorkspaceId: 'runParam' }) }, async (request, reply) => {
const run = await fetchRun(request.params.id);
if (!run) return reply.code(404).send({ error: 'not_found' });

const { rows } = await query(
`select stream_id, schema_version, started_at, first_seq, last_seq, chunk_count,
event_count, final_received, protocol_version, transport_kind, harbor_root,
fin_seq, ack_seq, ack_received, seek_stride, actionable_command_count,
aligned_command_count, target_resolved_count, orphan_count, target_registry_version,
updated_at
from replay_v2_streams
where run_id = $1
order by started_at asc nulls last, stream_id asc`,
[request.params.id]
);

return {
items: rows,
pageInfo: buildPageInfo(rows.length, 1, Math.max(rows.length, 1))
};
});

app.get('/v1/runs/:id/replay-v2/events', { preHandler: workspaceGuard({ role: 'viewer', resolveWorkspaceId: 'runParam' }) }, async (request, reply) => {
const run = await fetchRun(request.params.id);
if (!run) return reply.code(404).send({ error: 'not_found' });

const streamId = String(request.query?.streamId || '').trim();
if (!streamId) return reply.code(400).send({ error: 'stream_id_required' });

const fromSeq = normalizeOptionalInt(request.query?.fromSeq, { min: 1 });
const toSeq = normalizeOptionalInt(request.query?.toSeq, { min: 1 });
const parsedLimit = normalizeOptionalInt(request.query?.limit, { min: 1, max: 1000 });
const normalizedLimit = parsedLimit ?? 300;

if ((request.query?.fromSeq ?? '') !== '' && fromSeq === null) {
return reply.code(400).send({ error: 'invalid_from_seq' });
}
if ((request.query?.toSeq ?? '') !== '' && toSeq === null) {
return reply.code(400).send({ error: 'invalid_to_seq' });
}
if ((request.query?.limit ?? '') !== '' && parsedLimit === null) {
return reply.code(400).send({ error: 'invalid_limit' });
}
if (fromSeq !== null && toSeq !== null && fromSeq > toSeq) {
return reply.code(400).send({ error: 'invalid_seq_range' });
}

const streamRes = await query(
`select stream_id
from replay_v2_streams
where run_id = $1 and stream_id = $2`,
[request.params.id, streamId]
);
if (!streamRes.rows.length) return reply.code(404).send({ error: 'not_found' });

const totalRes = await query(
`select count(*)::int as total
from replay_v2_events
where run_id = $1
and stream_id = $2
and ($3::int is null or seq >= $3)
and ($4::int is null or seq <= $4)`,
[request.params.id, streamId, fromSeq, toSeq]
);

const { rows } = await query(
`select e.seq, e.kind, e.ts, e.monotonic_ms, e.target_id, e.selector_bundle,
e.data_json, e.chunk_id, c.chunk_index, c.final, e.command_id, e.target_ref,
e.payload_json, e.lifecycle_event, e.selector_version, e.dom_signature_hash, e.asset_refs
from replay_v2_events e
left join replay_v2_chunks c on c.id = e.chunk_id
where e.run_id = $1
and e.stream_id = $2
and ($3::int is null or e.seq >= $3)
and ($4::int is null or e.seq <= $4)
order by e.seq asc
limit $5`,
[request.params.id, streamId, fromSeq, toSeq, normalizedLimit]
);

return {
items: rows,
pageInfo: {
...buildPageInfo(totalRes.rows[0]?.total, 1, normalizedLimit),
streamId,
fromSeq,
toSeq
}
};
});

app.get('/v1/runs/:id/replay-v2/targets', { preHandler: workspaceGuard({ role: 'viewer', resolveWorkspaceId: 'runParam' }) }, async (request, reply) => {
const run = await fetchRun(request.params.id);
if (!run) return reply.code(404).send({ error: 'not_found' });

const streamId = String(request.query?.streamId || '').trim();
if (!streamId) return reply.code(400).send({ error: 'stream_id_required' });

const seq = normalizeOptionalInt(request.query?.seq, { min: 1 }) ?? 2147483647;
const { rows } = await query(
`select distinct on (target_id)
target_id, selector_version, state, event_seq, lifecycle_event, selector_bundle, metadata_json, dom_signature_hash, created_at
from replay_v2_target_registry
where run_id = $1
and stream_id = $2
and event_seq <= $3
order by target_id, event_seq desc`,
[request.params.id, streamId, seq]
);

return {
items: rows,
pageInfo: {
...buildPageInfo(rows.length, 1, Math.max(rows.length, 1)),
streamId,
seq: Number.isFinite(seq) ? seq : null
}
};
});

app.get('/v1/runs/:id/replay-v2/seek', { preHandler: workspaceGuard({ role: 'viewer', resolveWorkspaceId: 'runParam' }) }, async (request, reply) => {
const run = await fetchRun(request.params.id);
if (!run) return reply.code(404).send({ error: 'not_found' });

const streamId = String(request.query?.streamId || '').trim();
if (!streamId) return reply.code(400).send({ error: 'stream_id_required' });

const seq = normalizeOptionalInt(request.query?.seq, { min: 1 });
if (seq === null) return reply.code(400).send({ error: 'invalid_seq' });

const streamRes = await query(
`select stream_id, first_seq, last_seq, seek_stride
from replay_v2_streams
where run_id = $1 and stream_id = $2`,
[request.params.id, streamId]
);
const stream = streamRes.rows[0];
if (!stream) return reply.code(404).send({ error: 'not_found' });

const checkpointRes = await query(
`select checkpoint_seq, event_seq, monotonic_ms, target_registry_state_json
from replay_v2_seek_index
where run_id = $1 and stream_id = $2 and checkpoint_seq <= $3
order by checkpoint_seq desc
limit 1`,
[request.params.id, streamId, seq]
);
const checkpoint = checkpointRes.rows[0] || null;
const deltaStart = checkpoint ? checkpoint.event_seq + 1 : Math.max(1, stream.first_seq || 1);

const deltasRes = await query(
`select seq, kind, ts, monotonic_ms, target_id, command_id, target_ref, selector_bundle,
payload_json, lifecycle_event, selector_version, dom_signature_hash, asset_refs
from replay_v2_events
where run_id = $1
and stream_id = $2
and seq >= $3
and seq <= $4
order by seq asc`,
[request.params.id, streamId, deltaStart, seq]
);

const targetsRes = await query(
`select distinct on (target_id)
target_id, selector_version, state, event_seq, lifecycle_event, selector_bundle, metadata_json, dom_signature_hash
from replay_v2_target_registry
where run_id = $1
and stream_id = $2
and event_seq <= $3
order by target_id, event_seq desc`,
[request.params.id, streamId, seq]
);

const inspectEventRes = await query(
`select seq, target_id, target_ref, selector_bundle, dom_signature_hash, payload_json
from replay_v2_events
where run_id = $1
and stream_id = $2
and seq <= $3
and target_id is not null
order by seq desc
limit 1`,
[request.params.id, streamId, seq]
);
const inspectEvent = inspectEventRes.rows[0] || null;

return {
item: {
streamId,
seq,
checkpoint,
deltas: deltasRes.rows,
resolvedTargets: targetsRes.rows,
liveInspect: inspectEvent ? {
seq: inspectEvent.seq,
targetId: inspectEvent.target_id,
selectorBundle: inspectEvent.selector_bundle,
domSignatureHash: inspectEvent.dom_signature_hash,
payload: inspectEvent.payload_json
} : null
}
};
});

app.get('/v1/runs/:id/replay-v2/metrics', { preHandler: workspaceGuard({ role: 'viewer', resolveWorkspaceId: 'runParam' }) }, async (request, reply) => {
const run = await fetchRun(request.params.id);
if (!run) return reply.code(404).send({ error: 'not_found' });

const streamId = String(request.query?.streamId || '').trim();
if (!streamId) return reply.code(400).send({ error: 'stream_id_required' });

const { rows } = await query(
`select stream_id, first_seq, last_seq, event_count,
actionable_command_count, aligned_command_count, target_resolved_count,
orphan_count, final_received, ack_received, fin_seq, ack_seq, seek_stride, updated_at
from replay_v2_streams
where run_id = $1 and stream_id = $2`,
[request.params.id, streamId]
);

const { rows: seqRows } = await query(
`select min(seq) as min_seq, max(seq) as max_seq, count(*) as row_count
from replay_v2_events
where run_id = $1 and stream_id = $2`,
[request.params.id, streamId]
);
const stream = rows[0];
if (!stream) return reply.code(404).send({ error: 'not_found' });

const actionable = Number(stream.actionable_command_count || 0);
const alignment = actionable > 0 ? Number(stream.aligned_command_count || 0) / actionable : 1;
const targetStability = actionable > 0 ? Number(stream.target_resolved_count || 0) / actionable : 1;

const seqRow = seqRows?.[0] || {};
const minSeq = seqRow.min_seq !== null ? Number(seqRow.min_seq) : null;
const maxSeq = seqRow.max_seq !== null ? Number(seqRow.max_seq) : null;
const rowCount = Number(seqRow.row_count || 0);
const spanCount = minSeq === null || maxSeq === null ? 0 : (maxSeq - minSeq + 1);
const gapCount = Math.max(0, spanCount - rowCount);

return {
item: {
...stream,
seqContinuity: {
zeroGaps: gapCount === 0,
gapCount
},
finAckSuccess: Boolean(stream.final_received && stream.ack_received),
commandToDomAlignment: alignment,
targetStability,
orphanSpamRisk: Number(stream.orphan_count || 0) > Math.max(1, Math.floor(Number(stream.event_count || 0) * 0.01))
}
};
});

app.get('/v1/runs/:runId/specs', { preHandler: workspaceGuard({ role: 'viewer', resolveWorkspaceId: async (request) => resolveWorkspaceIdFromRequestPart(request, 'runParam') }) }, async (request, reply) => {
const { runId } = request.params;
const { status = null, page = 1, limit = 50 } = request.query || {};
Expand Down
Loading
Loading