Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ jobs:
fail=0

echo "::group::throw new Error outside allowed files"
if grep -rn "throw new Error" src --include='*.ts' | grep -vE '(app-config|migrate|\.spec\.ts|circuit-breaker\.ts|proto-registry\.service\.ts|grpc-helpers\.ts|run\.repository\.ts|webhook\.service\.ts|run-recovery\.service\.ts)'; then
if grep -rn "throw new Error" src --include='*.ts' | grep -vE '(app-config|migrate|\.spec\.ts|circuit-breaker\.ts|proto-registry\.service\.ts|grpc-helpers\.ts|run\.repository\.ts|webhook\.service\.ts|run-recovery\.service\.ts|runtime-jwt-minter\.service\.ts)'; then
echo "::error::throw new Error found outside allowed files. Use AppException or a Nest built-in."
fail=1
fi
Expand Down
2 changes: 2 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { ProjectionService } from './projection/projection.service';
import { ReplayService } from './replay/replay.service';
import { ProtoRegistryService } from './runtime/proto-registry.service';
import { RuntimeCredentialResolverService } from './runtime/runtime-credential-resolver.service';
import { RuntimeJwtMinterService } from './runtime/runtime-jwt-minter.service';
import { RuntimeProviderRegistry } from './runtime/runtime-provider.registry';
import { RustRuntimeProvider } from './runtime/rust-runtime.provider';
import { EventRepository } from './storage/event.repository';
Expand Down Expand Up @@ -93,6 +94,7 @@ import { WebhookService } from './webhooks/webhook.service';
TraceService,
RedactionService,
ProtoRegistryService,
RuntimeJwtMinterService,
RuntimeCredentialResolverService,
RustRuntimeProvider,
RuntimeProviderRegistry,
Expand Down
12 changes: 12 additions & 0 deletions src/config/app-config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ export class AppConfigService implements OnModuleInit {
readonly runtimeUseDevHeader = readBoolean('RUNTIME_USE_DEV_HEADER', process.env.NODE_ENV === 'development');
readonly runtimeRequestTimeoutMs = readNumber('RUNTIME_REQUEST_TIMEOUT_MS', 30000);
readonly runtimeDevAgentId = process.env.RUNTIME_DEV_AGENT_ID ?? 'control-plane';

/**
* When `MACP_AUTH_SERVICE_URL` is set, the credential resolver mints a
* short-lived JWT for the `control-plane` sender via the auth-service
* instead of using the static `RUNTIME_BEARER_TOKEN`. The minted token is
* cached and refreshed on a TTL boundary. If the URL is unset, or a mint
* attempt fails, the resolver falls back to the static bearer, so deploys
* can switch incrementally.
*/
readonly authServiceUrl = process.env.MACP_AUTH_SERVICE_URL ?? '';
readonly authServiceTimeoutMs = readNumber('MACP_AUTH_SERVICE_TIMEOUT_MS', 5000);
readonly authTokenTtlSeconds = readNumber('MACP_AUTH_TOKEN_TTL_SECONDS', 3600);
readonly authTokenSender = process.env.MACP_AUTH_TOKEN_SENDER ?? 'control-plane';
/**
* Observer-mode poll cadence. Control-plane polls GetSession after POST /runs
* until the initiator agent opens the session. See direct-agent-auth §End-to-end target flow.
Expand Down
83 changes: 79 additions & 4 deletions src/projection/projection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,55 @@ export class ProjectionService {
}
case 'message.received': {
const sender = String(event.data.sender ?? event.data.from ?? '');
const recipients = (event.data.to as string[] | undefined) ?? [];
this.touchParticipant(next, sender, event.ts, 'active', String(event.data.messageType ?? event.type));
const messageType = String(event.data.messageType ?? '');
const explicitRecipients = (event.data.to as string[] | undefined) ?? [];
const decoded = (event.data.decodedPayload ?? event.data.payload ?? {}) as Record<string, unknown>;
this.touchParticipant(next, sender, event.ts, 'active', messageType || event.type);

// Build the recipient set from whatever signal we have:
// - explicit `to` (if the event has it)
// - SessionStart payload's `participants` list (broadcast)
// - declared participants in projection (fan-out for Proposal)
const declaredParticipants = next.participants.map((p) => p.participantId);
let recipients = explicitRecipients;
if (recipients.length === 0 && messageType === 'SessionStart') {
const sessionParticipants = (decoded.participants ?? decoded.participantIds ?? []) as unknown[];
if (Array.isArray(sessionParticipants)) {
recipients = sessionParticipants
.filter((p): p is string => typeof p === 'string' && p !== sender);
}
} else if (recipients.length === 0 && messageType === 'Proposal') {
recipients = declaredParticipants.filter((p) => p !== sender);
}

recipients.forEach((recipient) => this.touchParticipant(next, recipient, event.ts, 'waiting', undefined));

// Lifecycle nodes synthesized so the graph shows the canonical flow
// (start → proposal → decision → outcome) on top of the agent layer.
this.upsertNode(next, { id: '__start', kind: 'start', status: 'completed' });
if (messageType === 'SessionStart') {
this.upsertEdge(next, '__start', sender, 'session.bound', event.ts);
recipients.forEach((r) => this.upsertEdge(next, sender, r, 'fanout', event.ts));
} else if (messageType === 'Proposal') {
this.upsertNode(next, { id: '__proposal', kind: 'proposal', status: 'completed' });
this.upsertEdge(next, sender, '__proposal', 'proposes', event.ts);
recipients.forEach((r) => this.upsertEdge(next, '__proposal', r, 'review', event.ts));
} else if (messageType === 'Vote' || messageType === 'Evaluation' || messageType === 'Objection') {
this.upsertNode(next, { id: '__decision', kind: 'decision', status: 'active' });
this.upsertEdge(next, sender, '__decision', messageType.toLowerCase(), event.ts);
} else if (messageType === 'Commitment') {
this.upsertNode(next, { id: '__decision', kind: 'decision', status: 'completed' });
this.upsertNode(next, { id: '__outcome', kind: 'output', status: 'completed' });
this.upsertEdge(next, '__decision', '__outcome', 'commits', event.ts);
}

// Direct sender→recipient edges (only when we actually inferred them).
recipients.forEach((recipient) => {
if (sender && recipient) {
next.graph.edges.push({ from: sender, to: recipient, kind: event.type, ts: event.ts });
this.upsertEdge(next, sender, recipient, messageType || event.type, event.ts);
}
});
next.graph.edges = next.graph.edges.slice(-200);
next.graph.edges = next.graph.edges.slice(-300);
break;
}
case 'signal.emitted': {
Expand Down Expand Up @@ -527,6 +567,41 @@ export class ProjectionService {
}
}

/**
 * Inserts `node` into the projection graph, or — when a node with the same
 * id is already present — promotes that node's status.
 *
 * Status only ever moves up the ladder
 * `idle < waiting < active < completed < failed`; a lower-ranked status is
 * ignored so a finished lifecycle node is never downgraded. Only `status`
 * is touched on an existing node; its `kind` is left as-is.
 *
 * @param projection Projection whose `graph.nodes` list is updated in place.
 * @param node Node to add, or whose status should be merged into the existing entry.
 */
private upsertNode(
  projection: RunStateProjection,
  node: { id: string; kind: string; status: string }
) {
  const statusLadder = ['idle', 'waiting', 'active', 'completed', 'failed'];
  const current = projection.graph.nodes.find((candidate) => candidate.id === node.id);

  if (!current) {
    projection.graph.nodes.push(node);
    return;
  }

  // A status missing from the ladder ranks -1, so it never replaces a known one.
  const incomingRank = statusLadder.indexOf(node.status);
  const currentRank = statusLadder.indexOf(current.status);
  if (incomingRank > currentRank) {
    current.status = node.status;
  }
}

/**
 * Adds a `from → to` edge of the given `kind` to the projection graph, or
 * refreshes the timestamp of the matching edge if one already exists.
 *
 * Edges with an empty endpoint and self-loops are silently ignored.
 *
 * @param projection Projection whose `graph.edges` list is updated in place.
 * @param from Source node id.
 * @param to Target node id.
 * @param kind Edge label; part of the edge identity together with `from` and `to`.
 * @param ts Timestamp recorded on the new or refreshed edge.
 */
private upsertEdge(
  projection: RunStateProjection,
  from: string,
  to: string,
  kind: string,
  ts: string
) {
  const isDegenerate = !from || !to || from === to;
  if (isDegenerate) return;

  const edges = projection.graph.edges;
  const match = edges.find(
    (edge) => edge.from === from && edge.to === to && edge.kind === kind
  );

  if (match) {
    // Same (from, to, kind) triple: keep one edge and just bump its timestamp.
    match.ts = ts;
  } else {
    edges.push({ from, to, kind, ts });
  }
}

private touchParticipant(
projection: RunStateProjection,
participantId: string,
Expand Down
20 changes: 17 additions & 3 deletions src/runs/session-discovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import { AppConfigService } from '../config/app-config.service';
export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(SessionDiscoveryService.name);
private aborted = false;
private loopPromise?: Promise<void>;
private reconnectTimer?: ReturnType<typeof setTimeout>;
private reconnectResolve?: () => void;
private readonly knownSessions = new Set<string>();

constructor(
Expand All @@ -31,11 +34,17 @@ export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy {
this.logger.log('Session discovery disabled (SESSION_DISCOVERY_ENABLED=false)');
return;
}
void this.startDiscoveryLoop();
this.loopPromise = this.startDiscoveryLoop();
}

onModuleDestroy(): void {
/**
 * Shutdown hook: flags the discovery loop as aborted, cuts short any pending
 * reconnect back-off, then waits for the loop promise to settle.
 *
 * Rejections from the loop are swallowed; shutdown must not throw.
 */
async onModuleDestroy(): Promise<void> {
  this.aborted = true;

  // Wake the reconnect sleep immediately instead of letting shutdown wait
  // out the 5s back-off.
  const { reconnectTimer, reconnectResolve, loopPromise } = this;
  if (reconnectTimer) clearTimeout(reconnectTimer);
  if (reconnectResolve) reconnectResolve();

  // Let the discovery loop (and any handleSessionCreated DB writes still in
  // flight) finish before returning, so the pool isn't closed under them.
  if (!loopPromise) return;
  await loopPromise.catch(() => undefined);
}

private async startDiscoveryLoop(): Promise<void> {
Expand All @@ -48,7 +57,12 @@ export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy {
if (this.aborted) return;
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`WatchSessions stream ended: ${message}. Reconnecting in 5s...`);
await new Promise((r) => setTimeout(r, 5000));
await new Promise<void>((resolve) => {
this.reconnectResolve = resolve;
this.reconnectTimer = setTimeout(resolve, 5000);
});
this.reconnectTimer = undefined;
this.reconnectResolve = undefined;
}
}
}
Expand Down
19 changes: 16 additions & 3 deletions src/runs/signal-consumer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import type { NormalizeContext } from '../contracts/runtime';
export class SignalConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(SignalConsumerService.name);
private aborted = false;
private loopPromise?: Promise<void>;
private reconnectTimer?: ReturnType<typeof setTimeout>;
private reconnectResolve?: () => void;

constructor(
private readonly providerRegistry: RuntimeProviderRegistry,
Expand All @@ -36,11 +39,16 @@ export class SignalConsumerService implements OnModuleInit, OnModuleDestroy {
this.logger.log('Signal consumer disabled (gated on SESSION_DISCOVERY_ENABLED)');
return;
}
void this.startConsumeLoop();
this.loopPromise = this.startConsumeLoop();
}

onModuleDestroy(): void {
/**
 * Shutdown hook: marks the consume loop as aborted, cancels a pending
 * reconnect back-off, and waits for the loop promise to settle.
 *
 * A rejected loop promise is ignored; shutdown must not throw.
 */
async onModuleDestroy(): Promise<void> {
  this.aborted = true;

  // Release the reconnect sleep right away rather than waiting for its timer.
  const pendingTimer = this.reconnectTimer;
  const wakeSleeper = this.reconnectResolve;
  if (pendingTimer) clearTimeout(pendingTimer);
  if (wakeSleeper) wakeSleeper();

  // Wait for the consume loop so in-flight persistRawAndCanonical calls
  // complete before the DB pool closes.
  const loop = this.loopPromise;
  if (!loop) return;
  await loop.catch(() => undefined);
}

private async startConsumeLoop(): Promise<void> {
Expand All @@ -53,7 +61,12 @@ export class SignalConsumerService implements OnModuleInit, OnModuleDestroy {
if (this.aborted) return;
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`WatchSignals stream ended: ${message}. Reconnecting in 5s...`);
await new Promise((r) => setTimeout(r, 5000));
await new Promise<void>((resolve) => {
this.reconnectResolve = resolve;
this.reconnectTimer = setTimeout(resolve, 5000);
});
this.reconnectTimer = undefined;
this.reconnectResolve = undefined;
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion src/runs/stream-consumer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ interface ActiveStream {
connected: boolean;
lastProcessedSeq: number;
finalizingPromise?: Promise<void>;
/** Tracks the consumeLoop so shutdown can await in-flight persistence. */
loopPromise?: Promise<void>;
}

@Injectable()
Expand All @@ -37,10 +39,20 @@ export class StreamConsumerService implements OnModuleDestroy {
) {}

/**
 * Shutdown hook: marks every active stream as aborted and gives the consume
 * loops a bounded window (2s) to observe the abort and finish.
 *
 * The wait is capped so a stuck gRPC call cannot block shutdown. The
 * deadline timer is cleared once the drain finishes, and is never armed when
 * there is nothing to drain, so no timer handle lingers to keep the event
 * loop alive after this hook returns.
 */
async onModuleDestroy(): Promise<void> {
  const pending: Promise<void>[] = [];
  for (const [runId, marker] of this.active) {
    marker.aborted = true;
    this.logger.log(`aborting stream for run ${runId} on shutdown`);
    if (marker.loopPromise) pending.push(marker.loopPromise);
  }

  // Nothing in flight: skip the race and do not arm a timer at all.
  if (pending.length === 0) return;

  // The Promise executor runs synchronously, so `releaseDeadline` is bound
  // to the real resolver before the timer can fire.
  let releaseDeadline: () => void = () => undefined;
  const deadline = new Promise<void>((resolve) => {
    releaseDeadline = resolve;
  });
  const drainTimer = setTimeout(() => releaseDeadline(), 2000);

  try {
    // Bounded drain: wait for consumeLoops to finish any in-flight
    // persistRawAndCanonical before returning, so the DB pool isn't closed
    // under them.
    await Promise.race([Promise.allSettled(pending), deadline]);
  } finally {
    // Without this, the timer stays pending for up to 2s after the drain.
    clearTimeout(drainTimer);
  }
}

async start(params: {
Expand All @@ -62,7 +74,7 @@ export class StreamConsumerService implements OnModuleDestroy {
};
this.active.set(params.runId, marker);
this.instrumentation.activeStreams.inc();
void this.consumeLoop(marker, params).finally(() => {
marker.loopPromise = this.consumeLoop(marker, params).finally(() => {
this.instrumentation.activeStreams.dec();
this.active.delete(params.runId);
});
Expand Down
43 changes: 42 additions & 1 deletion src/runtime/runtime-credential-resolver.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { RuntimeCredentialResolverService } from './runtime-credential-resolver.service';
import { RuntimeJwtMinterService } from './runtime-jwt-minter.service';
import { AppConfigService } from '../config/app-config.service';

describe('RuntimeCredentialResolverService (single-bearer, CP-9)', () => {
Expand All @@ -9,7 +10,13 @@ describe('RuntimeCredentialResolverService (single-bearer, CP-9)', () => {
runtimeUseDevHeader: false,
...config
} as AppConfigService;
return new RuntimeCredentialResolverService(merged);
// JWT minter disabled in these tests — auth-service URL is unset, so the
// resolver exercises the static-bearer / dev-header paths.
const jwtMinter = {
isEnabled: () => false,
getToken: () => Promise.reject(new Error('jwt disabled in unit test'))
} as unknown as RuntimeJwtMinterService;
return new RuntimeCredentialResolverService(merged, jwtMinter);
}

describe('sender identity', () => {
Expand Down Expand Up @@ -64,6 +71,40 @@ describe('RuntimeCredentialResolverService (single-bearer, CP-9)', () => {
});
});

describe('JWT mint path', () => {
  /**
   * Builds a resolver wired to an enabled minter stub whose `getToken` is
   * the supplied mock, with the given static bearer configured as fallback.
   */
  const buildResolver = (runtimeBearerToken: string, getToken: jest.Mock) => {
    const config = {
      runtimeDevAgentId: 'control-plane',
      runtimeBearerToken,
      runtimeUseDevHeader: false
    } as AppConfigService;
    const minter = {
      isEnabled: () => true,
      getToken
    } as unknown as RuntimeJwtMinterService;
    return new RuntimeCredentialResolverService(config, minter);
  };

  it('uses the minted JWT as Authorization when the minter is enabled', async () => {
    const resolver = buildResolver('', jest.fn().mockResolvedValue('minted-jwt-token'));

    const { metadata } = await resolver.resolve({ runtimeKind: 'rust' });

    expect(metadata.authorization).toBe('Bearer minted-jwt-token');
  });

  it('falls back to static bearer when the mint rejects', async () => {
    const resolver = buildResolver(
      'fallback-bearer',
      jest.fn().mockRejectedValue(new Error('auth-service down'))
    );

    const { metadata } = await resolver.resolve({ runtimeKind: 'rust' });

    expect(metadata.authorization).toBe('Bearer fallback-bearer');
  });
});

describe('invariant — no per-sender overrides (direct-agent-auth §Invariants)', () => {
it('ignores any extra fields in the request (participant, requester, fallbackSender)', async () => {
const service = makeService({ runtimeBearerToken: 'obs-token' });
Expand Down
44 changes: 35 additions & 9 deletions src/runtime/runtime-credential-resolver.service.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,52 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { AppConfigService } from '../config/app-config.service';
import { RuntimeCredentialResolver, RuntimeCredentials } from '../contracts/runtime';
import { RuntimeJwtMinterService } from './runtime-jwt-minter.service';

/**
* Single-bearer credential resolver (CP-9, direct-agent-auth.md).
* Single-identity credential resolver for the control-plane.
*
* The control-plane has one runtime identity — its own least-privilege Bearer
* token with `can_start_sessions: false`. All observer calls (GetSession,
* StreamSession, ListPolicies, CancelSession) use this identity.
* Two modes (chosen at runtime by env var):
*
* Per-agent token maps were removed because agents now authenticate to the
* runtime directly (RFC-MACP-0004 §4). The control-plane never forges envelopes
* on behalf of agents.
* 1. **JWT mode** (preferred) — when `MACP_AUTH_SERVICE_URL` is set, mints
* a short-lived RS256 JWT for `control-plane` via auth-service and
* caches it. Long-running CP processes refresh on a TTL boundary.
*
* 2. **Static-bearer mode** (fallback) — when the auth-service URL is
* unset, uses the static `RUNTIME_BEARER_TOKEN` from env. This path
* is preserved so deploys can switch incrementally.
*
* Either way, the resolver returns the same shape — gRPC sees a normal
* `Authorization: Bearer …` header with no knowledge of which mode minted
* the token.
*/
@Injectable()
export class RuntimeCredentialResolverService implements RuntimeCredentialResolver {
constructor(private readonly config: AppConfigService) {}
private readonly logger = new Logger(RuntimeCredentialResolverService.name);

constructor(
private readonly config: AppConfigService,
private readonly jwtMinter: RuntimeJwtMinterService
) {}

async resolve(_req: { runtimeKind: string }): Promise<RuntimeCredentials> {
const sender = this.config.runtimeDevAgentId;
const metadata: Record<string, string> = {};

if (this.jwtMinter.isEnabled()) {
try {
const token = await this.jwtMinter.getToken();
metadata.authorization = `Bearer ${token}`;
return { metadata, sender };
} catch (err) {
// Fall through to static-bearer / dev-header fallbacks if the
// mint fails. Better to degrade than fail every gRPC call when
// auth-service is briefly unreachable.
const reason = err instanceof Error ? err.message : String(err);
this.logger.warn(`JWT mint failed; falling back to static bearer: ${reason}`);
}
}

if (this.config.runtimeBearerToken) {
metadata.authorization = `Bearer ${this.config.runtimeBearerToken}`;
} else if (this.config.runtimeUseDevHeader) {
Expand Down
Loading
Loading