diff --git a/.env.example b/.env.example index 3c7d312..3265c43 100644 --- a/.env.example +++ b/.env.example @@ -37,11 +37,15 @@ STREAM_IDLE_TIMEOUT_MS=120000 STREAM_MAX_RETRIES=5 STREAM_BACKOFF_BASE_MS=250 STREAM_BACKOFF_MAX_MS=30000 +STREAM_SSE_HEARTBEAT_MS=15000 # ── Replay ─────────────────────────────────────── REPLAY_MAX_DELAY_MS=2000 REPLAY_BATCH_SIZE=500 +# ── Run Recovery ──────────────────────────────── +RUN_RECOVERY_ENABLED=true + # ── Logging ────────────────────────────────────── LOG_LEVEL=info diff --git a/drizzle/0003_v2_progress_and_export.sql b/drizzle/0003_v2_progress_and_export.sql new file mode 100644 index 0000000..bbe5e2e --- /dev/null +++ b/drizzle/0003_v2_progress_and_export.sql @@ -0,0 +1,3 @@ +ALTER TABLE "run_projections" ADD COLUMN IF NOT EXISTS "progress" jsonb NOT NULL DEFAULT '{"entries":[]}'; +CREATE INDEX IF NOT EXISTS run_events_raw_run_ts_idx ON run_events_raw(run_id, ts); +CREATE INDEX IF NOT EXISTS run_events_raw_run_seq_idx ON run_events_raw(run_id, seq); diff --git a/package.json b/package.json index c0f1307..3078815 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "macp-control-plane", - "version": "0.1.0", + "version": "0.2.0", "private": true, "description": "Scenario-agnostic control plane for the MACP runtime, built with NestJS.", "license": "MIT", diff --git a/src/app.module.ts b/src/app.module.ts index c8ad49d..fd5ec59 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -8,6 +8,7 @@ import { ConfigModule } from './config/config.module'; import { HealthController } from './controllers/health.controller'; import { MetricsController } from './controllers/metrics.controller'; import { ObservabilityController } from './controllers/observability.controller'; +import { RunInsightsController } from './controllers/run-insights.controller'; import { RunsController } from './controllers/runs.controller'; import { RuntimeController } from './controllers/runtime.controller'; import { DatabaseModule } from './db/database.module'; @@ -33,8 +34,10 @@ import { RunRepository } from './storage/run.repository'; import { RuntimeSessionRepository } from './storage/runtime-session.repository'; import { InstrumentationService } from './telemetry/instrumentation.service'; import { TraceService } from './telemetry/trace.service'; +import { RunInsightsService } from './insights/run-insights.service'; import { RunExecutorService } from './runs/run-executor.service'; import { RunManagerService } from './runs/run-manager.service'; +import { RunRecoveryService } from './runs/run-recovery.service'; import { StreamConsumerService } from './runs/stream-consumer.service'; @Module({ @@ -44,7 +47,7 @@ import { StreamConsumerService } from './runs/stream-consumer.service'; AuthModule, ThrottlerModule.forRoot([{ ttl: 60000, limit: 100 }]) ], - controllers: [RunsController, RuntimeController, ObservabilityController, HealthController, MetricsController], + controllers: [RunsController, RunInsightsController, RuntimeController, ObservabilityController, HealthController, MetricsController], providers: [ { provide: APP_GUARD, useClass: AuthGuard }, { provide: APP_GUARD, useClass: ThrottleByUserGuard }, @@ -70,7 +73,9 @@ import { StreamConsumerService } from './runs/stream-consumer.service'; ReplayService, RunManagerService, StreamConsumerService, - RunExecutorService + RunExecutorService, + RunRecoveryService, + RunInsightsService ] }) export class AppModule implements NestModule { diff --git a/src/config/app-config.service.ts b/src/config/app-config.service.ts index 9921f33..ff1fc20 100644 --- a/src/config/app-config.service.ts +++ b/src/config/app-config.service.ts @@ -59,8 +59,10 @@ export class AppConfigService implements OnModuleInit { readonly streamMaxRetries = readNumber('STREAM_MAX_RETRIES', 5); readonly streamBackoffBaseMs = readNumber('STREAM_BACKOFF_BASE_MS', 250); readonly streamBackoffMaxMs = readNumber('STREAM_BACKOFF_MAX_MS', 30000); + readonly streamSseHeartbeatMs = readNumber('STREAM_SSE_HEARTBEAT_MS', 15000); readonly replayMaxDelayMs = readNumber('REPLAY_MAX_DELAY_MS', 2000); readonly replayBatchSize = readNumber('REPLAY_BATCH_SIZE', 500); + readonly runRecoveryEnabled = readBoolean('RUN_RECOVERY_ENABLED', true); readonly dbPoolMax = readNumber('DB_POOL_MAX', 20); readonly dbPoolIdleTimeout = readNumber('DB_POOL_IDLE_TIMEOUT', 30000); diff --git a/src/contracts/control-plane.ts b/src/contracts/control-plane.ts index 5028393..95df6db 100644 --- a/src/contracts/control-plane.ts +++ b/src/contracts/control-plane.ts @@ -254,6 +254,39 @@ export interface ReplayRequest { toSeq?: number; } +export interface RunExportBundle { + run: Run; + session: Record | null; + projection: RunStateProjection | null; + metrics: MetricsSummary | null; + artifacts: Artifact[]; + canonicalEvents: CanonicalEvent[]; + rawEvents: Record[]; + exportedAt: string; +} + +export interface RunComparisonRequest { + leftRunId: string; + rightRunId: string; +} + +export interface RunComparisonResult { + left: { runId: string; status: RunStatus; modeName?: string; durationMs?: number }; + right: { runId: string; status: RunStatus; modeName?: string; durationMs?: number }; + statusMatch: boolean; + durationDeltaMs?: number; + confidenceDelta?: number; + participantsDiff: { + added: string[]; + removed: string[]; + common: string[]; + }; + signalsDiff: { + added: string[]; + removed: string[]; + }; +} + export interface MetricsSummary { runId: string; eventCount: number; diff --git a/src/controllers/run-insights.controller.spec.ts b/src/controllers/run-insights.controller.spec.ts new file mode 100644 index 0000000..e85da26 --- /dev/null +++ b/src/controllers/run-insights.controller.spec.ts @@ -0,0 +1,62 @@ +import { RunInsightsController } from './run-insights.controller'; +import { RunInsightsService } from '../insights/run-insights.service'; + +describe('RunInsightsController', () => { + let controller: RunInsightsController; + let mockInsightsService: { + exportRun: jest.Mock; + compareRuns: jest.Mock; + }; + + beforeEach(() => { + mockInsightsService = { + exportRun: jest.fn(), + compareRuns: jest.fn() + }; + controller = new RunInsightsController( + mockInsightsService as unknown as RunInsightsService + ); + }); + + describe('exportRun', () => { + it('delegates to insightsService.exportRun with options', async () => { + const bundle = { run: { id: 'run-1' }, exportedAt: '2026-01-01T00:00:00Z' }; + mockInsightsService.exportRun.mockResolvedValue(bundle); + + const query = { includeCanonical: true, includeRaw: false, eventLimit: 500 }; + const result = await controller.exportRun('run-1', query as any); + + expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', { + includeCanonical: true, + includeRaw: false, + eventLimit: 500 + }); + expect(result).toEqual(bundle); + }); + + it('passes undefined options when query is empty', async () => { + mockInsightsService.exportRun.mockResolvedValue({}); + + await controller.exportRun('run-1', {} as any); + + expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', { + includeCanonical: undefined, + includeRaw: undefined, + eventLimit: undefined + }); + }); + }); + + describe('compareRuns', () => { + it('delegates to insightsService.compareRuns', async () => { + const comparison = { statusMatch: true, left: {}, right: {} }; + mockInsightsService.compareRuns.mockResolvedValue(comparison); + + const body = { leftRunId: 'run-1', rightRunId: 'run-2' }; + const result = await controller.compareRuns(body as any); + + expect(mockInsightsService.compareRuns).toHaveBeenCalledWith('run-1', 'run-2'); + expect(result).toEqual(comparison); + }); + }); +}); diff --git a/src/controllers/run-insights.controller.ts b/src/controllers/run-insights.controller.ts new file mode 100644 index 0000000..ce4a6d1 --- /dev/null +++ b/src/controllers/run-insights.controller.ts @@ -0,0 +1,45 @@ +import { + Body, + Controller, + Get, + Param, + ParseUUIDPipe, + Post, + Query, + ValidationPipe +} from '@nestjs/common'; +import { ApiBody, ApiOkResponse, ApiOperation, ApiTags } from '@nestjs/swagger'; +import { CompareRunsDto } from '../dto/compare-runs.dto'; +import { ExportRunQueryDto } from '../dto/export-run-query.dto'; +import { RunBundleExportDto, RunComparisonResultDto } from '../dto/run-responses.dto'; +import { RunInsightsService } from '../insights/run-insights.service'; + +@ApiTags('runs') +@Controller('runs') +export class RunInsightsController { + constructor(private readonly insightsService: RunInsightsService) {} + + @Get(':id/export') + @ApiOperation({ summary: 'Export a full run bundle (run, session, projection, events, artifacts).' }) + @ApiOkResponse({ type: RunBundleExportDto }) + async exportRun( + @Param('id', new ParseUUIDPipe()) id: string, + @Query(new ValidationPipe({ transform: true, whitelist: true })) query: ExportRunQueryDto + ) { + return this.insightsService.exportRun(id, { + includeCanonical: query.includeCanonical, + includeRaw: query.includeRaw, + eventLimit: query.eventLimit + }); + } + + @Post('compare') + @ApiOperation({ summary: 'Compare two runs side-by-side.' }) + @ApiBody({ type: CompareRunsDto }) + @ApiOkResponse({ type: RunComparisonResultDto }) + async compareRuns( + @Body(new ValidationPipe({ transform: true, whitelist: true })) body: CompareRunsDto + ) { + return this.insightsService.compareRuns(body.leftRunId, body.rightRunId); + } +} diff --git a/src/controllers/runs.controller.spec.ts b/src/controllers/runs.controller.spec.ts index 56c1ff4..186ca39 100644 --- a/src/controllers/runs.controller.spec.ts +++ b/src/controllers/runs.controller.spec.ts @@ -4,6 +4,7 @@ import { RunManagerService } from '../runs/run-manager.service'; import { EventRepository } from '../storage/event.repository'; import { ReplayService } from '../replay/replay.service'; import { StreamHubService } from '../events/stream-hub.service'; +import { AppConfigService } from '../config/app-config.service'; describe('RunsController', () => { let controller: RunsController; @@ -23,6 +24,7 @@ describe('RunsController', () => { }; let mockReplayService: Partial; let mockStreamHub: Partial; + let mockConfig: Partial; beforeEach(() => { mockRunExecutor = { @@ -41,6 +43,9 @@ describe('RunsController', () => { }; mockReplayService = {}; mockStreamHub = {}; + mockConfig = { + streamSseHeartbeatMs: 15000, + }; controller = new RunsController( mockRunExecutor as unknown as RunExecutorService, @@ -48,6 +53,7 @@ describe('RunsController', () => { mockEventRepository as unknown as EventRepository, mockReplayService as unknown as ReplayService, mockStreamHub as unknown as StreamHubService, + mockConfig as unknown as AppConfigService, ); }); diff --git a/src/controllers/runs.controller.ts b/src/controllers/runs.controller.ts index a09039a..15f70f6 100644 --- a/src/controllers/runs.controller.ts +++ b/src/controllers/runs.controller.ts @@ -2,6 +2,7 @@ import { Body, Controller, Get, + Headers, MessageEvent, Param, ParseUUIDPipe, @@ -18,13 +19,15 @@ import { ApiQuery, ApiTags } from '@nestjs/swagger'; -import { concat, from, map, Observable } from 'rxjs'; -import { ReplayRequest, RunStatus } from '../contracts/control-plane'; +import { map, Observable } from 'rxjs'; +import { CanonicalEvent, ReplayRequest, RunStatus } from '../contracts/control-plane'; +import { AppConfigService } from '../config/app-config.service'; import { ExecutionRequestDto } from '../dto/execution-request.dto'; import { ListEventsQueryDto } from '../dto/list-events-query.dto'; import { ListRunsQueryDto } from '../dto/list-runs-query.dto'; import { ReplayRequestDto } from '../dto/replay-request.dto'; import { SendSignalDto } from '../dto/send-signal.dto'; +import { StreamRunQueryDto } from '../dto/stream-run-query.dto'; import { UpdateContextDto } from '../dto/update-context.dto'; import { CanonicalEventDto, @@ -32,7 +35,7 @@ import { ReplayDescriptorDto, RunStateResponseDto } from '../dto/run-responses.dto'; -import { StreamHubService } from '../events/stream-hub.service'; +import { StreamHubService, StreamHubMessage } from '../events/stream-hub.service'; import { ReplayService } from '../replay/replay.service'; import { EventRepository } from '../storage/event.repository'; import { RunExecutorService } from '../runs/run-executor.service'; @@ -46,7 +49,8 @@ export class RunsController { private readonly runManager: RunManagerService, private readonly eventRepository: EventRepository, private readonly replayService: ReplayService, - private readonly streamHub: StreamHubService + private readonly streamHub: StreamHubService, + private readonly config: AppConfigService ) {} @Get() @@ -105,15 +109,103 @@ export class RunsController { } @Sse(':id/stream') - @ApiOperation({ summary: 'Subscribe to normalized live run events over SSE.' }) - streamRun(@Param('id', new ParseUUIDPipe()) id: string): Observable { - const initial$ = from(this.runManager.getState(id)).pipe( - map((state) => ({ type: 'snapshot', data: state }) as MessageEvent) - ); - const live$ = this.streamHub.stream(id).pipe( - map((item) => ({ type: item.event, data: item.data }) as MessageEvent) - ); - return concat(initial$, live$); + @ApiOperation({ summary: 'Subscribe to normalized live run events over SSE with resume support.' }) + streamRun( + @Param('id', new ParseUUIDPipe()) id: string, + @Query(new ValidationPipe({ transform: true, whitelist: true })) query: StreamRunQueryDto, + @Headers('last-event-id') lastEventId?: string + ): Observable { + const afterSeq = query.afterSeq ?? (lastEventId ? Number(lastEventId) : 0); + const includeSnapshot = query.includeSnapshot !== false; + const heartbeatMs = query.heartbeatMs ?? this.config.streamSseHeartbeatMs; + + return new Observable((subscriber) => { + const buffer: StreamHubMessage[] = []; + let backfillDone = false; + let highSeq = afterSeq; + + // 1. Subscribe to live hub immediately, buffer during backfill + const liveSub = this.streamHub.stream(id).subscribe({ + next: (msg) => { + if (!backfillDone) { + buffer.push(msg); + return; + } + const seq = (msg.data as CanonicalEvent)?.seq; + if (seq !== undefined && seq <= highSeq) return; + if (seq !== undefined) highSeq = seq; + subscriber.next({ + type: msg.event, + data: msg.data, + ...(seq !== undefined ? { id: String(seq) } : {}) + } as MessageEvent); + }, + complete: () => subscriber.complete(), + error: (err) => subscriber.error(err) + }); + + // 2. Heartbeat + const heartbeatTimer = setInterval(() => { + subscriber.next({ type: 'heartbeat', data: { ts: new Date().toISOString() } } as MessageEvent); + }, heartbeatMs); + if (typeof heartbeatTimer === 'object' && 'unref' in heartbeatTimer) { + heartbeatTimer.unref(); + } + + // 3. Backfill + drain buffer + const runBackfill = async () => { + try { + // Emit snapshot if requested + if (includeSnapshot) { + const state = await this.runManager.getState(id); + subscriber.next({ type: 'snapshot', data: state } as MessageEvent); + } + + // Backfill missed canonical events in batches + if (afterSeq > 0) { + let cursor = afterSeq; + const batchSize = 500; + while (true) { + const events = await this.eventRepository.listCanonicalByRun(id, cursor, batchSize); + for (const event of events) { + if (event.seq <= highSeq) continue; + highSeq = event.seq; + subscriber.next({ + type: 'canonical_event', + data: event, + id: String(event.seq) + } as MessageEvent); + } + if (events.length < batchSize) break; + cursor = events[events.length - 1].seq; + } + } + + // Drain buffer, deduplicating by seq + backfillDone = true; + for (const msg of buffer) { + const seq = (msg.data as CanonicalEvent)?.seq; + if (seq !== undefined && seq <= highSeq) continue; + if (seq !== undefined) highSeq = seq; + subscriber.next({ + type: msg.event, + data: msg.data, + ...(seq !== undefined ? { id: String(seq) } : {}) + } as MessageEvent); + } + buffer.length = 0; + } catch (err) { + subscriber.error(err); + } + }; + + void runBackfill(); + + return () => { + clearInterval(heartbeatTimer); + liveSub.unsubscribe(); + }; + }); } @Post(':id/cancel') diff --git a/src/db/schema.ts b/src/db/schema.ts index c9108a4..7e2e299 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -123,6 +123,7 @@ export const runProjections = pgTable( signals: jsonb('signals').$type>().notNull().default({ signals: [] }), timeline: jsonb('timeline').$type>().notNull().default({ latestSeq: 0, totalEvents: 0, recent: [] }), traceSummary: jsonb('trace_summary').$type>().notNull().default({ spanCount: 0, linkedArtifacts: [] }), + progress: jsonb('progress').$type>().notNull().default({ entries: [] }), updatedAt: timestamp('updated_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow() }, (table) => ({ diff --git a/src/dto/compare-runs.dto.ts b/src/dto/compare-runs.dto.ts new file mode 100644 index 0000000..af31dad --- /dev/null +++ b/src/dto/compare-runs.dto.ts @@ -0,0 +1,12 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { IsUUID } from 'class-validator'; + +export class CompareRunsDto { + @ApiProperty({ description: 'UUID of the left run to compare.' }) + @IsUUID() + leftRunId!: string; + + @ApiProperty({ description: 'UUID of the right run to compare.' }) + @IsUUID() + rightRunId!: string; +} diff --git a/src/dto/export-run-query.dto.ts b/src/dto/export-run-query.dto.ts new file mode 100644 index 0000000..e701cc7 --- /dev/null +++ b/src/dto/export-run-query.dto.ts @@ -0,0 +1,25 @@ +import { ApiPropertyOptional } from '@nestjs/swagger'; +import { IsBoolean, IsInt, IsOptional, Max, Min } from 'class-validator'; +import { Transform } from 'class-transformer'; + +export class ExportRunQueryDto { + @ApiPropertyOptional({ description: 'Include canonical events in the export.', default: true }) + @IsOptional() + @IsBoolean() + @Transform(({ value }) => value === 'true' || value === true) + includeCanonical?: boolean; + + @ApiPropertyOptional({ description: 'Include raw events in the export.', default: false }) + @IsOptional() + @IsBoolean() + @Transform(({ value }) => value === 'true' || value === true) + includeRaw?: boolean; + + @ApiPropertyOptional({ description: 'Maximum number of events to include.', default: 10000 }) + @IsOptional() + @IsInt() + @Min(1) + @Max(50000) + @Transform(({ value }) => (value !== undefined ? Number(value) : undefined)) + eventLimit?: number; +} diff --git a/src/dto/run-responses.dto.ts b/src/dto/run-responses.dto.ts index 4535f9e..c8ff62d 100644 --- a/src/dto/run-responses.dto.ts +++ b/src/dto/run-responses.dto.ts @@ -1,5 +1,5 @@ import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; -import { CanonicalEvent, MetricsSummary, RunStateProjection, RunStatus } from '../contracts/control-plane'; +import { CanonicalEvent, MetricsSummary, RunComparisonResult, RunExportBundle, RunStateProjection, RunStatus } from '../contracts/control-plane'; export class CreateRunResponseDto { @ApiProperty() @@ -138,3 +138,52 @@ export class CanonicalEventDto implements CanonicalEvent { @ApiProperty({ type: 'object', additionalProperties: true }) data!: Record; } + +export class RunBundleExportDto implements RunExportBundle { + @ApiProperty({ type: 'object', additionalProperties: true }) + run!: RunExportBundle['run']; + + @ApiPropertyOptional({ type: 'object', additionalProperties: true }) + session!: RunExportBundle['session']; + + @ApiPropertyOptional({ type: 'object', additionalProperties: true }) + projection!: RunExportBundle['projection']; + + @ApiPropertyOptional({ type: 'object', additionalProperties: true }) + metrics!: RunExportBundle['metrics']; + + @ApiProperty({ type: 'array', items: { type: 'object', additionalProperties: true } }) + artifacts!: RunExportBundle['artifacts']; + + @ApiProperty({ type: 'array', items: { type: 'object', additionalProperties: true } }) + canonicalEvents!: RunExportBundle['canonicalEvents']; + + @ApiProperty({ type: 'array', items: { type: 'object', additionalProperties: true } }) + rawEvents!: RunExportBundle['rawEvents']; + + @ApiProperty() + exportedAt!: string; +} + +export class RunComparisonResultDto implements RunComparisonResult { + @ApiProperty({ type: 'object', additionalProperties: true }) + left!: RunComparisonResult['left']; + + @ApiProperty({ type: 'object', additionalProperties: true }) + right!: RunComparisonResult['right']; + + @ApiProperty() + statusMatch!: boolean; + + @ApiPropertyOptional() + durationDeltaMs?: number; + + @ApiPropertyOptional() + confidenceDelta?: number; + + @ApiProperty({ type: 'object', additionalProperties: true }) + participantsDiff!: RunComparisonResult['participantsDiff']; + + @ApiProperty({ type: 'object', additionalProperties: true }) + signalsDiff!: RunComparisonResult['signalsDiff']; +} diff --git a/src/dto/stream-run-query.dto.ts b/src/dto/stream-run-query.dto.ts new file mode 100644 index 0000000..b99cd59 --- /dev/null +++ b/src/dto/stream-run-query.dto.ts @@ -0,0 +1,25 @@ +import { ApiPropertyOptional } from '@nestjs/swagger'; +import { IsBoolean, IsInt, IsOptional, Min } from 'class-validator'; +import { Transform } from 'class-transformer'; + +export class StreamRunQueryDto { + @ApiPropertyOptional({ description: 'Resume from this sequence number (exclusive).' }) + @IsOptional() + @IsInt() + @Min(0) + @Transform(({ value }) => (value !== undefined ? Number(value) : undefined)) + afterSeq?: number; + + @ApiPropertyOptional({ description: 'Include a snapshot frame before events.', default: true }) + @IsOptional() + @IsBoolean() + @Transform(({ value }) => value === 'true' || value === true) + includeSnapshot?: boolean; + + @ApiPropertyOptional({ description: 'Heartbeat interval in milliseconds.' }) + @IsOptional() + @IsInt() + @Min(1000) + @Transform(({ value }) => (value !== undefined ? Number(value) : undefined)) + heartbeatMs?: number; +} diff --git a/src/insights/run-insights.service.spec.ts b/src/insights/run-insights.service.spec.ts new file mode 100644 index 0000000..3d72b92 --- /dev/null +++ b/src/insights/run-insights.service.spec.ts @@ -0,0 +1,169 @@ +import { NotFoundException } from '@nestjs/common'; +import { RunInsightsService } from './run-insights.service'; +import { ProjectionService } from '../projection/projection.service'; +import { ArtifactRepository } from '../storage/artifact.repository'; +import { EventRepository } from '../storage/event.repository'; +import { MetricsRepository } from '../storage/metrics.repository'; +import { RunRepository } from '../storage/run.repository'; +import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; + +describe('RunInsightsService', () => { + let service: RunInsightsService; + let mockRunRepo: { findById: jest.Mock }; + let mockSessionRepo: { findByRunId: jest.Mock }; + let mockProjectionService: { get: jest.Mock }; + let mockEventRepo: { listCanonicalByRun: jest.Mock; listRawByRun: jest.Mock }; + let mockMetricsRepo: { get: jest.Mock }; + let mockArtifactRepo: { listByRunId: jest.Mock }; + + const fakeRun = { + id: 'run-1', + status: 'completed', + runtimeKind: 'rust', + runtimeVersion: '1.0', + runtimeSessionId: 'sess-1', + traceId: 'trace-1', + createdAt: '2026-01-01T00:00:00Z', + startedAt: '2026-01-01T00:00:01Z', + endedAt: '2026-01-01T00:01:00Z', + mode: 'live', + tags: ['demo'], + sourceKind: null, + sourceRef: null, + metadata: {}, + idempotencyKey: null, + lastEventSeq: 10, + errorCode: null, + errorMessage: null, + updatedAt: '2026-01-01T00:01:00Z' + }; + + beforeEach(() => { + mockRunRepo = { findById: jest.fn() }; + mockSessionRepo = { findByRunId: jest.fn().mockResolvedValue(null) }; + mockProjectionService = { get: jest.fn().mockResolvedValue(null) }; + mockEventRepo = { + listCanonicalByRun: jest.fn().mockResolvedValue([]), + listRawByRun: jest.fn().mockResolvedValue([]) + }; + mockMetricsRepo = { get: jest.fn().mockResolvedValue(null) }; + mockArtifactRepo = { listByRunId: jest.fn().mockResolvedValue([]) }; + + service = new RunInsightsService( + mockRunRepo as unknown as RunRepository, + mockSessionRepo as unknown as RuntimeSessionRepository, + mockProjectionService as unknown as ProjectionService, + mockEventRepo as unknown as EventRepository, + mockMetricsRepo as unknown as MetricsRepository, + mockArtifactRepo as unknown as ArtifactRepository + ); + }); + + // ========================================================================= + // exportRun + // ========================================================================= + describe('exportRun', () => { + it('throws NotFoundException for missing run', async () => { + mockRunRepo.findById.mockResolvedValue(null); + await expect(service.exportRun('run-missing', {})).rejects.toThrow(NotFoundException); + }); + + it('returns a full export bundle with canonical events by default', async () => { + mockRunRepo.findById.mockResolvedValue(fakeRun); + + const bundle = await service.exportRun('run-1', {}); + + expect(bundle.run.id).toBe('run-1'); + expect(bundle.exportedAt).toBeDefined(); + expect(mockEventRepo.listCanonicalByRun).toHaveBeenCalledWith('run-1', 0, 10000); + expect(mockEventRepo.listRawByRun).not.toHaveBeenCalled(); + }); + + it('includes raw events when requested', async () => { + mockRunRepo.findById.mockResolvedValue(fakeRun); + + await service.exportRun('run-1', { includeRaw: true }); + + expect(mockEventRepo.listRawByRun).toHaveBeenCalledWith('run-1', 0, 10000); + }); + + it('skips canonical events when includeCanonical is false', async () => { + mockRunRepo.findById.mockResolvedValue(fakeRun); + + await service.exportRun('run-1', { includeCanonical: false }); + + expect(mockEventRepo.listCanonicalByRun).not.toHaveBeenCalled(); + }); + + it('respects eventLimit', async () => { + mockRunRepo.findById.mockResolvedValue(fakeRun); + + await service.exportRun('run-1', { eventLimit: 50 }); + + expect(mockEventRepo.listCanonicalByRun).toHaveBeenCalledWith('run-1', 0, 50); + }); + }); + + // ========================================================================= + // compareRuns + // ========================================================================= + describe('compareRuns', () => { + it('throws NotFoundException when left run is missing', async () => { + mockRunRepo.findById.mockResolvedValueOnce(null); + await expect(service.compareRuns('missing', 'run-2')).rejects.toThrow(NotFoundException); + }); + + it('throws NotFoundException when right run is missing', async () => { + mockRunRepo.findById.mockResolvedValueOnce(fakeRun).mockResolvedValueOnce(null); + await expect(service.compareRuns('run-1', 'missing')).rejects.toThrow(NotFoundException); + }); + + it('returns comparison result with matching statuses', async () => { + mockRunRepo.findById.mockResolvedValue(fakeRun); + + const leftProjection = { + run: { runId: 'run-1', status: 'completed', modeName: 'decision' }, + participants: [{ participantId: 'agent-1' }, { participantId: 'agent-2' }], + signals: { signals: [{ name: 'alert' }] }, + decision: { current: { confidence: 0.9 } } + }; + const rightProjection = { + run: { runId: 'run-2', status: 'completed', modeName: 'decision' }, + participants: [{ participantId: 'agent-1' }, { participantId: 'agent-3' }], + signals: { signals: [{ name: 'alert' }, { name: 'warning' }] }, + decision: { current: { confidence: 0.8 } } + }; + + mockProjectionService.get + .mockResolvedValueOnce(leftProjection) + .mockResolvedValueOnce(rightProjection); + + mockMetricsRepo.get + .mockResolvedValueOnce({ durationMs: 1000 }) + .mockResolvedValueOnce({ durationMs: 1500 }); + + const result = await service.compareRuns('run-1', 'run-2'); + + expect(result.statusMatch).toBe(true); + expect(result.durationDeltaMs).toBe(500); + expect(result.confidenceDelta).toBeCloseTo(-0.1); + expect(result.participantsDiff.added).toEqual(['agent-3']); + expect(result.participantsDiff.removed).toEqual(['agent-2']); + expect(result.participantsDiff.common).toEqual(['agent-1']); + expect(result.signalsDiff.added).toEqual(['warning']); + expect(result.signalsDiff.removed).toEqual([]); + }); + + it('returns undefined deltas when metrics/projections are null', async () => { + mockRunRepo.findById.mockResolvedValue(fakeRun); + mockProjectionService.get.mockResolvedValue(null); + mockMetricsRepo.get.mockResolvedValue(null); + + const result = await service.compareRuns('run-1', 'run-2'); + + expect(result.durationDeltaMs).toBeUndefined(); + expect(result.confidenceDelta).toBeUndefined(); + expect(result.participantsDiff.common).toEqual([]); + }); + }); +}); diff --git a/src/insights/run-insights.service.ts b/src/insights/run-insights.service.ts new file mode 100644 index 0000000..c053543 --- /dev/null +++ b/src/insights/run-insights.service.ts @@ -0,0 +1,180 @@ +import { Injectable, NotFoundException } from '@nestjs/common'; +import { + CanonicalEvent, + MetricsSummary, + Run, + RunComparisonResult, + RunExportBundle, + RunStatus +} from '../contracts/control-plane'; +import { ProjectionService } from '../projection/projection.service'; +import { ArtifactRepository } from '../storage/artifact.repository'; +import { EventRepository } from '../storage/event.repository'; +import { MetricsRepository } from '../storage/metrics.repository'; +import { RunRepository } from '../storage/run.repository'; +import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; + +@Injectable() +export class RunInsightsService { + constructor( + private readonly runRepository: RunRepository, + private readonly runtimeSessionRepository: RuntimeSessionRepository, + private readonly projectionService: ProjectionService, + private readonly eventRepository: EventRepository, + private readonly metricsRepository: MetricsRepository, + private readonly artifactRepository: ArtifactRepository + ) {} + + async exportRun( + runId: string, + options: { includeCanonical?: boolean; includeRaw?: boolean; eventLimit?: number } + ): Promise { + const run = await this.runRepository.findById(runId); + if (!run) throw new NotFoundException(`run ${runId} not found`); + + const includeCanonical = options.includeCanonical !== false; + const includeRaw = options.includeRaw === true; + const eventLimit = options.eventLimit ?? 10000; + + const [session, projection, metrics, artifacts, canonicalEvents, rawEvents] = + await Promise.all([ + this.runtimeSessionRepository.findByRunId(runId), + this.projectionService.get(runId), + this.metricsRepository.get(runId), + this.artifactRepository.listByRunId(runId), + includeCanonical + ? this.eventRepository.listCanonicalByRun(runId, 0, eventLimit) + : Promise.resolve([]), + includeRaw + ? this.eventRepository.listRawByRun(runId, 0, eventLimit) + : Promise.resolve([]) + ]); + + return { + run: this.toRun(run), + session: session as Record | null, + projection, + metrics: metrics + ? ({ + runId: metrics.runId, + eventCount: metrics.eventCount, + messageCount: metrics.messageCount, + signalCount: metrics.signalCount, + proposalCount: metrics.proposalCount, + toolCallCount: metrics.toolCallCount, + decisionCount: metrics.decisionCount, + streamReconnectCount: metrics.streamReconnectCount, + firstEventAt: metrics.firstEventAt ?? undefined, + lastEventAt: metrics.lastEventAt ?? undefined, + durationMs: metrics.durationMs ?? undefined, + sessionState: (metrics.sessionState as MetricsSummary['sessionState']) ?? undefined + } satisfies MetricsSummary) + : null, + artifacts: artifacts.map((a) => ({ + id: a.id, + runId: a.runId, + kind: a.kind as Run['source'] extends undefined ? never : 'trace' | 'json' | 'report' | 'log' | 'bundle', + label: a.label, + uri: a.uri ?? undefined, + inline: a.inline ?? undefined, + createdAt: a.createdAt + })), + canonicalEvents: canonicalEvents as unknown as CanonicalEvent[], + rawEvents: rawEvents as unknown as Record[], + exportedAt: new Date().toISOString() + }; + } + + async compareRuns(leftRunId: string, rightRunId: string): Promise { + const [leftRun, rightRun] = await Promise.all([ + this.runRepository.findById(leftRunId), + this.runRepository.findById(rightRunId) + ]); + if (!leftRun) throw new NotFoundException(`run ${leftRunId} not found`); + if (!rightRun) throw new NotFoundException(`run ${rightRunId} not found`); + + const [leftProjection, rightProjection, leftMetrics, rightMetrics] = await Promise.all([ + this.projectionService.get(leftRunId), + this.projectionService.get(rightRunId), + this.metricsRepository.get(leftRunId), + this.metricsRepository.get(rightRunId) + ]); + + const leftParticipants = new Set( + leftProjection?.participants.map((p) => p.participantId) ?? [] + ); + const rightParticipants = new Set( + rightProjection?.participants.map((p) => p.participantId) ?? [] + ); + const commonParticipants = [...leftParticipants].filter((p) => rightParticipants.has(p)); + const addedParticipants = [...rightParticipants].filter((p) => !leftParticipants.has(p)); + const removedParticipants = [...leftParticipants].filter((p) => !rightParticipants.has(p)); + + const leftSignals = new Set( + leftProjection?.signals.signals.map((s) => s.name) ?? [] + ); + const rightSignals = new Set( + rightProjection?.signals.signals.map((s) => s.name) ?? [] + ); + const addedSignals = [...rightSignals].filter((s) => !leftSignals.has(s)); + const removedSignals = [...leftSignals].filter((s) => !rightSignals.has(s)); + + const leftDuration = leftMetrics?.durationMs ?? undefined; + const rightDuration = rightMetrics?.durationMs ?? undefined; + const durationDeltaMs = + leftDuration !== undefined && rightDuration !== undefined + ? rightDuration - leftDuration + : undefined; + + const leftConfidence = leftProjection?.decision.current?.confidence; + const rightConfidence = rightProjection?.decision.current?.confidence; + const confidenceDelta = + leftConfidence !== undefined && rightConfidence !== undefined + ? rightConfidence - leftConfidence + : undefined; + + return { + left: { + runId: leftRunId, + status: leftRun.status as RunStatus, + modeName: leftProjection?.run.modeName, + durationMs: leftDuration + }, + right: { + runId: rightRunId, + status: rightRun.status as RunStatus, + modeName: rightProjection?.run.modeName, + durationMs: rightDuration + }, + statusMatch: leftRun.status === rightRun.status, + durationDeltaMs, + confidenceDelta, + participantsDiff: { + added: addedParticipants, + removed: removedParticipants, + common: commonParticipants + }, + signalsDiff: { + added: addedSignals, + removed: removedSignals + } + }; + } + + private toRun(row: typeof import('../db/schema').runs.$inferSelect): Run { + return { + id: row.id, + status: row.status as RunStatus, + runtimeKind: row.runtimeKind, + runtimeVersion: row.runtimeVersion ?? undefined, + runtimeSessionId: row.runtimeSessionId ?? undefined, + traceId: row.traceId ?? undefined, + createdAt: row.createdAt, + startedAt: row.startedAt ?? undefined, + endedAt: row.endedAt ?? undefined, + tags: row.tags ?? undefined, + source: row.sourceKind ? { kind: row.sourceKind, ref: row.sourceRef ?? undefined } : undefined, + metadata: row.metadata ?? undefined + }; + } +} diff --git a/src/projection/projection.service.ts b/src/projection/projection.service.ts index 27f0b7a..c80785f 100644 --- a/src/projection/projection.service.ts +++ b/src/projection/projection.service.ts @@ -9,7 +9,7 @@ import { } from '../contracts/control-plane'; import { ProjectionRepository } from '../storage/projection.repository'; -export const PROJECTION_SCHEMA_VERSION = 2; +export const PROJECTION_SCHEMA_VERSION = 3; @Injectable() export class ProjectionService { @@ -26,7 +26,7 @@ export class ProjectionService { graph: row.graph as unknown as GraphProjection, decision: row.decision as unknown as RunStateProjection['decision'], signals: row.signals as unknown as RunStateProjection['signals'], - progress: (row as any).progress as unknown as ProgressProjection ?? { entries: [] }, + progress: row.progress as unknown as ProgressProjection ?? { entries: [] }, timeline: row.timeline as unknown as RunStateProjection['timeline'], trace: row.traceSummary as unknown as RunStateProjection['trace'] }; diff --git a/src/runs/run-recovery.service.spec.ts b/src/runs/run-recovery.service.spec.ts new file mode 100644 index 0000000..117b9e8 --- /dev/null +++ b/src/runs/run-recovery.service.spec.ts @@ -0,0 +1,183 @@ +import { RunRecoveryService } from './run-recovery.service'; +import { AppConfigService } from '../config/app-config.service'; +import { RunEventService } from '../events/run-event.service'; +import { RunRepository } from '../storage/run.repository'; +import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; +import { RunManagerService } from './run-manager.service'; +import { StreamConsumerService } from './stream-consumer.service'; + +describe('RunRecoveryService', () => { + let service: RunRecoveryService; + let mockConfig: Partial; + let mockRunRepo: { listActiveRuns: jest.Mock }; + let mockSessionRepo: { findByRunId: jest.Mock }; + let mockRunManager: { markRunning: jest.Mock; markFailed: jest.Mock }; + let mockStreamConsumer: { start: jest.Mock }; + let mockEventService: { emitControlPlaneEvents: jest.Mock }; + + beforeEach(() => { + mockConfig = { runRecoveryEnabled: true }; + mockRunRepo = { listActiveRuns: jest.fn().mockResolvedValue([]) }; + mockSessionRepo = { findByRunId: jest.fn().mockResolvedValue(null) }; + mockRunManager = { + markRunning: jest.fn().mockResolvedValue({}), + markFailed: jest.fn().mockResolvedValue({}) + }; + mockStreamConsumer = { start: jest.fn().mockResolvedValue(undefined) }; + mockEventService = { emitControlPlaneEvents: jest.fn().mockResolvedValue([]) }; + + service = new RunRecoveryService( + mockConfig as AppConfigService, + mockRunRepo as unknown as RunRepository, + mockSessionRepo as unknown as RuntimeSessionRepository, + mockRunManager as unknown as RunManagerService, + mockStreamConsumer as unknown as StreamConsumerService, + mockEventService as unknown as RunEventService + ); + }); + + it('skips recovery when disabled', async () => { + const disabledService = new RunRecoveryService( + { runRecoveryEnabled: false } as AppConfigService, + mockRunRepo as unknown as RunRepository, + mockSessionRepo as unknown as RuntimeSessionRepository, + mockRunManager as unknown as RunManagerService, + mockStreamConsumer as unknown as StreamConsumerService, + mockEventService as unknown as RunEventService + ); + await disabledService.onApplicationBootstrap(); + expect(mockRunRepo.listActiveRuns).not.toHaveBeenCalled(); + }); + + it('does nothing when no active runs', async () => { + await service.onApplicationBootstrap(); + expect(mockRunRepo.listActiveRuns).toHaveBeenCalled(); + expect(mockStreamConsumer.start).not.toHaveBeenCalled(); + }); + + it('recovers a running run by starting stream consumer', async () => { + const run = { + id: 'run-1', + status: 'running', + runtimeKind: 'rust', + runtimeSessionId: 'sess-1', + lastEventSeq: 42, + metadata: { + executionRequest: { + mode: 'live', + runtime: { kind: 'rust' }, + session: { + modeName: 'decision', + modeVersion: '1.0', + configurationVersion: 'v1', + ttlMs: 60000, + participants: [{ id: 'agent-1' }] + } + } + } + }; + mockRunRepo.listActiveRuns.mockResolvedValue([run]); + mockSessionRepo.findByRunId.mockResolvedValue({ + initiatorParticipantId: 'agent-1', + runtimeSessionId: 'sess-1' + }); + + await service.onApplicationBootstrap(); + + expect(mockEventService.emitControlPlaneEvents).toHaveBeenCalledWith( + 'run-1', + expect.arrayContaining([ + expect.objectContaining({ + type: 'session.stream.opened', + data: expect.objectContaining({ status: 'recovered' }) + }) + ]) + ); + expect(mockStreamConsumer.start).toHaveBeenCalledWith( + expect.objectContaining({ + runId: 'run-1', + runtimeSessionId: 'sess-1', + subscriberId: 'agent-1', + resumeFromSeq: 42 + }) + ); + }); + + it('promotes binding_session to running before recovery', async () => { + const run = { + id: 'run-2', + status: 'binding_session', + runtimeKind: 'rust', + runtimeSessionId: 'sess-2', + lastEventSeq: 10, + metadata: { + executionRequest: { + mode: 'live', + runtime: { kind: 'rust' }, + session: { + modeName: 'decision', + modeVersion: '1.0', + configurationVersion: 'v1', + ttlMs: 60000, + participants: [{ id: 'agent-1' }] + } + } + } + }; + mockRunRepo.listActiveRuns.mockResolvedValue([run]); + + await service.onApplicationBootstrap(); + + expect(mockRunManager.markRunning).toHaveBeenCalledWith('run-2', 'sess-2'); + }); + + it('marks run as failed when recovery fails', async () => { + const run = { + id: 'run-3', + status: 'running', + runtimeKind: 'rust', + runtimeSessionId: 'sess-3', + lastEventSeq: 0, + metadata: {} // missing executionRequest + }; + mockRunRepo.listActiveRuns.mockResolvedValue([run]); + + await service.onApplicationBootstrap(); + + expect(mockRunManager.markFailed).toHaveBeenCalledWith( + 'run-3', + expect.any(Error) + ); + }); + + it('does not crash if markFailed also fails', async () => { + const run = { + id: 'run-4', + status: 'running', + runtimeKind: 'rust', + runtimeSessionId: null, + lastEventSeq: 0, + metadata: { + executionRequest: { + mode: 'live', + runtime: { kind: 'rust' }, + session: { + modeName: 'decision', + modeVersion: '1.0', + configurationVersion: 'v1', + ttlMs: 60000, + participants: [{ id: 'agent-1' }] + } + } + } + }; + mockRunRepo.listActiveRuns.mockResolvedValue([run]); + mockSessionRepo.findByRunId.mockResolvedValue(null); + mockRunManager.markFailed.mockRejectedValue(new Error('db down')); + + // Should not throw + await service.onApplicationBootstrap(); + + expect(mockRunManager.markFailed).toHaveBeenCalled(); + }); +}); diff --git a/src/runs/run-recovery.service.ts b/src/runs/run-recovery.service.ts new file mode 100644 index 0000000..48ecc71 --- /dev/null +++ b/src/runs/run-recovery.service.ts @@ -0,0 +1,108 @@ +import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { ExecutionRequest } from '../contracts/control-plane'; +import { AppConfigService } from '../config/app-config.service'; +import { RunEventService } from '../events/run-event.service'; +import { RunRepository } from '../storage/run.repository'; +import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; +import { RunManagerService } from './run-manager.service'; +import { StreamConsumerService } from './stream-consumer.service'; + +@Injectable() +export class RunRecoveryService implements OnApplicationBootstrap { + private readonly logger = new Logger(RunRecoveryService.name); + + constructor( + private readonly config: AppConfigService, + private readonly runRepository: RunRepository, + private readonly runtimeSessionRepository: RuntimeSessionRepository, + private readonly runManager: RunManagerService, + private readonly streamConsumer: StreamConsumerService, + private readonly eventService: RunEventService + ) {} + + async onApplicationBootstrap(): Promise { + if (!this.config.runRecoveryEnabled) { + this.logger.log('run recovery disabled'); + return; + } + await this.recoverActiveRuns(); + } + + async recoverActiveRuns(): Promise { + const activeRuns = await this.runRepository.listActiveRuns(); + if (activeRuns.length === 0) { + this.logger.log('no active runs to recover'); + return; + } + this.logger.log(`recovering ${activeRuns.length} active run(s)`); + + for (const run of activeRuns) { + try { + await this.recoverRun(run); + } catch (error) { + this.logger.error( + `failed to recover run ${run.id}: ${error instanceof Error ? error.message : String(error)}` + ); + try { + await this.runManager.markFailed(run.id, error); + } catch (markError) { + this.logger.error( + `failed to mark run ${run.id} as failed: ${markError instanceof Error ? markError.message : String(markError)}` + ); + } + } + } + } + + private async recoverRun(run: { + id: string; + status: string; + runtimeKind: string; + runtimeSessionId: string | null; + lastEventSeq: number; + metadata: Record; + }): Promise { + const executionRequest = run.metadata?.executionRequest as ExecutionRequest | undefined; + if (!executionRequest) { + throw new Error('missing executionRequest in run metadata'); + } + + const session = await this.runtimeSessionRepository.findByRunId(run.id); + const runtimeSessionId = run.runtimeSessionId ?? session?.runtimeSessionId; + if (!runtimeSessionId) { + throw new Error('no runtime session ID available for recovery'); + } + + const subscriberId = + session?.initiatorParticipantId ?? + executionRequest.session.initiatorParticipantId ?? + executionRequest.session.participants[0]?.id ?? + 'control-plane'; + + // Promote binding_session → running if needed + if (run.status === 'binding_session') { + await this.runManager.markRunning(run.id, runtimeSessionId); + } + + await this.eventService.emitControlPlaneEvents(run.id, [ + { + ts: new Date().toISOString(), + type: 'session.stream.opened', + source: { kind: 'control-plane', name: 'run-recovery' }, + subject: { kind: 'session', id: runtimeSessionId }, + data: { status: 'recovered', detail: 'stream resumed after restart' } + } + ]); + + await this.streamConsumer.start({ + runId: run.id, + execution: executionRequest, + runtimeKind: run.runtimeKind, + runtimeSessionId, + subscriberId, + resumeFromSeq: run.lastEventSeq + }); + + this.logger.log(`recovered run ${run.id} from seq ${run.lastEventSeq}`); + } +} diff --git a/src/runs/stream-consumer.service.ts b/src/runs/stream-consumer.service.ts index 1f92713..4e6fa30 100644 --- a/src/runs/stream-consumer.service.ts +++ b/src/runs/stream-consumer.service.ts @@ -124,6 +124,7 @@ export class StreamConsumerService implements OnModuleDestroy { if (marker.aborted) return; await this.handleRawEvent(params.runId, raw, context, params.runtimeSessionId, marker); if (marker.finalized) return; + retries = 0; } const snapshot = await provider.getSession({ @@ -221,15 +222,25 @@ export class StreamConsumerService implements OnModuleDestroy { timeoutMs: number ): AsyncIterable { const iterator = iterable[Symbol.asyncIterator](); - while (true) { - const result = await Promise.race([ - iterator.next(), - new Promise<{ done: true; value: undefined }>((resolve) => - setTimeout(() => resolve({ done: true, value: undefined }), timeoutMs) - ) - ]); - if (result.done) return; - yield result.value; + try { + while (true) { + let timer: ReturnType | undefined; + try { + const result = await Promise.race([ + iterator.next(), + new Promise<{ done: true; value: undefined }>((resolve) => { + timer = setTimeout(() => resolve({ done: true, value: undefined }), timeoutMs); + timer.unref(); + }) + ]); + if (result.done) return; + yield result.value; + } finally { + if (timer !== undefined) clearTimeout(timer); + } + } + } finally { + await iterator.return?.(); } } diff --git a/src/storage/event.repository.spec.ts b/src/storage/event.repository.spec.ts index 855dfe3..5c1a84a 100644 --- a/src/storage/event.repository.spec.ts +++ b/src/storage/event.repository.spec.ts @@ -176,6 +176,50 @@ describe('EventRepository', () => { }); }); + // ------ listCanonicalRange ------ + describe('listCanonicalRange', () => { + it('queries with runId, afterSeq, toSeq and limit', async () => { + const fakeEvents = [{ id: 'evt-1', seq: 3 }]; + mockDb._select.limit.mockResolvedValue(fakeEvents); + + const result = await repo.listCanonicalRange('run-1', 2, 10, 100); + + expect(mockDb.select).toHaveBeenCalled(); + expect(mockDb._select.from).toHaveBeenCalled(); + expect(mockDb._select.where).toHaveBeenCalled(); + expect(result).toEqual(fakeEvents); + }); + + it('uses default limit of 500', async () => { + mockDb._select.limit.mockResolvedValue([]); + + await repo.listCanonicalRange('run-1', 0, 100); + + expect(mockDb._select.limit).toHaveBeenCalledWith(500); + }); + }); + + // ------ listRawByRun ------ + describe('listRawByRun', () => { + it('queries raw events by runId with afterSeq', async () => { + const fakeEvents = [{ id: 'raw-1', seq: 1 }]; + mockDb._select.limit.mockResolvedValue(fakeEvents); + + const result = await repo.listRawByRun('run-1', 0, 500); + + expect(mockDb.select).toHaveBeenCalled(); + expect(result).toEqual(fakeEvents); + }); + + it('uses default afterSeq of 0 and limit of 1000', async () => { + mockDb._select.limit.mockResolvedValue([]); + + await repo.listRawByRun('run-1'); + + expect(mockDb._select.limit).toHaveBeenCalledWith(1000); + }); + }); + // ------ listCanonicalUpTo ------ describe('listCanonicalUpTo', () => { it('queries without upper bound when seq is undefined', async () => { diff --git a/src/storage/event.repository.ts b/src/storage/event.repository.ts index 447c722..6acf23f 100644 --- a/src/storage/event.repository.ts +++ b/src/storage/event.repository.ts @@ -59,6 +59,30 @@ export class EventRepository { .limit(limit); } + async listCanonicalRange(runId: string, afterSeq: number, toSeq: number, limit = 500) { + return this.database.db + .select() + .from(runEventsCanonical) + .where( + and( + eq(runEventsCanonical.runId, runId), + gt(runEventsCanonical.seq, afterSeq), + lte(runEventsCanonical.seq, toSeq) + ) + ) + .orderBy(asc(runEventsCanonical.seq)) + .limit(limit); + } + + async listRawByRun(runId: string, afterSeq = 0, limit = 1000) { + return this.database.db + .select() + .from(runEventsRaw) + .where(and(eq(runEventsRaw.runId, runId), gt(runEventsRaw.seq, afterSeq))) + .orderBy(asc(runEventsRaw.seq)) + .limit(limit); + } + async listCanonicalUpTo(runId: string, seq?: number) { const where = seq === undefined ? eq(runEventsCanonical.runId, runId) diff --git a/src/storage/projection.repository.ts b/src/storage/projection.repository.ts index 3166e55..37e8de9 100644 --- a/src/storage/projection.repository.ts +++ b/src/storage/projection.repository.ts @@ -30,6 +30,7 @@ export class ProjectionRepository { signals: projection.signals as unknown as Record, timeline: projection.timeline as unknown as Record, traceSummary: projection.trace as unknown as Record, + progress: projection.progress as unknown as Record, updatedAt: new Date().toISOString() }) .onConflictDoUpdate({ @@ -43,6 +44,7 @@ export class ProjectionRepository { signals: projection.signals as unknown as Record, timeline: projection.timeline as unknown as Record, traceSummary: projection.trace as unknown as Record, + progress: projection.progress as unknown as Record, updatedAt: new Date().toISOString() } });