Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions drizzle/0004_session_ttl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add expires_at column to runtime_sessions for TTL tracking
ALTER TABLE "runtime_sessions" ADD COLUMN "expires_at" TIMESTAMP WITH TIME ZONE;
12 changes: 12 additions & 0 deletions drizzle/0005_webhooks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Create webhooks table for webhook notification subscriptions
CREATE TABLE IF NOT EXISTS "webhooks" (
"id" UUID PRIMARY KEY,
"url" TEXT NOT NULL,
"events" JSONB NOT NULL DEFAULT '[]',
"secret" VARCHAR(255) NOT NULL,
"active" INTEGER NOT NULL DEFAULT 1,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS "webhooks_active_idx" ON "webhooks" ("active");
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "macp-control-plane",
"version": "0.2.0",
"version": "0.3.0",
"private": true,
"description": "Scenario-agnostic control plane for the MACP runtime, built with NestJS.",
"license": "MIT",
Expand All @@ -17,7 +17,8 @@
"test:cov": "jest --coverage",
"drizzle:generate": "drizzle-kit generate",
"drizzle:migrate": "drizzle-kit migrate",
"drizzle:studio": "drizzle-kit studio"
"drizzle:studio": "drizzle-kit studio",
"proto:sync": "bash scripts/proto-sync.sh"
},
"dependencies": {
"@grpc/grpc-js": "^1.14.0",
Expand Down
21 changes: 21 additions & 0 deletions scripts/proto-sync.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Sync proto files from the runtime's authoritative protos to the control plane.
# Usage: npm run proto:sync
#
# Assumes the runtime repo is at ../runtime relative to the control plane root.

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CP_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
RUNTIME_ROOT="${RUNTIME_PROTO_DIR:-$CP_ROOT/../runtime}"

if [ ! -d "$RUNTIME_ROOT/proto" ]; then
echo "ERROR: Runtime proto directory not found at $RUNTIME_ROOT/proto"
echo "Set RUNTIME_PROTO_DIR to the runtime repo root, or place it at ../runtime"
exit 1
fi

echo "Syncing protos from $RUNTIME_ROOT/proto → $CP_ROOT/proto"
rsync -av --delete "$RUNTIME_ROOT/proto/" "$CP_ROOT/proto/"
echo "Proto sync complete."
11 changes: 9 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { AuthGuard } from './auth/auth.guard';
import { AuthModule } from './auth/auth.module';
import { ThrottleByUserGuard } from './auth/throttle-by-user.guard';
import { ConfigModule } from './config/config.module';
import { AdminController } from './controllers/admin.controller';
import { AuditController } from './controllers/audit.controller';
import { HealthController } from './controllers/health.controller';
import { MetricsController } from './controllers/metrics.controller';
import { ObservabilityController } from './controllers/observability.controller';
Expand Down Expand Up @@ -39,6 +41,9 @@ import { RunExecutorService } from './runs/run-executor.service';
import { RunManagerService } from './runs/run-manager.service';
import { RunRecoveryService } from './runs/run-recovery.service';
import { StreamConsumerService } from './runs/stream-consumer.service';
import { WebhookController } from './controllers/webhook.controller';
import { WebhookRepository } from './webhooks/webhook.repository';
import { WebhookService } from './webhooks/webhook.service';

@Module({
imports: [
Expand All @@ -47,7 +52,7 @@ import { StreamConsumerService } from './runs/stream-consumer.service';
AuthModule,
ThrottlerModule.forRoot([{ ttl: 60000, limit: 100 }])
],
controllers: [RunsController, RunInsightsController, RuntimeController, ObservabilityController, HealthController, MetricsController],
controllers: [RunsController, RunInsightsController, RuntimeController, ObservabilityController, HealthController, MetricsController, AdminController, AuditController, WebhookController],
providers: [
{ provide: APP_GUARD, useClass: AuthGuard },
{ provide: APP_GUARD, useClass: ThrottleByUserGuard },
Expand Down Expand Up @@ -75,7 +80,9 @@ import { StreamConsumerService } from './runs/stream-consumer.service';
StreamConsumerService,
RunExecutorService,
RunRecoveryService,
RunInsightsService
RunInsightsService,
WebhookRepository,
WebhookService
]
})
export class AppModule implements NestModule {
Expand Down
18 changes: 15 additions & 3 deletions src/artifacts/inline-artifact-storage.provider.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { Injectable } from '@nestjs/common';
import { ArtifactRepository } from '../storage/artifact.repository';
import { ArtifactStorageProvider } from './artifact-storage.interface';

@Injectable()
export class InlineArtifactStorageProvider implements ArtifactStorageProvider {
readonly kind = 'inline';

constructor(private readonly artifactRepository: ArtifactRepository) {}

async store(params: {
runId: string;
artifactId: string;
Expand All @@ -17,8 +20,17 @@ export class InlineArtifactStorageProvider implements ArtifactStorageProvider {
return { uri: `inline://${params.runId}/${params.artifactId}` };
}

async retrieve(_uri: string): Promise<{ data: Buffer; contentType?: string } | null> {
// Inline artifacts are read directly from the DB, not through this provider.
return null;
async retrieve(uri: string): Promise<{ data: Buffer; contentType?: string } | null> {
// Parse inline URI: inline://{runId}/{artifactId}
const match = uri.match(/^inline:\/\/([^/]+)\/([^/]+)$/);
if (!match) return null;

const artifact = await this.artifactRepository.findById(match[2]);
if (!artifact?.inline) return null;

return {
data: Buffer.from(JSON.stringify(artifact.inline), 'utf8'),
contentType: 'application/json'
};
}
}
38 changes: 38 additions & 0 deletions src/audit/audit.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Injectable, Logger } from '@nestjs/common';
import { and, desc, eq, gt, lt, sql, SQL } from 'drizzle-orm';
import { randomUUID } from 'node:crypto';
import { DatabaseService } from '../db/database.service';
import { auditLog } from '../db/schema';
Expand Down Expand Up @@ -37,4 +38,41 @@ export class AuditService {
this.logger.error(`audit record failed: ${error instanceof Error ? error.message : String(error)}`);
}
}

async list(filters: {
actor?: string;
action?: string;
resource?: string;
resourceId?: string;
createdAfter?: string;
createdBefore?: string;
limit?: number;
offset?: number;
}): Promise<{ data: (typeof auditLog.$inferSelect)[]; total: number }> {
const conditions: SQL[] = [];
if (filters.actor) conditions.push(eq(auditLog.actor, filters.actor));
if (filters.action) conditions.push(eq(auditLog.action, filters.action));
if (filters.resource) conditions.push(eq(auditLog.resource, filters.resource));
if (filters.resourceId) conditions.push(eq(auditLog.resourceId, filters.resourceId));
if (filters.createdAfter) conditions.push(gt(auditLog.createdAt, filters.createdAfter));
if (filters.createdBefore) conditions.push(lt(auditLog.createdAt, filters.createdBefore));

const where = conditions.length > 0 ? and(...conditions) : undefined;

const [data, countResult] = await Promise.all([
this.database.db
.select()
.from(auditLog)
.where(where)
.orderBy(desc(auditLog.createdAt))
.limit(filters.limit ?? 50)
.offset(filters.offset ?? 0),
this.database.db
.select({ count: sql<number>`count(*)::int` })
.from(auditLog)
.where(where)
]);

return { data, total: countResult[0]?.count ?? 0 };
}
}
36 changes: 36 additions & 0 deletions src/contracts/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,48 @@ export interface RuntimeCallOptions {
deadline?: Date;
}

/** Request to open a unified bidirectional session stream */
export interface RuntimeOpenSessionRequest {
runId: string;
execution: ExecutionRequest;
}

/** Handle to an open bidirectional StreamSession */
export interface RuntimeSessionHandle {
/** Send an envelope through the open stream */
send(envelope: RuntimeEnvelope): void;
/** Async iterable of raw events from the stream */
events: AsyncIterable<RawRuntimeEvent>;
/** Close the write side (after all kickoff messages sent) */
closeWrite(): void;
/** Abort the stream immediately */
abort(): void;
/** The ack derived from the SessionStart echo (resolved after first response) */
sessionAck: Promise<RuntimeStartSessionResult>;
}

/** Stored runtime capabilities from Initialize response */
export interface RuntimeCapabilities {
sessions?: { stream?: boolean };
cancellation?: { cancelSession?: boolean };
progress?: { progress?: boolean };
manifest?: { getManifest?: boolean };
modeRegistry?: { listModes?: boolean; listChanged?: boolean };
roots?: { listRoots?: boolean; listChanged?: boolean };
}

export interface RuntimeProvider {
readonly kind: string;

initialize(req: RuntimeInitializeRequest, opts?: RuntimeCallOptions): Promise<RuntimeInitializeResult>;

/** Open a unified bidirectional session — replaces startSession() + streamSession() */
openSession(req: RuntimeOpenSessionRequest): RuntimeSessionHandle;

/** @deprecated Use openSession() for new session creation. Kept for backward compat. */
startSession(req: RuntimeStartSessionRequest, opts?: RuntimeCallOptions): Promise<RuntimeStartSessionResult>;
send(req: RuntimeSendRequest): Promise<RuntimeSendResult>;
/** @deprecated Use openSession().events for streaming. Kept for reconnection fallback. */
streamSession(req: RuntimeStreamSessionRequest): AsyncIterable<RawRuntimeEvent>;
getSession(req: RuntimeGetSessionRequest): Promise<RuntimeSessionSnapshot>;
cancelSession(req: RuntimeCancelSessionRequest): Promise<RuntimeCancelResult>;
Expand Down
17 changes: 17 additions & 0 deletions src/controllers/admin.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Controller, Post, HttpCode } from '@nestjs/common';
import { ApiOperation, ApiTags } from '@nestjs/swagger';
import { RustRuntimeProvider } from '../runtime/rust-runtime.provider';

@ApiTags('admin')
@Controller('admin')
export class AdminController {
constructor(private readonly rustRuntime: RustRuntimeProvider) {}

@Post('circuit-breaker/reset')
@HttpCode(200)
@ApiOperation({ summary: 'Manually reset the circuit breaker to CLOSED state.' })
resetCircuitBreaker() {
this.rustRuntime.resetCircuitBreaker();
return { status: 'ok', state: 'CLOSED' };
}
}
27 changes: 27 additions & 0 deletions src/controllers/audit.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Controller, Get, Query, ValidationPipe } from '@nestjs/common';
import { ApiOperation, ApiTags } from '@nestjs/swagger';
import { AuditService } from '../audit/audit.service';
import { ListAuditQueryDto } from '../dto/list-audit-query.dto';

@ApiTags('audit')
@Controller('audit')
export class AuditController {
constructor(private readonly auditService: AuditService) {}

@Get()
@ApiOperation({ summary: 'List audit log entries with optional filtering.' })
async listAuditLogs(
@Query(new ValidationPipe({ transform: true, whitelist: true })) query: ListAuditQueryDto
) {
return this.auditService.list({
actor: query.actor,
action: query.action,
resource: query.resource,
resourceId: query.resourceId,
createdAfter: query.createdAfter,
createdBefore: query.createdBefore,
limit: query.limit ?? 50,
offset: query.offset ?? 0
});
}
}
60 changes: 59 additions & 1 deletion src/controllers/run-insights.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
import { RunInsightsController } from './run-insights.controller';
import { RunInsightsService } from '../insights/run-insights.service';
import { RunExecutorService } from '../runs/run-executor.service';

describe('RunInsightsController', () => {
let controller: RunInsightsController;
let mockInsightsService: {
exportRun: jest.Mock;
exportRunJsonl: jest.Mock;
compareRuns: jest.Mock;
};
let mockRunExecutor: {
cancel: jest.Mock;
};

beforeEach(() => {
mockInsightsService = {
exportRun: jest.fn(),
exportRunJsonl: jest.fn(),
compareRuns: jest.fn()
};
mockRunExecutor = {
cancel: jest.fn()
};
controller = new RunInsightsController(
mockInsightsService as unknown as RunInsightsService
mockInsightsService as unknown as RunInsightsService,
mockRunExecutor as unknown as RunExecutorService
);
});

Expand All @@ -24,7 +34,7 @@
mockInsightsService.exportRun.mockResolvedValue(bundle);

const query = { includeCanonical: true, includeRaw: false, eventLimit: 500 };
const result = await controller.exportRun('run-1', query as any);

Check warning on line 37 in src/controllers/run-insights.controller.spec.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', {
includeCanonical: true,
Expand All @@ -37,7 +47,7 @@
it('passes undefined options when query is empty', async () => {
mockInsightsService.exportRun.mockResolvedValue({});

await controller.exportRun('run-1', {} as any);

Check warning on line 50 in src/controllers/run-insights.controller.spec.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', {
includeCanonical: undefined,
Expand All @@ -45,6 +55,21 @@
eventLimit: undefined
});
});

it('delegates to exportRunJsonl when format is jsonl', async () => {
const jsonl = '{"type":"header"}\n';
mockInsightsService.exportRunJsonl.mockResolvedValue(jsonl);

const query = { includeCanonical: true, includeRaw: false, eventLimit: 500, format: 'jsonl' as const };
const result = await controller.exportRun('run-1', query as any);

Check warning on line 64 in src/controllers/run-insights.controller.spec.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

expect(mockInsightsService.exportRunJsonl).toHaveBeenCalledWith('run-1', {
includeCanonical: true,
includeRaw: false,
eventLimit: 500
});
expect(result).toBe(jsonl);
});
});

describe('compareRuns', () => {
Expand All @@ -59,4 +84,37 @@
expect(result).toEqual(comparison);
});
});

describe('batchCancel', () => {
it('cancels multiple runs and returns results', async () => {
mockRunExecutor.cancel
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new Error('not found'));

const result = await controller.batchCancel({ runIds: ['run-1', 'run-2'] });

expect(result).toEqual([
{ runId: 'run-1', status: 'cancelled', error: undefined },
{ runId: 'run-2', status: 'failed', error: 'not found' }
]);
expect(mockRunExecutor.cancel).toHaveBeenCalledWith('run-1', 'batch cancel');
expect(mockRunExecutor.cancel).toHaveBeenCalledWith('run-2', 'batch cancel');
});
});

describe('batchExport', () => {
it('exports multiple runs and returns bundles', async () => {
const bundle1 = { run: { id: 'run-1' }, exportedAt: '2026-01-01T00:00:00Z' };
const bundle2 = { run: { id: 'run-2' }, exportedAt: '2026-01-01T00:00:00Z' };
mockInsightsService.exportRun
.mockResolvedValueOnce(bundle1)
.mockResolvedValueOnce(bundle2);

const result = await controller.batchExport({ runIds: ['run-1', 'run-2'] });

expect(result).toEqual([bundle1, bundle2]);
expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', { includeCanonical: true, includeRaw: false });
expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-2', { includeCanonical: true, includeRaw: false });
});
});
});
Loading
Loading