Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ STREAM_IDLE_TIMEOUT_MS=120000
STREAM_MAX_RETRIES=5
STREAM_BACKOFF_BASE_MS=250
STREAM_BACKOFF_MAX_MS=30000
STREAM_SSE_HEARTBEAT_MS=15000

# ── Replay ───────────────────────────────────────
REPLAY_MAX_DELAY_MS=2000
REPLAY_BATCH_SIZE=500

# ── Run Recovery ────────────────────────────────
RUN_RECOVERY_ENABLED=true

# ── Logging ──────────────────────────────────────
LOG_LEVEL=info

Expand Down
3 changes: 3 additions & 0 deletions drizzle/0003_v2_progress_and_export.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE "run_projections" ADD COLUMN IF NOT EXISTS "progress" jsonb NOT NULL DEFAULT '{"entries":[]}';
CREATE INDEX IF NOT EXISTS run_events_raw_run_ts_idx ON run_events_raw(run_id, ts);
CREATE INDEX IF NOT EXISTS run_events_raw_run_seq_idx ON run_events_raw(run_id, seq);
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "macp-control-plane",
"version": "0.1.0",
"version": "0.2.0",
"private": true,
"description": "Scenario-agnostic control plane for the MACP runtime, built with NestJS.",
"license": "MIT",
Expand Down
9 changes: 7 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { ConfigModule } from './config/config.module';
import { HealthController } from './controllers/health.controller';
import { MetricsController } from './controllers/metrics.controller';
import { ObservabilityController } from './controllers/observability.controller';
import { RunInsightsController } from './controllers/run-insights.controller';
import { RunsController } from './controllers/runs.controller';
import { RuntimeController } from './controllers/runtime.controller';
import { DatabaseModule } from './db/database.module';
Expand All @@ -33,8 +34,10 @@ import { RunRepository } from './storage/run.repository';
import { RuntimeSessionRepository } from './storage/runtime-session.repository';
import { InstrumentationService } from './telemetry/instrumentation.service';
import { TraceService } from './telemetry/trace.service';
import { RunInsightsService } from './insights/run-insights.service';
import { RunExecutorService } from './runs/run-executor.service';
import { RunManagerService } from './runs/run-manager.service';
import { RunRecoveryService } from './runs/run-recovery.service';
import { StreamConsumerService } from './runs/stream-consumer.service';

@Module({
Expand All @@ -44,7 +47,7 @@ import { StreamConsumerService } from './runs/stream-consumer.service';
AuthModule,
ThrottlerModule.forRoot([{ ttl: 60000, limit: 100 }])
],
controllers: [RunsController, RuntimeController, ObservabilityController, HealthController, MetricsController],
controllers: [RunsController, RunInsightsController, RuntimeController, ObservabilityController, HealthController, MetricsController],
providers: [
{ provide: APP_GUARD, useClass: AuthGuard },
{ provide: APP_GUARD, useClass: ThrottleByUserGuard },
Expand All @@ -70,7 +73,9 @@ import { StreamConsumerService } from './runs/stream-consumer.service';
ReplayService,
RunManagerService,
StreamConsumerService,
RunExecutorService
RunExecutorService,
RunRecoveryService,
RunInsightsService
]
})
export class AppModule implements NestModule {
Expand Down
2 changes: 2 additions & 0 deletions src/config/app-config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ export class AppConfigService implements OnModuleInit {
readonly streamMaxRetries = readNumber('STREAM_MAX_RETRIES', 5);
readonly streamBackoffBaseMs = readNumber('STREAM_BACKOFF_BASE_MS', 250);
readonly streamBackoffMaxMs = readNumber('STREAM_BACKOFF_MAX_MS', 30000);
readonly streamSseHeartbeatMs = readNumber('STREAM_SSE_HEARTBEAT_MS', 15000);
readonly replayMaxDelayMs = readNumber('REPLAY_MAX_DELAY_MS', 2000);
readonly replayBatchSize = readNumber('REPLAY_BATCH_SIZE', 500);
readonly runRecoveryEnabled = readBoolean('RUN_RECOVERY_ENABLED', true);

readonly dbPoolMax = readNumber('DB_POOL_MAX', 20);
readonly dbPoolIdleTimeout = readNumber('DB_POOL_IDLE_TIMEOUT', 30000);
Expand Down
33 changes: 33 additions & 0 deletions src/contracts/control-plane.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,39 @@ export interface ReplayRequest {
toSeq?: number;
}

export interface RunExportBundle {
run: Run;
session: Record<string, unknown> | null;
projection: RunStateProjection | null;
metrics: MetricsSummary | null;
artifacts: Artifact[];
canonicalEvents: CanonicalEvent[];
rawEvents: Record<string, unknown>[];
exportedAt: string;
}

export interface RunComparisonRequest {
leftRunId: string;
rightRunId: string;
}

export interface RunComparisonResult {
left: { runId: string; status: RunStatus; modeName?: string; durationMs?: number };
right: { runId: string; status: RunStatus; modeName?: string; durationMs?: number };
statusMatch: boolean;
durationDeltaMs?: number;
confidenceDelta?: number;
participantsDiff: {
added: string[];
removed: string[];
common: string[];
};
signalsDiff: {
added: string[];
removed: string[];
};
}

export interface MetricsSummary {
runId: string;
eventCount: number;
Expand Down
62 changes: 62 additions & 0 deletions src/controllers/run-insights.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { RunInsightsController } from './run-insights.controller';
import { RunInsightsService } from '../insights/run-insights.service';

describe('RunInsightsController', () => {
let controller: RunInsightsController;
let mockInsightsService: {
exportRun: jest.Mock;
compareRuns: jest.Mock;
};

beforeEach(() => {
mockInsightsService = {
exportRun: jest.fn(),
compareRuns: jest.fn()
};
controller = new RunInsightsController(
mockInsightsService as unknown as RunInsightsService
);
});

describe('exportRun', () => {
it('delegates to insightsService.exportRun with options', async () => {
const bundle = { run: { id: 'run-1' }, exportedAt: '2026-01-01T00:00:00Z' };
mockInsightsService.exportRun.mockResolvedValue(bundle);

const query = { includeCanonical: true, includeRaw: false, eventLimit: 500 };
const result = await controller.exportRun('run-1', query as any);

Check warning on line 27 in src/controllers/run-insights.controller.spec.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', {
includeCanonical: true,
includeRaw: false,
eventLimit: 500
});
expect(result).toEqual(bundle);
});

it('passes undefined options when query is empty', async () => {
mockInsightsService.exportRun.mockResolvedValue({});

await controller.exportRun('run-1', {} as any);

Check warning on line 40 in src/controllers/run-insights.controller.spec.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', {
includeCanonical: undefined,
includeRaw: undefined,
eventLimit: undefined
});
});
});

describe('compareRuns', () => {
it('delegates to insightsService.compareRuns', async () => {
const comparison = { statusMatch: true, left: {}, right: {} };
mockInsightsService.compareRuns.mockResolvedValue(comparison);

const body = { leftRunId: 'run-1', rightRunId: 'run-2' };
const result = await controller.compareRuns(body as any);

Check warning on line 56 in src/controllers/run-insights.controller.spec.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

expect(mockInsightsService.compareRuns).toHaveBeenCalledWith('run-1', 'run-2');
expect(result).toEqual(comparison);
});
});
});
45 changes: 45 additions & 0 deletions src/controllers/run-insights.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import {
Body,
Controller,
Get,
Param,
ParseUUIDPipe,
Post,
Query,
ValidationPipe
} from '@nestjs/common';
import { ApiBody, ApiOkResponse, ApiOperation, ApiTags } from '@nestjs/swagger';
import { CompareRunsDto } from '../dto/compare-runs.dto';
import { ExportRunQueryDto } from '../dto/export-run-query.dto';
import { RunBundleExportDto, RunComparisonResultDto } from '../dto/run-responses.dto';
import { RunInsightsService } from '../insights/run-insights.service';

@ApiTags('runs')
@Controller('runs')
export class RunInsightsController {
constructor(private readonly insightsService: RunInsightsService) {}

@Get(':id/export')
@ApiOperation({ summary: 'Export a full run bundle (run, session, projection, events, artifacts).' })
@ApiOkResponse({ type: RunBundleExportDto })
async exportRun(
@Param('id', new ParseUUIDPipe()) id: string,
@Query(new ValidationPipe({ transform: true, whitelist: true })) query: ExportRunQueryDto
) {
return this.insightsService.exportRun(id, {
includeCanonical: query.includeCanonical,
includeRaw: query.includeRaw,
eventLimit: query.eventLimit
});
}

@Post('compare')
@ApiOperation({ summary: 'Compare two runs side-by-side.' })
@ApiBody({ type: CompareRunsDto })
@ApiOkResponse({ type: RunComparisonResultDto })
async compareRuns(
@Body(new ValidationPipe({ transform: true, whitelist: true })) body: CompareRunsDto
) {
return this.insightsService.compareRuns(body.leftRunId, body.rightRunId);
}
}
6 changes: 6 additions & 0 deletions src/controllers/runs.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { RunManagerService } from '../runs/run-manager.service';
import { EventRepository } from '../storage/event.repository';
import { ReplayService } from '../replay/replay.service';
import { StreamHubService } from '../events/stream-hub.service';
import { AppConfigService } from '../config/app-config.service';

describe('RunsController', () => {
let controller: RunsController;
Expand All @@ -23,6 +24,7 @@ describe('RunsController', () => {
};
let mockReplayService: Partial<ReplayService>;
let mockStreamHub: Partial<StreamHubService>;
let mockConfig: Partial<AppConfigService>;

beforeEach(() => {
mockRunExecutor = {
Expand All @@ -41,13 +43,17 @@ describe('RunsController', () => {
};
mockReplayService = {};
mockStreamHub = {};
mockConfig = {
streamSseHeartbeatMs: 15000,
};

controller = new RunsController(
mockRunExecutor as unknown as RunExecutorService,
mockRunManager as unknown as RunManagerService,
mockEventRepository as unknown as EventRepository,
mockReplayService as unknown as ReplayService,
mockStreamHub as unknown as StreamHubService,
mockConfig as unknown as AppConfigService,
);
});

Expand Down
Loading
Loading