diff --git a/drizzle/0014_projection_policy_column.sql b/drizzle/0014_projection_policy_column.sql new file mode 100644 index 0000000..fd83d65 --- /dev/null +++ b/drizzle/0014_projection_policy_column.sql @@ -0,0 +1,13 @@ +-- Add policy projection column to run_projections. +-- +-- The projection.policy block was previously dropped at persist time because +-- ProjectionRepository.upsert had no column to write it to. This caused the +-- T2C policy.quorumStatus derivation (and any other policy projection field) +-- to be invisible in the run state response. Adding a dedicated jsonb column +-- so the policy projection survives the read-modify-write cycle. +-- +-- Default mirrors the in-memory shape ProjectionService.empty() returns when +-- no events have populated the policy projection yet. + +ALTER TABLE run_projections + ADD COLUMN IF NOT EXISTS policy jsonb NOT NULL DEFAULT '{"policyVersion": "", "commitmentEvaluations": []}'::jsonb; diff --git a/src/app.module.ts b/src/app.module.ts index 7dadd13..a4944ee 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -50,6 +50,7 @@ import { RunManagerService } from './runs/run-manager.service'; import { RunRecoveryService } from './runs/run-recovery.service'; import { StreamConsumerService } from './runs/stream-consumer.service'; import { SessionDiscoveryService } from './runs/session-discovery.service'; +import { SignalConsumerService } from './runs/signal-consumer.service'; import { WebhookController } from './controllers/webhook.controller'; import { WebhookDeliveryRepository } from './webhooks/webhook-delivery.repository'; import { WebhookRepository } from './webhooks/webhook.repository'; @@ -123,6 +124,7 @@ import { WebhookService } from './webhooks/webhook.service'; RunManagerService, StreamConsumerService, SessionDiscoveryService, + SignalConsumerService, RunExecutorService, RunRecoveryService, RunInsightsService, diff --git a/src/contracts/runtime.ts b/src/contracts/runtime.ts index 476dc0e..68fa523 100644 --- a/src/contracts/runtime.ts +++ b/src/contracts/runtime.ts @@ -213,6 +213,17 @@ export interface RuntimeProvider { listSessions(): Promise; watchSessions(): AsyncIterable; + /** + * Subscribe to the runtime's ambient Signal/Progress envelopes. + * + * The runtime broadcasts Signal and Progress envelopes on a dedicated bus + * (signal_bus, separate from per-session stream_bus). Yields RawRuntimeEvent + * stream-envelope items so the same normalizer that handles per-session + * envelopes can ingest these. Caller correlates the envelope to a run via + * `envelope.sessionId`. + */ + watchSignals(): AsyncIterable; + // Governance policy lifecycle (RFC-MACP-0012) registerPolicy(req: RuntimeRegisterPolicyRequest): Promise; unregisterPolicy(req: RuntimeUnregisterPolicyRequest): Promise; diff --git a/src/db/schema.ts b/src/db/schema.ts index cea8161..84d47c7 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -150,6 +150,10 @@ export const runProjections = pgTable( .notNull() .default({ spanCount: 0, linkedArtifacts: [] }), progress: jsonb('progress').$type>().notNull().default({ entries: [] }), + policy: jsonb('policy') + .$type>() + .notNull() + .default({ policyVersion: '', commitmentEvaluations: [] }), updatedAt: timestamp('updated_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow() }, (table) => ({ diff --git a/src/events/event-normalizer.service.ts b/src/events/event-normalizer.service.ts index 20f8db1..7559232 100644 --- a/src/events/event-normalizer.service.ts +++ b/src/events/event-normalizer.service.ts @@ -515,12 +515,40 @@ export class EventNormalizerService implements EventNormalizer { function extractLlmCall(payload?: Record | null): Record | null { if (!payload) return null; const meta = payload.metadata as Record | undefined; + // SignalPayload carries arbitrary JSON in its `data` bytes field. When the + // signal_type is `llm.call.completed`, agents stash the full LLM call info + // there. The envelope-level proto decoder leaves `data` as a base64 string + // (or Buffer); decode it to JSON here so token extraction below can find + // tokenUsage / model / latency. + let signalData: Record | undefined; + const sigType = payload.signalType ?? payload.signal_type; + if (sigType === 'llm.call.completed' && payload.data != null) { + try { + const raw = payload.data as unknown; + let jsonStr: string | undefined; + if (typeof raw === 'string') { + // base64-encoded bytes → JSON + jsonStr = Buffer.from(raw, 'base64').toString('utf-8'); + } else if (Buffer.isBuffer(raw)) { + jsonStr = raw.toString('utf-8'); + } + if (jsonStr) { + const parsed = JSON.parse(jsonStr); + if (parsed && typeof parsed === 'object') signalData = parsed as Record; + } + } catch { + /* ignore decode failures */ + } + } const candidates: Array | undefined> = [ payload.llmCall as Record | undefined, meta?.llmCall as Record | undefined, // `tokenUsage` is the minimal form — upgrade it to an llmCall shape. payload.tokenUsage as Record | undefined, - meta?.tokenUsage as Record | undefined + meta?.tokenUsage as Record | undefined, + // Signal-carried llm.call.completed data + signalData, + signalData?.tokenUsage as Record | undefined ]; for (const c of candidates) { if (!c || typeof c !== 'object') continue; diff --git a/src/metrics/metrics.service.ts b/src/metrics/metrics.service.ts index 2cfcdc0..5007543 100644 --- a/src/metrics/metrics.service.ts +++ b/src/metrics/metrics.service.ts @@ -26,10 +26,17 @@ function extractTokenUsage(event: CanonicalEvent): { } | null { const data = event.data as Record; - // Check multiple possible locations + // Check multiple possible locations. The synthesized llm.call.completed + // event has tokens at the top level (extractLlmCall returns {promptTokens, + // completionTokens, totalTokens, ...}); other event types may carry them + // under tokenUsage, metadata.tokenUsage, etc. const candidates = [ + data, data.tokenUsage, (data.metadata as Record | undefined)?.tokenUsage, + // Synthesized llm.call.completed events carry the call shape directly + // under data.decodedPayload (extractLlmCall return value). + data.decodedPayload as Record | undefined, (data.decodedPayload as Record | undefined)?.tokenUsage, (data.payloadDescriptor as Record | undefined)?.tokenUsage ]; @@ -100,11 +107,20 @@ export class MetricsService { let totalTokens = safeNumber(current.totalTokens); let estimatedCostUsd = safeNumber(current.estimatedCostUsd); let sessionState = current.sessionState as string | undefined; + // Once the run reaches terminal state, freeze lastEventAt so post-completion + // cleanup events (e.g. session.state.changed at TTL boundary) don't inflate + // durationMs. + let runTerminal = + sessionState === 'SESSION_STATE_RESOLVED' || sessionState === 'SESSION_STATE_EXPIRED'; for (const event of events) { eventCount += 1; firstEventAt ??= event.ts; - lastEventAt = event.ts; + if (!runTerminal) lastEventAt = event.ts; + if (event.type === 'run.completed' || event.type === 'run.failed' || event.type === 'run.cancelled') { + lastEventAt = event.ts; + runTerminal = true; + } if (event.type.startsWith('message.')) messageCount += 1; if (event.type === 'signal.emitted') signalCount += 1; if (event.type.startsWith('proposal.')) proposalCount += 1; diff --git a/src/projection/projection.service.ts b/src/projection/projection.service.ts index 185571b..5b60701 100644 --- a/src/projection/projection.service.ts +++ b/src/projection/projection.service.ts @@ -16,6 +16,13 @@ export const PROJECTION_SCHEMA_VERSION = 3; @Injectable() export class ProjectionService { private readonly logger = new Logger(ProjectionService.name); + // Per-run serialization to prevent concurrent stream-consumer + signal-consumer + // updates from racing on the read-merge-write cycle. Without this, the + // optimistic version-check in projection.repository.upsert can drop a + // signal-consumer's signal when a higher-version stream-consumer write + // lands later (the second writer reads stale state, doesn't include the + // signal, and overwrites the version-7 update with version-15). + private readonly applyLocks = new Map>(); constructor(private readonly projectionRepository: ProjectionRepository) {} @@ -46,7 +53,7 @@ export class ProjectionService { accepted: 0, rejected: 0 }, - policy: ((row as unknown as Record).policy as RunStateProjection['policy']) ?? { + policy: (row.policy as unknown as RunStateProjection['policy']) ?? { policyVersion: '', commitmentEvaluations: [] }, @@ -58,15 +65,27 @@ export class ProjectionService { } async applyAndPersist(runId: string, events: CanonicalEvent[], tx?: unknown): Promise { - const current = (await this.get(runId)) ?? this.empty(runId); - const next = this.applyEvents(current, events); - const version = (events.at(-1)?.seq ?? current.timeline.latestSeq) || 0; - await this.projectionRepository.upsert( + // Chain on the per-run lock so concurrent updates serialize. + const prior = this.applyLocks.get(runId) ?? Promise.resolve(); + const next = prior.then(async () => { + const current = (await this.get(runId)) ?? this.empty(runId); + const merged = this.applyEvents(current, events); + const version = (events.at(-1)?.seq ?? current.timeline.latestSeq) || 0; + await this.projectionRepository.upsert( + runId, + merged, + version, + PROJECTION_SCHEMA_VERSION, + tx as Parameters[4] + ); + return merged; + }); + this.applyLocks.set( runId, - next, - version, - PROJECTION_SCHEMA_VERSION, - tx as Parameters[4] + next.finally(() => { + // Release if still latest; otherwise leave the chain head intact. + if (this.applyLocks.get(runId) === next) this.applyLocks.delete(runId); + }) ); return next; } @@ -247,10 +266,16 @@ export class ProjectionService { const proposalPayload = event.data.decodedPayload as Record | undefined; const messageType = String(event.data.messageType ?? ''); const sender = String(event.data.sender ?? ''); + const explicitConfidence = safeOptionalNumber(proposalPayload?.confidence); + // Vote envelopes don't carry confidence in their payload schema, but a + // Vote IS by definition a confident decision — default to 1.0 so the + // per-contributor table doesn't render "—" for every voter. + const contributionConfidence = + explicitConfidence ?? (messageType === 'Vote' ? 1.0 : undefined); const contribution: DecisionProposalContribution = { participantId: sender, action: inferContributionAction(messageType, proposalPayload), - confidence: safeOptionalNumber(proposalPayload?.confidence), + confidence: contributionConfidence, reasons: extractReasons(proposalPayload), ts: event.ts, vote: inferContributionVote(messageType, proposalPayload), @@ -260,16 +285,26 @@ export class ProjectionService { const proposalId = String( proposalPayload?.proposalId ?? proposalPayload?.requestId ?? event.subject?.id ?? '' ); + // Aggregate confidence: when proposals contain Votes, prefer the + // approve-ratio (so 3-of-3 approves = 100%; 2-of-4 = 50%). Falls + // back to the explicit payload value or any prior aggregate. + const allContributions = [...existingProposals, contribution]; + const voteContributions = allContributions.filter((p) => p.vote === 'allow' || p.vote === 'deny'); + const approveCount = voteContributions.filter((p) => p.vote === 'allow').length; + const aggregateConfidence = + voteContributions.length > 0 + ? approveCount / voteContributions.length + : (explicitConfidence ?? next.decision.current?.confidence); next.decision.current = { ...(next.decision.current ?? { finalized: false }), action: proposalId || String(event.subject?.id ?? 'proposal'), - confidence: safeOptionalNumber(proposalPayload?.confidence) ?? next.decision.current?.confidence, + confidence: aggregateConfidence, reasons: [ String(proposalPayload?.reason ?? proposalPayload?.summary ?? proposalPayload?.rationale ?? event.type) ].filter(Boolean), finalized: false, proposalId, - proposals: [...existingProposals, contribution].slice(-50) + proposals: allContributions.slice(-50) }; // Update voteTally if this is a vote-bearing contribution @@ -285,10 +320,17 @@ export class ProjectionService { const outcomePositive: boolean | null = explicitOutcome != null ? Boolean(explicitOutcome) : inferOutcomePositiveFromAction(action); const sender = (event.data.sender as string | undefined) ?? undefined; + // Final aggregate confidence: prefer explicit, otherwise compute + // from the vote tally we accumulated during proposal.updated. + const priorProposals = next.decision.current?.proposals ?? []; + const priorVotes = priorProposals.filter((p) => p.vote === 'allow' || p.vote === 'deny'); + const priorApprove = priorVotes.filter((p) => p.vote === 'allow').length; + const computedAggregate = priorVotes.length > 0 ? priorApprove / priorVotes.length : undefined; next.decision.current = { ...(next.decision.current ?? { finalized: false }), action, - confidence: safeOptionalNumber(payload?.confidence) ?? next.decision.current?.confidence, + confidence: + safeOptionalNumber(payload?.confidence) ?? computedAggregate ?? next.decision.current?.confidence, reasons: [String(payload?.reason ?? 'Commitment observed')], finalized: true, proposalId: String(payload?.commitmentId ?? next.decision.current?.proposalId ?? ''), @@ -299,6 +341,17 @@ export class ProjectionService { next.run.status = 'completed'; // Propagate outcomePositive to policy projection next.policy.outcomePositive = outcomePositive; + // Derive policy.resolved from a successful commit. The runtime does + // not emit PolicyResolved / PolicyCommitmentEvaluated envelopes + // (RFC-MACP-0012 forward-compat surface that's not yet implemented), + // but the runtime's policy evaluator HAS approved the commit by the + // time decision.finalized fires — so we can mark the policy as + // resolved here. This flips the PolicyPanel header from "pending" + // to "resolved" for committed runs. + next.policy.resolvedAt = next.policy.resolvedAt ?? event.ts; + if (next.policy.quorumStatus === undefined || next.policy.quorumStatus === 'pending') { + next.policy.quorumStatus = outcomePositive === false ? 'failed' : 'reached'; + } break; } case 'progress.reported': { @@ -488,6 +541,19 @@ export class ProjectionService { projection.participants.push(participant); projection.graph.nodes.push({ id: participantId, kind: 'participant', status: 'idle' }); } + + // If the run already reached a terminal state, do not re-activate a + // participant that's been swept to a terminal status. Late-arriving + // envelopes (e.g. Commitment events normalized after run.completed) would + // otherwise flip risk-agent back to 'active' even though the run is done. + const runTerminal = projection.run.status === 'completed' || projection.run.status === 'failed' || projection.run.status === 'cancelled'; + const participantTerminal = participant.status === 'completed' || participant.status === 'failed' || participant.status === 'skipped'; + if (runTerminal && participantTerminal) { + participant.latestActivityAt = ts; + if (summary) participant.latestSummary = summary; + return; + } + participant.status = status; participant.latestActivityAt = ts; if (summary) participant.latestSummary = summary; diff --git a/src/runs/run-manager.service.ts b/src/runs/run-manager.service.ts index 9ad830a..294f648 100644 --- a/src/runs/run-manager.service.ts +++ b/src/runs/run-manager.service.ts @@ -1,4 +1,4 @@ -import { BadRequestException, Injectable, Logger, NotFoundException } from '@nestjs/common'; +import { BadRequestException, ConflictException, Injectable, Logger, NotFoundException } from '@nestjs/common'; import { randomUUID } from 'node:crypto'; import { RunDescriptor, RunStateProjection } from '../contracts/control-plane'; import { AuditService } from '../audit/audit.service'; @@ -129,7 +129,25 @@ export class RunManagerService { session: { runtimeSessionId: string; initiator: string; ack: { sessionState: string } }, capabilities?: Record ) { - const run = await this.runRepository.markBindingSession(runId, session.runtimeSessionId); + let run: Awaited>; + try { + run = await this.runRepository.markBindingSession(runId, session.runtimeSessionId); + } catch (err) { + // Run may have already advanced past binding_session (e.g., SessionDiscovery + // racing with RunExecutor, or a re-emitted session.created event). Swallow + // the ConflictException so the caller — often an unawaited void — doesn't + // become an unhandled rejection and crash the process. Side-effects below + // are skipped because they already fired on the first successful bind. + if (err instanceof ConflictException) { + const current = await this.runRepository.findById(runId); + this.logger.warn( + `bindSession no-op for run ${runId}: ${err.message} (current status=${current?.status ?? 'missing'})` + ); + if (!current) throw new NotFoundException(`run ${runId} not found`); + return current; + } + throw err; + } await this.runtimeSessionRepository.upsert({ runId, runtimeKind: request.runtime.kind, diff --git a/src/runs/session-discovery.service.ts b/src/runs/session-discovery.service.ts index d48f001..7bab7b8 100644 --- a/src/runs/session-discovery.service.ts +++ b/src/runs/session-discovery.service.ts @@ -94,13 +94,24 @@ export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy { `Auto-discovered session ${session.sessionId} → run ${run.id} (mode=${session.mode}, initiator=${session.initiator})` ); - void this.runManager.markStarted(run.id, descriptor); - void this.runManager.bindSession(run.id, descriptor, { - runtimeSessionId: session.sessionId, - initiator: session.initiator ?? '', - ack: { sessionState: session.state } - }); - void this.runManager.markRunning(run.id, session.sessionId); + try { + await this.runManager.markStarted(run.id, descriptor); + await this.runManager.bindSession(run.id, descriptor, { + runtimeSessionId: session.sessionId, + initiator: session.initiator ?? '', + ack: { sessionState: session.state } + }); + await this.runManager.markRunning(run.id, session.sessionId); + } catch (err) { + // Keep the WatchSessions loop alive. Subscribing and consuming the stream + // below is the point of session discovery — state-machine drift for a + // single run must not abort discovery for every other session. + this.logger.warn( + `Failed to sync run state for discovered session ${session.sessionId}: ${ + err instanceof Error ? err.message : String(err) + }` + ); + } const handle = provider.subscribeSession({ runId: run.id, diff --git a/src/runs/signal-consumer.service.ts b/src/runs/signal-consumer.service.ts new file mode 100644 index 0000000..407807f --- /dev/null +++ b/src/runs/signal-consumer.service.ts @@ -0,0 +1,116 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { RuntimeProviderRegistry } from '../runtime/runtime-provider.registry'; +import { RunRepository } from '../storage/run.repository'; +import { EventNormalizerService } from '../events/event-normalizer.service'; +import { RunEventService } from '../events/run-event.service'; +import { ProtoRegistryService } from '../runtime/proto-registry.service'; +import { AppConfigService } from '../config/app-config.service'; +import type { NormalizeContext } from '../contracts/runtime'; + +/** + * Subscribes to the runtime's WatchSignals stream (separate from per-session + * StreamSession). The runtime broadcasts ambient Signal/Progress envelopes on + * a dedicated bus; this service routes each envelope to the matching run by + * `envelope.sessionId` and persists it through the same normalizer + + * RunEventService pipeline used by the per-session stream consumer. + * + * Without this service, agent-emitted Signal/Progress envelopes (including the + * `llm.call.completed` signals carrying token usage) are invisible to CP. + */ +@Injectable() +export class SignalConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(SignalConsumerService.name); + private aborted = false; + + constructor( + private readonly providerRegistry: RuntimeProviderRegistry, + private readonly runRepository: RunRepository, + private readonly normalizer: EventNormalizerService, + private readonly eventService: RunEventService, + private readonly protoRegistry: ProtoRegistryService, + private readonly config: AppConfigService + ) {} + + async onModuleInit(): Promise { + if (!this.config.sessionDiscoveryEnabled) { + this.logger.log('Signal consumer disabled (gated on SESSION_DISCOVERY_ENABLED)'); + return; + } + void this.startConsumeLoop(); + } + + onModuleDestroy(): void { + this.aborted = true; + } + + private async startConsumeLoop(): Promise { + this.logger.log('Starting WatchSignals consumer'); + + while (!this.aborted) { + try { + await this.consumeSignalStream(); + } catch (error) { + if (this.aborted) return; + const message = error instanceof Error ? error.message : String(error); + this.logger.warn(`WatchSignals stream ended: ${message}. Reconnecting in 5s...`); + await new Promise((r) => setTimeout(r, 5000)); + } + } + } + + private async consumeSignalStream(): Promise { + const provider = this.providerRegistry.get('rust'); + const stream = provider.watchSignals(); + + for await (const raw of stream) { + if (this.aborted) return; + if (raw.kind !== 'stream-envelope' || !raw.envelope) continue; + + // Signal envelopes are ambient (envelope.sessionId always empty per + // RFC-MACP-0001 §x); the session correlation lives in the decoded + // payload's `correlation_session_id`. Progress envelopes may be + // session-scoped or ambient. Try both sources, ambient first. + let sessionId = raw.envelope.sessionId ?? ''; + if (!sessionId && raw.envelope.payload) { + try { + const decoded = this.protoRegistry.decodeKnown( + 'macp.v1', + raw.envelope.messageType ?? '', + raw.envelope.payload + ) as Record | null; + if (decoded) { + sessionId = + (decoded.correlationSessionId as string | undefined) ?? + (decoded.correlation_session_id as string | undefined) ?? + ''; + } + } catch { + /* decode failed — drop silently */ + } + } + if (!sessionId) continue; + + try { + const run = await this.runRepository.findByRuntimeSessionId(sessionId); + if (!run) { + // Ambient signal for an unknown session — drop silently. + this.logger.debug(`Signal for unknown session ${sessionId}; dropping`); + continue; + } + + const ctx: NormalizeContext = { + knownParticipants: new Set(), + execution: undefined as unknown as NormalizeContext['execution'], + runtimeSessionId: sessionId + }; + + const canonical = this.normalizer.normalize(run.id, raw, ctx); + await this.eventService.persistRawAndCanonical(run.id, raw, canonical); + } catch (err) { + this.logger.warn( + `Failed to persist signal for session ${sessionId}: ${err instanceof Error ? err.message : String(err)}` + ); + } + } + } +} diff --git a/src/runtime/rust-runtime.provider.spec.ts b/src/runtime/rust-runtime.provider.spec.ts index 0e0e23a..3cbdaf8 100644 --- a/src/runtime/rust-runtime.provider.spec.ts +++ b/src/runtime/rust-runtime.provider.spec.ts @@ -80,13 +80,13 @@ describe('RustRuntimeProvider.subscribeSession — passive-subscribe frame (RFC- runtimeSessionId: 'sess-abc' }; - it('writes a single passive-subscribe frame with afterSequence=0 (default) then half-closes', async () => { + it('writes a single passive-subscribe frame with afterSequence=0 (default) and keeps the write side open', async () => { const stream = makeFakeStream(); const { provider } = makeProvider(() => stream); const handle = provider.subscribeSession(baseReq); - // Allow the launch microtask (credentials resolve + write/end) to settle. + // Allow the launch microtask (credentials resolve + write) to settle. await new Promise((r) => setImmediate(r)); expect(stream.write).toHaveBeenCalledTimes(1); @@ -94,13 +94,10 @@ describe('RustRuntimeProvider.subscribeSession — passive-subscribe frame (RFC- subscribeSessionId: 'sess-abc', afterSequence: 0 }); - expect(stream.end).toHaveBeenCalledTimes(1); - - // Verify call order: write must precede end so the runtime sees the - // subscription metadata before the half-close. - const writeOrder = stream.write.mock.invocationCallOrder[0]; - const endOrder = stream.end.mock.invocationCallOrder[0]; - expect(writeOrder).toBeLessThan(endOrder); + // Observer must not half-close: the runtime's StreamSession loop treats + // client half-close as "done with the stream" and stops forwarding + // envelopes. The bidi stream stays open for the session's lifetime. + expect(stream.end).not.toHaveBeenCalled(); handle.abort(); }); @@ -154,22 +151,6 @@ describe('RustRuntimeProvider.subscribeSession — passive-subscribe frame (RFC- expect(stream.end).not.toHaveBeenCalled(); }); - it('tolerates end() throwing after a successful subscribe-frame write', async () => { - const stream = makeFakeStream(); - stream.end.mockImplementation(() => { - throw new Error('end failed: half-closed'); - }); - const { provider } = makeProvider(() => stream); - - const handle = provider.subscribeSession(baseReq); - await new Promise((r) => setImmediate(r)); - - // The provider swallows end() failures (some gRPC impls no-op on - // half-closed streams). The write must still have happened. - expect(stream.write).toHaveBeenCalledTimes(1); - handle.abort(); - }); - it('emits a synthetic stream-status "opened" event before any data frames', async () => { const stream = makeFakeStream(); const { provider } = makeProvider(() => stream); diff --git a/src/runtime/rust-runtime.provider.ts b/src/runtime/rust-runtime.provider.ts index 37e0b34..e0e3647 100644 --- a/src/runtime/rust-runtime.provider.ts +++ b/src/runtime/rust-runtime.provider.ts @@ -251,8 +251,14 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { // RFC-MACP-0006 §3.2: write a passive-subscribe frame so the runtime binds // this stream to the session's broadcast channel and replays accepted - // history from `afterSequence` onwards. Then end the write side — we - // only read from here on. + // history from `afterSequence` onwards. + // + // We deliberately do NOT half-close the write side here. The runtime's + // StreamSession loop treats client half-close as "client is done with + // the stream entirely" and breaks after draining queued envelopes — + // dropping every envelope broadcast after the half-close. Keeping the + // bidi stream open lets the runtime continue forwarding live envelopes + // (Vote, Commitment, etc.) for the session's full lifetime. try { grpcCall.write({ subscribeSessionId: req.runtimeSessionId, @@ -264,11 +270,6 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { notify(); return; } - try { - grpcCall.end(); - } catch { - /* some gRPC impls no-op on half-closed streams */ - } } catch (error) { streamFailure = error instanceof Error ? error : new Error(String(error)); ended = true; @@ -505,6 +506,95 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { }; } + /** + * Subscribe to the runtime's WatchSignals stream. Mirrors the watchSessions + * pattern: long-lived async iterable that yields a RawRuntimeEvent per Signal + * or Progress envelope, with auto-cancel on consumer return(). + */ + watchSignals(): AsyncIterable { + const credentialResolver = this.credentialResolver; + const client = this.client; + const kind = this.kind; + + return { + [Symbol.asyncIterator]() { + let grpcCall: any = null; + const buffer: RawRuntimeEvent[] = []; + let resolveWait: (() => void) | null = null; + let ended = false; + let streamError: Error | null = null; + + const notify = () => { + if (resolveWait) { + const r = resolveWait; + resolveWait = null; + r(); + } + }; + + const launch = async () => { + try { + const creds = await credentialResolver.resolve({ runtimeKind: kind }); + const metadata = buildMetadata(creds.metadata); + const method = getClientMethod(client, 'WatchSignals'); + grpcCall = method.call(client, {}, metadata); + + grpcCall.on('data', (chunk: any) => { + const receivedAt = new Date().toISOString(); + const rawEnvelope = chunk.envelope ?? chunk.signal ?? chunk; + if (!rawEnvelope || (!rawEnvelope.messageType && !rawEnvelope.message_type)) return; + const envelope = fromEnvelope(rawEnvelope); + buffer.push({ kind: 'stream-envelope', receivedAt, envelope }); + notify(); + }); + + grpcCall.on('error', (err: Error) => { + streamError = err; + ended = true; + notify(); + }); + grpcCall.on('end', () => { + ended = true; + notify(); + }); + } catch (err) { + streamError = err instanceof Error ? err : new Error(String(err)); + ended = true; + notify(); + } + }; + + void launch(); + + return { + async next(): Promise> { + while (true) { + if (buffer.length > 0) return { done: false, value: buffer.shift()! }; + if (ended) { + if (streamError) throw streamError; + return { done: true, value: undefined }; + } + await new Promise((r) => { + if (buffer.length > 0 || ended) r(); + else resolveWait = r; + }); + } + }, + async return(): Promise> { + if (grpcCall) { + try { + grpcCall.cancel(); + } catch { + /* ignore */ + } + } + return { done: true, value: undefined }; + } + }; + } + }; + } + // ── Governance policy lifecycle (RFC-MACP-0012) ────────────────── async registerPolicy(req: RuntimeRegisterPolicyRequest): Promise { diff --git a/src/storage/projection.repository.ts b/src/storage/projection.repository.ts index c7ed805..def27ad 100644 --- a/src/storage/projection.repository.ts +++ b/src/storage/projection.repository.ts @@ -29,6 +29,7 @@ export class ProjectionRepository { timeline: projection.timeline as unknown as Record, traceSummary: projection.trace as unknown as Record, progress: projection.progress as unknown as Record, + policy: projection.policy as unknown as Record, updatedAt: new Date().toISOString() };