diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 93b5b35..749a0fe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -142,7 +142,7 @@ jobs: fail=0 echo "::group::throw new Error outside allowed files" - if grep -rn "throw new Error" src --include='*.ts' | grep -vE '(app-config|migrate|\.spec\.ts|circuit-breaker\.ts|proto-registry\.service\.ts|grpc-helpers\.ts|run\.repository\.ts|webhook\.service\.ts|run-recovery\.service\.ts)'; then + if grep -rn "throw new Error" src --include='*.ts' | grep -vE '(app-config|migrate|\.spec\.ts|circuit-breaker\.ts|proto-registry\.service\.ts|grpc-helpers\.ts|run\.repository\.ts|webhook\.service\.ts|run-recovery\.service\.ts|runtime-jwt-minter\.service\.ts)'; then echo "::error::throw new Error found outside allowed files. Use AppException or a Nest built-in." fail=1 fi diff --git a/src/app.module.ts b/src/app.module.ts index a4944ee..04b2ec0 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -32,6 +32,7 @@ import { ProjectionService } from './projection/projection.service'; import { ReplayService } from './replay/replay.service'; import { ProtoRegistryService } from './runtime/proto-registry.service'; import { RuntimeCredentialResolverService } from './runtime/runtime-credential-resolver.service'; +import { RuntimeJwtMinterService } from './runtime/runtime-jwt-minter.service'; import { RuntimeProviderRegistry } from './runtime/runtime-provider.registry'; import { RustRuntimeProvider } from './runtime/rust-runtime.provider'; import { EventRepository } from './storage/event.repository'; @@ -93,6 +94,7 @@ import { WebhookService } from './webhooks/webhook.service'; TraceService, RedactionService, ProtoRegistryService, + RuntimeJwtMinterService, RuntimeCredentialResolverService, RustRuntimeProvider, RuntimeProviderRegistry, diff --git a/src/config/app-config.service.ts b/src/config/app-config.service.ts index 157b45f..8e3e48c 100644 --- a/src/config/app-config.service.ts +++ b/src/config/app-config.service.ts @@ -61,6 +61,18 @@ export class AppConfigService implements OnModuleInit { readonly runtimeUseDevHeader = readBoolean('RUNTIME_USE_DEV_HEADER', process.env.NODE_ENV === 'development'); readonly runtimeRequestTimeoutMs = readNumber('RUNTIME_REQUEST_TIMEOUT_MS', 30000); readonly runtimeDevAgentId = process.env.RUNTIME_DEV_AGENT_ID ?? 'control-plane'; + + /** + * When `MACP_AUTH_SERVICE_URL` is set, the credential resolver mints a + * short-lived JWT for the `control-plane` sender via the auth-service + * instead of using the static `RUNTIME_BEARER_TOKEN`. The minted token is + * cached and refreshed on a TTL boundary. Falls back to the static bearer + * if the URL isn't set, so deploys can switch incrementally. + */ + readonly authServiceUrl = process.env.MACP_AUTH_SERVICE_URL ?? ''; + readonly authServiceTimeoutMs = readNumber('MACP_AUTH_SERVICE_TIMEOUT_MS', 5000); + readonly authTokenTtlSeconds = readNumber('MACP_AUTH_TOKEN_TTL_SECONDS', 3600); + readonly authTokenSender = process.env.MACP_AUTH_TOKEN_SENDER ?? 'control-plane'; /** * Observer-mode poll cadence. Control-plane polls GetSession after POST /runs * until the initiator agent opens the session. See direct-agent-auth §End-to-end target flow. diff --git a/src/projection/projection.service.ts b/src/projection/projection.service.ts index 5b60701..e81fd6e 100644 --- a/src/projection/projection.service.ts +++ b/src/projection/projection.service.ts @@ -221,15 +221,55 @@ export class ProjectionService { } case 'message.received': { const sender = String(event.data.sender ?? event.data.from ?? ''); - const recipients = (event.data.to as string[] | undefined) ?? []; - this.touchParticipant(next, sender, event.ts, 'active', String(event.data.messageType ?? event.type)); + const messageType = String(event.data.messageType ?? ''); + const explicitRecipients = (event.data.to as string[] | undefined) ?? []; + const decoded = (event.data.decodedPayload ?? event.data.payload ?? {}) as Record; + this.touchParticipant(next, sender, event.ts, 'active', messageType || event.type); + + // Build the recipient set from whatever signal we have: + // - explicit `to` (if the event has it) + // - SessionStart payload's `participants` list (broadcast) + // - declared participants in projection (fan-out for Proposal) + const declaredParticipants = next.participants.map((p) => p.participantId); + let recipients = explicitRecipients; + if (recipients.length === 0 && messageType === 'SessionStart') { + const sessionParticipants = (decoded.participants ?? decoded.participantIds ?? []) as unknown[]; + if (Array.isArray(sessionParticipants)) { + recipients = sessionParticipants + .filter((p): p is string => typeof p === 'string' && p !== sender); + } + } else if (recipients.length === 0 && messageType === 'Proposal') { + recipients = declaredParticipants.filter((p) => p !== sender); + } + recipients.forEach((recipient) => this.touchParticipant(next, recipient, event.ts, 'waiting', undefined)); + + // Lifecycle nodes synthesized so the graph shows the canonical flow + // (start → proposal → decision → outcome) on top of the agent layer. + this.upsertNode(next, { id: '__start', kind: 'start', status: 'completed' }); + if (messageType === 'SessionStart') { + this.upsertEdge(next, '__start', sender, 'session.bound', event.ts); + recipients.forEach((r) => this.upsertEdge(next, sender, r, 'fanout', event.ts)); + } else if (messageType === 'Proposal') { + this.upsertNode(next, { id: '__proposal', kind: 'proposal', status: 'completed' }); + this.upsertEdge(next, sender, '__proposal', 'proposes', event.ts); + recipients.forEach((r) => this.upsertEdge(next, '__proposal', r, 'review', event.ts)); + } else if (messageType === 'Vote' || messageType === 'Evaluation' || messageType === 'Objection') { + this.upsertNode(next, { id: '__decision', kind: 'decision', status: 'active' }); + this.upsertEdge(next, sender, '__decision', messageType.toLowerCase(), event.ts); + } else if (messageType === 'Commitment') { + this.upsertNode(next, { id: '__decision', kind: 'decision', status: 'completed' }); + this.upsertNode(next, { id: '__outcome', kind: 'output', status: 'completed' }); + this.upsertEdge(next, '__decision', '__outcome', 'commits', event.ts); + } + + // Direct sender→recipient edges (only when we actually inferred them). recipients.forEach((recipient) => { if (sender && recipient) { - next.graph.edges.push({ from: sender, to: recipient, kind: event.type, ts: event.ts }); + this.upsertEdge(next, sender, recipient, messageType || event.type, event.ts); } }); - next.graph.edges = next.graph.edges.slice(-200); + next.graph.edges = next.graph.edges.slice(-300); break; } case 'signal.emitted': { @@ -527,6 +567,41 @@ export class ProjectionService { } } + private upsertNode( + projection: RunStateProjection, + node: { id: string; kind: string; status: string } + ) { + const existing = projection.graph.nodes.find((n) => n.id === node.id); + if (existing) { + // Promote status: completed > active > waiting > idle. Don't downgrade + // a finished lifecycle node back to "active". + const order = ['idle', 'waiting', 'active', 'completed', 'failed']; + const oldRank = order.indexOf(existing.status); + const newRank = order.indexOf(node.status); + if (newRank > oldRank) existing.status = node.status; + return; + } + projection.graph.nodes.push(node); + } + + private upsertEdge( + projection: RunStateProjection, + from: string, + to: string, + kind: string, + ts: string + ) { + if (!from || !to || from === to) return; + const exists = projection.graph.edges.find( + (e) => e.from === from && e.to === to && e.kind === kind + ); + if (exists) { + exists.ts = ts; + return; + } + projection.graph.edges.push({ from, to, kind, ts }); + } + private touchParticipant( projection: RunStateProjection, participantId: string, diff --git a/src/runs/session-discovery.service.ts b/src/runs/session-discovery.service.ts index 7bab7b8..ad70e43 100644 --- a/src/runs/session-discovery.service.ts +++ b/src/runs/session-discovery.service.ts @@ -16,6 +16,9 @@ import { AppConfigService } from '../config/app-config.service'; export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(SessionDiscoveryService.name); private aborted = false; + private loopPromise?: Promise; + private reconnectTimer?: ReturnType; + private reconnectResolve?: () => void; private readonly knownSessions = new Set(); constructor( @@ -31,11 +34,17 @@ export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy { this.logger.log('Session discovery disabled (SESSION_DISCOVERY_ENABLED=false)'); return; } - void this.startDiscoveryLoop(); + this.loopPromise = this.startDiscoveryLoop(); } - onModuleDestroy(): void { + async onModuleDestroy(): Promise { this.aborted = true; + // Cancel any in-flight reconnect sleep so shutdown doesn't block for 5s. + if (this.reconnectTimer) clearTimeout(this.reconnectTimer); + if (this.reconnectResolve) this.reconnectResolve(); + // Await the discovery loop — including any in-flight handleSessionCreated + // DB writes — before returning so the pool isn't closed under them. + if (this.loopPromise) await this.loopPromise.catch(() => undefined); } private async startDiscoveryLoop(): Promise { @@ -48,7 +57,12 @@ export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy { if (this.aborted) return; const message = error instanceof Error ? error.message : String(error); this.logger.warn(`WatchSessions stream ended: ${message}. Reconnecting in 5s...`); - await new Promise((r) => setTimeout(r, 5000)); + await new Promise((resolve) => { + this.reconnectResolve = resolve; + this.reconnectTimer = setTimeout(resolve, 5000); + }); + this.reconnectTimer = undefined; + this.reconnectResolve = undefined; } } } diff --git a/src/runs/signal-consumer.service.ts b/src/runs/signal-consumer.service.ts index 407807f..cb867a3 100644 --- a/src/runs/signal-consumer.service.ts +++ b/src/runs/signal-consumer.service.ts @@ -21,6 +21,9 @@ import type { NormalizeContext } from '../contracts/runtime'; export class SignalConsumerService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(SignalConsumerService.name); private aborted = false; + private loopPromise?: Promise; + private reconnectTimer?: ReturnType; + private reconnectResolve?: () => void; constructor( private readonly providerRegistry: RuntimeProviderRegistry, @@ -36,11 +39,16 @@ export class SignalConsumerService implements OnModuleInit, OnModuleDestroy { this.logger.log('Signal consumer disabled (gated on SESSION_DISCOVERY_ENABLED)'); return; } - void this.startConsumeLoop(); + this.loopPromise = this.startConsumeLoop(); } - onModuleDestroy(): void { + async onModuleDestroy(): Promise { this.aborted = true; + if (this.reconnectTimer) clearTimeout(this.reconnectTimer); + if (this.reconnectResolve) this.reconnectResolve(); + // Await the consume loop so in-flight persistRawAndCanonical calls finish + // before the DB pool closes. + if (this.loopPromise) await this.loopPromise.catch(() => undefined); } private async startConsumeLoop(): Promise { @@ -53,7 +61,12 @@ export class SignalConsumerService implements OnModuleInit, OnModuleDestroy { if (this.aborted) return; const message = error instanceof Error ? error.message : String(error); this.logger.warn(`WatchSignals stream ended: ${message}. Reconnecting in 5s...`); - await new Promise((r) => setTimeout(r, 5000)); + await new Promise((resolve) => { + this.reconnectResolve = resolve; + this.reconnectTimer = setTimeout(resolve, 5000); + }); + this.reconnectTimer = undefined; + this.reconnectResolve = undefined; } } } diff --git a/src/runs/stream-consumer.service.ts b/src/runs/stream-consumer.service.ts index 92899f8..0d25473 100644 --- a/src/runs/stream-consumer.service.ts +++ b/src/runs/stream-consumer.service.ts @@ -17,6 +17,8 @@ interface ActiveStream { connected: boolean; lastProcessedSeq: number; finalizingPromise?: Promise; + /** Tracks the consumeLoop so shutdown can await in-flight persistence. */ + loopPromise?: Promise; } @Injectable() @@ -37,10 +39,20 @@ export class StreamConsumerService implements OnModuleDestroy { ) {} async onModuleDestroy(): Promise { + const pending: Promise[] = []; for (const [runId, marker] of this.active) { marker.aborted = true; this.logger.log(`aborting stream for run ${runId} on shutdown`); + if (marker.loopPromise) pending.push(marker.loopPromise); } + // Bounded drain: wait for consumeLoops to observe abort and finish any + // in-flight persistRawAndCanonical before returning, so the DB pool + // isn't closed under them. Capped to avoid blocking shutdown on stuck + // gRPC calls. + await Promise.race([ + Promise.allSettled(pending), + new Promise((resolve) => setTimeout(resolve, 2000)) + ]); } async start(params: { @@ -62,7 +74,7 @@ export class StreamConsumerService implements OnModuleDestroy { }; this.active.set(params.runId, marker); this.instrumentation.activeStreams.inc(); - void this.consumeLoop(marker, params).finally(() => { + marker.loopPromise = this.consumeLoop(marker, params).finally(() => { this.instrumentation.activeStreams.dec(); this.active.delete(params.runId); }); diff --git a/src/runtime/runtime-credential-resolver.service.spec.ts b/src/runtime/runtime-credential-resolver.service.spec.ts index 87a3162..0f3a75a 100644 --- a/src/runtime/runtime-credential-resolver.service.spec.ts +++ b/src/runtime/runtime-credential-resolver.service.spec.ts @@ -1,4 +1,5 @@ import { RuntimeCredentialResolverService } from './runtime-credential-resolver.service'; +import { RuntimeJwtMinterService } from './runtime-jwt-minter.service'; import { AppConfigService } from '../config/app-config.service'; describe('RuntimeCredentialResolverService (single-bearer, CP-9)', () => { @@ -9,7 +10,13 @@ describe('RuntimeCredentialResolverService (single-bearer, CP-9)', () => { runtimeUseDevHeader: false, ...config } as AppConfigService; - return new RuntimeCredentialResolverService(merged); + // JWT minter disabled in these tests — auth-service URL is unset, so the + // resolver exercises the static-bearer / dev-header paths. + const jwtMinter = { + isEnabled: () => false, + getToken: () => Promise.reject(new Error('jwt disabled in unit test')) + } as unknown as RuntimeJwtMinterService; + return new RuntimeCredentialResolverService(merged, jwtMinter); } describe('sender identity', () => { @@ -64,6 +71,40 @@ describe('RuntimeCredentialResolverService (single-bearer, CP-9)', () => { }); }); + describe('JWT mint path', () => { + it('uses the minted JWT as Authorization when the minter is enabled', async () => { + const merged = { + runtimeDevAgentId: 'control-plane', + runtimeBearerToken: '', + runtimeUseDevHeader: false + } as AppConfigService; + const jwtMinter = { + isEnabled: () => true, + getToken: jest.fn().mockResolvedValue('minted-jwt-token') + } as unknown as RuntimeJwtMinterService; + const service = new RuntimeCredentialResolverService(merged, jwtMinter); + + const result = await service.resolve({ runtimeKind: 'rust' }); + expect(result.metadata.authorization).toBe('Bearer minted-jwt-token'); + }); + + it('falls back to static bearer when the mint rejects', async () => { + const merged = { + runtimeDevAgentId: 'control-plane', + runtimeBearerToken: 'fallback-bearer', + runtimeUseDevHeader: false + } as AppConfigService; + const jwtMinter = { + isEnabled: () => true, + getToken: jest.fn().mockRejectedValue(new Error('auth-service down')) + } as unknown as RuntimeJwtMinterService; + const service = new RuntimeCredentialResolverService(merged, jwtMinter); + + const result = await service.resolve({ runtimeKind: 'rust' }); + expect(result.metadata.authorization).toBe('Bearer fallback-bearer'); + }); + }); + describe('invariant — no per-sender overrides (direct-agent-auth §Invariants)', () => { it('ignores any extra fields in the request (participant, requester, fallbackSender)', async () => { const service = makeService({ runtimeBearerToken: 'obs-token' }); diff --git a/src/runtime/runtime-credential-resolver.service.ts b/src/runtime/runtime-credential-resolver.service.ts index 72d6e23..5a2c348 100644 --- a/src/runtime/runtime-credential-resolver.service.ts +++ b/src/runtime/runtime-credential-resolver.service.ts @@ -1,26 +1,52 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { AppConfigService } from '../config/app-config.service'; import { RuntimeCredentialResolver, RuntimeCredentials } from '../contracts/runtime'; +import { RuntimeJwtMinterService } from './runtime-jwt-minter.service'; /** - * Single-bearer credential resolver (CP-9, direct-agent-auth.md). + * Single-identity credential resolver for the control-plane. * - * The control-plane has one runtime identity — its own least-privilege Bearer - * token with `can_start_sessions: false`. All observer calls (GetSession, - * StreamSession, ListPolicies, CancelSession) use this identity. + * Two modes (chosen at runtime by env var): * - * Per-agent token maps were removed because agents now authenticate to the - * runtime directly (RFC-MACP-0004 §4). The control-plane never forges envelopes - * on behalf of agents. + * 1. **JWT mode** (preferred) — when `MACP_AUTH_SERVICE_URL` is set, mints + * a short-lived RS256 JWT for `control-plane` via auth-service and + * caches it. Long-running CP processes refresh on a TTL boundary. + * + * 2. **Static-bearer mode** (fallback) — when the auth-service URL is + * unset, uses the static `RUNTIME_BEARER_TOKEN` from env. This path + * is preserved so deploys can switch incrementally. + * + * Either way, the resolver returns the same shape — gRPC sees a normal + * `Authorization: Bearer …` header with no knowledge of which mode minted + * the token. */ @Injectable() export class RuntimeCredentialResolverService implements RuntimeCredentialResolver { - constructor(private readonly config: AppConfigService) {} + private readonly logger = new Logger(RuntimeCredentialResolverService.name); + + constructor( + private readonly config: AppConfigService, + private readonly jwtMinter: RuntimeJwtMinterService + ) {} async resolve(_req: { runtimeKind: string }): Promise { const sender = this.config.runtimeDevAgentId; const metadata: Record = {}; + if (this.jwtMinter.isEnabled()) { + try { + const token = await this.jwtMinter.getToken(); + metadata.authorization = `Bearer ${token}`; + return { metadata, sender }; + } catch (err) { + // Fall through to static-bearer / dev-header fallbacks if the + // mint fails. Better to degrade than fail every gRPC call when + // auth-service is briefly unreachable. + const reason = err instanceof Error ? err.message : String(err); + this.logger.warn(`JWT mint failed; falling back to static bearer: ${reason}`); + } + } + if (this.config.runtimeBearerToken) { metadata.authorization = `Bearer ${this.config.runtimeBearerToken}`; } else if (this.config.runtimeUseDevHeader) { diff --git a/src/runtime/runtime-jwt-minter.service.spec.ts b/src/runtime/runtime-jwt-minter.service.spec.ts new file mode 100644 index 0000000..5e98a01 --- /dev/null +++ b/src/runtime/runtime-jwt-minter.service.spec.ts @@ -0,0 +1,132 @@ +import { RuntimeJwtMinterService } from './runtime-jwt-minter.service'; +import { AppConfigService } from '../config/app-config.service'; + +type FetchMock = jest.Mock, [input: RequestInfo | URL, init?: RequestInit]>; + +describe('RuntimeJwtMinterService', () => { + const baseConfig = { + authServiceUrl: 'https://auth.example/', + authServiceTimeoutMs: 5000, + authTokenSender: 'control-plane', + authTokenTtlSeconds: 3600 + } as unknown as AppConfigService; + + const originalFetch = globalThis.fetch; + let fetchMock: FetchMock; + + beforeEach(() => { + fetchMock = jest.fn() as unknown as FetchMock; + (globalThis as { fetch: unknown }).fetch = fetchMock; + }); + + afterEach(() => { + (globalThis as { fetch: unknown }).fetch = originalFetch; + jest.useRealTimers(); + }); + + function jsonResponse(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { 'content-type': 'application/json' } + }); + } + + describe('isEnabled()', () => { + it('returns true when MACP_AUTH_SERVICE_URL is set', () => { + const minter = new RuntimeJwtMinterService(baseConfig); + expect(minter.isEnabled()).toBe(true); + }); + + it('returns false when MACP_AUTH_SERVICE_URL is empty', () => { + const minter = new RuntimeJwtMinterService({ + ...baseConfig, + authServiceUrl: '' + } as unknown as AppConfigService); + expect(minter.isEnabled()).toBe(false); + }); + }); + + describe('getToken()', () => { + it('throws when minter is disabled', async () => { + const minter = new RuntimeJwtMinterService({ + ...baseConfig, + authServiceUrl: '' + } as unknown as AppConfigService); + await expect(minter.getToken()).rejects.toThrow(/MACP_AUTH_SERVICE_URL is unset/); + }); + + it('mints a token via auth-service and caches it across calls', async () => { + fetchMock.mockResolvedValueOnce( + jsonResponse({ token: 'jwt-1', sender: 'control-plane', expires_in_seconds: 3600 }) + ); + const minter = new RuntimeJwtMinterService(baseConfig); + + const first = await minter.getToken(); + const second = await minter.getToken(); + + expect(first).toBe('jwt-1'); + expect(second).toBe('jwt-1'); + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, init] = fetchMock.mock.calls[0]; + expect(String(url)).toBe('https://auth.example/tokens'); + expect(init?.method).toBe('POST'); + const body = JSON.parse((init?.body as string) ?? '{}'); + expect(body.sender).toBe('control-plane'); + expect(body.scopes.is_observer).toBe(true); + expect(body.scopes.can_start_sessions).toBe(false); + }); + + it('dedupes concurrent refreshes into a single inflight request', async () => { + fetchMock.mockResolvedValueOnce( + jsonResponse({ token: 'jwt-once', expires_in_seconds: 3600 }) + ); + const minter = new RuntimeJwtMinterService(baseConfig); + + const [a, b, c] = await Promise.all([minter.getToken(), minter.getToken(), minter.getToken()]); + + expect(a).toBe('jwt-once'); + expect(b).toBe('jwt-once'); + expect(c).toBe('jwt-once'); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('throws when auth-service returns non-2xx', async () => { + fetchMock.mockResolvedValueOnce(new Response('forbidden', { status: 403 })); + const minter = new RuntimeJwtMinterService(baseConfig); + await expect(minter.getToken()).rejects.toThrow(/auth-service returned 403/); + }); + + it('throws when the auth-service response is missing a token', async () => { + fetchMock.mockResolvedValueOnce(jsonResponse({ sender: 'control-plane' })); + const minter = new RuntimeJwtMinterService(baseConfig); + await expect(minter.getToken()).rejects.toThrow(/missing token/); + }); + + it('wraps network failures with a descriptive error', async () => { + fetchMock.mockRejectedValueOnce(new TypeError('connect ECONNREFUSED')); + const minter = new RuntimeJwtMinterService(baseConfig); + await expect(minter.getToken()).rejects.toThrow(/auth-service request failed/); + }); + + it('refreshes the cached token once it passes the refresh buffer', async () => { + fetchMock + .mockResolvedValueOnce(jsonResponse({ token: 'jwt-1', expires_in_seconds: 60 })) + .mockResolvedValueOnce(jsonResponse({ token: 'jwt-2', expires_in_seconds: 60 })); + + const realNow = Date.now(); + const nowSpy = jest.spyOn(Date, 'now').mockReturnValue(realNow); + const minter = new RuntimeJwtMinterService(baseConfig); + + const first = await minter.getToken(); + expect(first).toBe('jwt-1'); + + // Advance past expiry minus REFRESH_BUFFER (30s) minus CLOCK_SKEW (10s) — i.e., + // 60s TTL with 40s of buffer leaves a 20s window before forced refresh. + nowSpy.mockReturnValue(realNow + 25_000); + + const second = await minter.getToken(); + expect(second).toBe('jwt-2'); + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/src/runtime/runtime-jwt-minter.service.ts b/src/runtime/runtime-jwt-minter.service.ts new file mode 100644 index 0000000..a5c5ea3 --- /dev/null +++ b/src/runtime/runtime-jwt-minter.service.ts @@ -0,0 +1,118 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { AppConfigService } from '../config/app-config.service'; + +interface CacheEntry { + token: string; + expiresAt: number; +} + +interface MintResponse { + token: string; + sender: string; + expires_in_seconds?: number; +} + +/** + * Mints and caches the control-plane's runtime JWT. Mirrors the + * `AuthTokenMinterService` pattern in examples-service but is + * single-tenant — only ever mints for the `control-plane` sender with + * `is_observer: true`. + * + * The credential resolver calls `getToken()` on every gRPC call; the + * minter returns a cached token until TTL minus a clock-skew buffer, then + * re-mints in-band. A small in-flight promise dedupes concurrent + * refreshes during the brief refresh window. + */ +@Injectable() +export class RuntimeJwtMinterService { + private readonly logger = new Logger(RuntimeJwtMinterService.name); + private cache: CacheEntry | undefined; + private inflight: Promise | undefined; + + // Refresh 30s before the cached token actually expires so a slow refresh + // doesn't race ongoing requests. Auth-service signs with a separate clock, + // so we also account for ~10s of skew tolerance. + private static readonly REFRESH_BUFFER_MS = 30_000; + private static readonly CLOCK_SKEW_MS = 10_000; + + constructor(private readonly config: AppConfigService) {} + + isEnabled(): boolean { + return Boolean(this.config.authServiceUrl); + } + + async getToken(): Promise { + if (!this.isEnabled()) { + throw new Error('RuntimeJwtMinterService.getToken called but MACP_AUTH_SERVICE_URL is unset'); + } + + const now = Date.now(); + if ( + this.cache && + now < this.cache.expiresAt - RuntimeJwtMinterService.REFRESH_BUFFER_MS - RuntimeJwtMinterService.CLOCK_SKEW_MS + ) { + return this.cache.token; + } + + if (this.inflight) return this.inflight; + + this.inflight = this.refresh().finally(() => { + this.inflight = undefined; + }); + return this.inflight; + } + + private async refresh(): Promise { + const url = `${this.config.authServiceUrl.replace(/\/+$/, '')}/tokens`; + const body = { + sender: this.config.authTokenSender, + ttl_seconds: this.config.authTokenTtlSeconds, + scopes: { + // Control-plane is strictly an observer. It must not be able to start + // sessions or mutate the mode/policy registry via this credential. + can_start_sessions: false, + is_observer: true, + // allowed_modes intentionally omitted → the runtime treats it as + // "all modes allowed" for read operations. The is_observer flag is + // what authorizes `Stream`/`GetSession`/etc. + }, + }; + + let response: Response; + try { + response = await fetch(url, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify(body), + signal: AbortSignal.timeout(this.config.authServiceTimeoutMs), + }); + } catch (err) { + const reason = err instanceof Error ? err.message : 'unknown network error'; + this.logger.warn(`auth_mint_failure reason=network:${reason}`); + throw new Error(`auth-service request failed: ${reason}`); + } + + if (!response.ok) { + const text = await response.text().catch(() => ''); + this.logger.warn(`auth_mint_failure http_${response.status} body=${text.slice(0, 200)}`); + throw new Error(`auth-service returned ${response.status}`); + } + + let parsed: MintResponse; + try { + parsed = (await response.json()) as MintResponse; + } catch (err) { + const reason = err instanceof Error ? err.message : 'invalid JSON'; + throw new Error(`auth-service response parse failed: ${reason}`); + } + if (!parsed?.token) throw new Error('auth-service response missing token'); + + const ttlSec = Number.isFinite(parsed.expires_in_seconds) + ? Math.max(60, Math.floor(parsed.expires_in_seconds!)) + : this.config.authTokenTtlSeconds; + const expiresAt = Date.now() + ttlSec * 1000; + this.cache = { token: parsed.token, expiresAt }; + this.logger.log(`auth_mint_success sender=${this.config.authTokenSender} expires_in=${ttlSec}s`); + return parsed.token; + } +} diff --git a/src/runtime/rust-runtime.provider.spec.ts b/src/runtime/rust-runtime.provider.spec.ts index 3cbdaf8..2a19549 100644 --- a/src/runtime/rust-runtime.provider.spec.ts +++ b/src/runtime/rust-runtime.provider.spec.ts @@ -1,6 +1,7 @@ import { RustRuntimeProvider } from './rust-runtime.provider'; import { AppConfigService } from '../config/app-config.service'; import { RuntimeCredentialResolverService } from './runtime-credential-resolver.service'; +import { RuntimeJwtMinterService } from './runtime-jwt-minter.service'; import { InstrumentationService } from '../telemetry/instrumentation.service'; import { RawRuntimeEvent, RuntimeSubscribeSessionRequest } from '../contracts/runtime'; @@ -49,7 +50,11 @@ function makeProvider(streamFactory: () => unknown): { runtimeCircuitBreakerResetMs: 30_000 } as unknown as AppConfigService; - const resolver = new RuntimeCredentialResolverService(config); + const jwtMinter = { + isEnabled: () => false, + getToken: () => Promise.reject(new Error('jwt disabled in unit test')) + } as unknown as RuntimeJwtMinterService; + const resolver = new RuntimeCredentialResolverService(config, jwtMinter); const instrumentation = {} as InstrumentationService; const provider = new RustRuntimeProvider(config, resolver, instrumentation); diff --git a/src/webhooks/webhook.service.ts b/src/webhooks/webhook.service.ts index 107c509..0aa0c2d 100644 --- a/src/webhooks/webhook.service.ts +++ b/src/webhooks/webhook.service.ts @@ -39,18 +39,29 @@ export class WebhookService { } async fireEvent(payload: WebhookPayload): Promise { - const activeWebhooks = await this.webhookRepository.listActive(); - const matching = activeWebhooks.filter((wh) => wh.events.length === 0 || wh.events.includes(payload.event)); + // Callers fire-and-forget via `void webhookService.fireEvent(...)`. Any + // rejection here (e.g., pool closed during shutdown, transient DB error) + // would surface as an unhandled rejection and crash the process or fail + // a test suite. Swallow and log; webhook delivery has its own outbox + // semantics for durability. + try { + const activeWebhooks = await this.webhookRepository.listActive(); + const matching = activeWebhooks.filter((wh) => wh.events.length === 0 || wh.events.includes(payload.event)); - for (const webhook of matching) { - // Outbox pattern: insert delivery record first, then attempt delivery - const delivery = await this.deliveryRepository.create({ - webhookId: webhook.id, - event: payload.event, - runId: payload.runId, - payload: payload as unknown as Record - }); - void this.deliverWithTracking(delivery.id, webhook.url, webhook.secret, payload); + for (const webhook of matching) { + // Outbox pattern: insert delivery record first, then attempt delivery + const delivery = await this.deliveryRepository.create({ + webhookId: webhook.id, + event: payload.event, + runId: payload.runId, + payload: payload as unknown as Record + }); + void this.deliverWithTracking(delivery.id, webhook.url, webhook.secret, payload); + } + } catch (err) { + this.logger.warn( + `fireEvent(${payload.event}) failed: ${err instanceof Error ? err.message : String(err)}` + ); } } diff --git a/test/helpers/test-app.ts b/test/helpers/test-app.ts index 83dbe5b..c9652a5 100644 --- a/test/helpers/test-app.ts +++ b/test/helpers/test-app.ts @@ -8,6 +8,8 @@ import { GlobalExceptionFilter } from '../../src/errors/exception.filter'; import { RustRuntimeProvider } from '../../src/runtime/rust-runtime.provider'; import { RuntimeProviderRegistry } from '../../src/runtime/runtime-provider.registry'; import { StreamConsumerService } from '../../src/runs/stream-consumer.service'; +import { SessionDiscoveryService } from '../../src/runs/session-discovery.service'; +import { SignalConsumerService } from '../../src/runs/signal-consumer.service'; import { runMigrations } from '../../src/db/migrate'; import { RuntimeScript, @@ -131,35 +133,22 @@ export async function createTestApp( await app.listen(0); const url = await app.getUrl(); + // Wrap app.close() so tests that only call `ctx.app.close()` still get a + // clean drain: force-terminate active runs via a short-lived connection, + // await the three background lifecycle services, then delegate to Nest's + // close() (which runs onModuleDestroy in reverse dependency order — + // DatabaseService closes the pool last). + const originalClose = app.close.bind(app); + (app as { close: typeof app.close }).close = async () => { + await drainBackgroundWork(moduleRef, runtimeMode); + return originalClose(); + }; + const client = new TestClient(url, 'test-key-integration'); const cleanup = async () => { - // Stop all active stream consumers before truncating to prevent - // race conditions where async events reference deleted runs - const streamConsumer = moduleRef.get(StreamConsumerService); - await streamConsumer.onModuleDestroy(); - + await drainBackgroundWork(moduleRef, runtimeMode); const dbService = moduleRef.get(DatabaseService); - - // Use a dedicated short-lived connection (not the app pool, which may be - // exhausted by background executor operations) to force-terminate active runs, - // then truncate. This prevents cleanup from hanging. - const { Client } = require('pg'); - const client = new Client({ connectionString: TEST_DB_URL }); - try { - await client.connect(); - // Force all in-progress runs to 'failed' so background operations stop - await client.query( - `UPDATE runs SET status = 'failed', error_code = 'TEST_CLEANUP', error_message = 'force-terminated by test cleanup', ended_at = now() WHERE status NOT IN ('completed','failed','cancelled','queued')` - ); - // Brief pause for background operations to notice the state change - await new Promise((r) => setTimeout(r, runtimeMode === 'mock' ? 300 : 1000)); - } catch { - // Best-effort; proceed to truncate - } finally { - await client.end().catch(() => {}); - } - await truncateAll(dbService.pool); }; @@ -173,3 +162,52 @@ export async function createTestApp( runtimeMode }; } + +/** + * Drain order matters: flip runs to terminal first (so stream/signal consumers + * stop enqueuing new projection work), then explicitly destroy the three + * background lifecycle services. Each one awaits its own in-flight loops, + * including any outstanding `persistRawAndCanonical` chain entries. Only after + * this does it become safe for Nest to close the DB pool. + */ +async function drainBackgroundWork( + moduleRef: TestingModule, + runtimeMode: 'mock' | 'docker' | 'remote' +): Promise { + // Force in-progress runs to terminal via a dedicated short-lived connection + // so the stream consumer's finalize loops short-circuit. + const { Client } = require('pg'); + const term = new Client({ connectionString: TEST_DB_URL }); + try { + await term.connect(); + await term.query( + `UPDATE runs SET status = 'failed', error_code = 'TEST_CLEANUP', error_message = 'force-terminated by test cleanup', ended_at = now() WHERE status NOT IN ('completed','failed','cancelled','queued')` + ); + // Brief grace so background operations notice the state change before drain. + await new Promise((r) => setTimeout(r, runtimeMode === 'mock' ? 100 : 500)); + } catch { + // Best-effort. + } finally { + await term.end().catch(() => {}); + } + + // Await each lifecycle service's bounded drain. Order doesn't matter for + // correctness — each service waits on its own loop — but destroying them + // before Nest's own onModuleDestroy sweep guarantees the pool is alive + // while they finish any in-flight persistRawAndCanonical. + const services = [ + safeGet(moduleRef, SessionDiscoveryService), + safeGet(moduleRef, SignalConsumerService), + safeGet(moduleRef, StreamConsumerService) + ].filter((svc): svc is NonNullable => svc !== undefined); + + await Promise.allSettled(services.map((svc) => svc.onModuleDestroy())); +} + +function safeGet(moduleRef: TestingModule, token: new (...args: never[]) => T): T | undefined { + try { + return moduleRef.get(token, { strict: false }); + } catch { + return undefined; + } +}