Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions drizzle/0014_projection_policy_column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Add policy projection column to run_projections.
--
-- The projection.policy block was previously dropped at persist time because
-- ProjectionRepository.upsert had no column to write it to. This caused the
-- T2C policy.quorumStatus derivation (and any other policy projection field)
-- to be invisible in the run state response. Adding a dedicated jsonb column
-- so the policy projection survives the read-modify-write cycle.
--
-- Default mirrors the in-memory shape ProjectionService.empty() returns when
-- no events have populated the policy projection yet.

ALTER TABLE run_projections
ADD COLUMN IF NOT EXISTS policy jsonb NOT NULL DEFAULT '{"policyVersion": "", "commitmentEvaluations": []}'::jsonb;
2 changes: 2 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import { RunManagerService } from './runs/run-manager.service';
import { RunRecoveryService } from './runs/run-recovery.service';
import { StreamConsumerService } from './runs/stream-consumer.service';
import { SessionDiscoveryService } from './runs/session-discovery.service';
import { SignalConsumerService } from './runs/signal-consumer.service';
import { WebhookController } from './controllers/webhook.controller';
import { WebhookDeliveryRepository } from './webhooks/webhook-delivery.repository';
import { WebhookRepository } from './webhooks/webhook.repository';
Expand Down Expand Up @@ -123,6 +124,7 @@ import { WebhookService } from './webhooks/webhook.service';
RunManagerService,
StreamConsumerService,
SessionDiscoveryService,
SignalConsumerService,
RunExecutorService,
RunRecoveryService,
RunInsightsService,
Expand Down
11 changes: 11 additions & 0 deletions src/contracts/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ export interface RuntimeProvider {
listSessions(): Promise<RuntimeSessionSnapshot[]>;
watchSessions(): AsyncIterable<SessionLifecycleEvent>;

/**
* Subscribe to the runtime's ambient Signal/Progress envelopes.
*
* The runtime broadcasts Signal and Progress envelopes on a dedicated bus
* (signal_bus, separate from per-session stream_bus). Yields RawRuntimeEvent
* stream-envelope items so the same normalizer that handles per-session
* envelopes can ingest these. Caller correlates the envelope to a run via
* `envelope.sessionId`.
*/
watchSignals(): AsyncIterable<RawRuntimeEvent>;

// Governance policy lifecycle (RFC-MACP-0012)
registerPolicy(req: RuntimeRegisterPolicyRequest): Promise<RuntimeRegisterPolicyResult>;
unregisterPolicy(req: RuntimeUnregisterPolicyRequest): Promise<RuntimeUnregisterPolicyResult>;
Expand Down
4 changes: 4 additions & 0 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ export const runProjections = pgTable(
.notNull()
.default({ spanCount: 0, linkedArtifacts: [] }),
progress: jsonb('progress').$type<Record<string, unknown>>().notNull().default({ entries: [] }),
policy: jsonb('policy')
.$type<Record<string, unknown>>()
.notNull()
.default({ policyVersion: '', commitmentEvaluations: [] }),
updatedAt: timestamp('updated_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow()
},
(table) => ({
Expand Down
30 changes: 29 additions & 1 deletion src/events/event-normalizer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,40 @@ export class EventNormalizerService implements EventNormalizer {
function extractLlmCall(payload?: Record<string, unknown> | null): Record<string, unknown> | null {
if (!payload) return null;
const meta = payload.metadata as Record<string, unknown> | undefined;
// SignalPayload carries arbitrary JSON in its `data` bytes field. When the
// signal_type is `llm.call.completed`, agents stash the full LLM call info
// there. The envelope-level proto decoder leaves `data` as a base64 string
// (or Buffer); decode it to JSON here so token extraction below can find
// tokenUsage / model / latency.
let signalData: Record<string, unknown> | undefined;
const sigType = payload.signalType ?? payload.signal_type;
if (sigType === 'llm.call.completed' && payload.data != null) {
try {
const raw = payload.data as unknown;
let jsonStr: string | undefined;
if (typeof raw === 'string') {
// base64-encoded bytes → JSON
jsonStr = Buffer.from(raw, 'base64').toString('utf-8');
} else if (Buffer.isBuffer(raw)) {
jsonStr = raw.toString('utf-8');
}
if (jsonStr) {
const parsed = JSON.parse(jsonStr);
if (parsed && typeof parsed === 'object') signalData = parsed as Record<string, unknown>;
}
} catch {
/* ignore decode failures */
}
}
const candidates: Array<Record<string, unknown> | undefined> = [
payload.llmCall as Record<string, unknown> | undefined,
meta?.llmCall as Record<string, unknown> | undefined,
// `tokenUsage` is the minimal form — upgrade it to an llmCall shape.
payload.tokenUsage as Record<string, unknown> | undefined,
meta?.tokenUsage as Record<string, unknown> | undefined
meta?.tokenUsage as Record<string, unknown> | undefined,
// Signal-carried llm.call.completed data
signalData,
signalData?.tokenUsage as Record<string, unknown> | undefined
];
for (const c of candidates) {
if (!c || typeof c !== 'object') continue;
Expand Down
20 changes: 18 additions & 2 deletions src/metrics/metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ function extractTokenUsage(event: CanonicalEvent): {
} | null {
const data = event.data as Record<string, unknown>;

// Check multiple possible locations
// Check multiple possible locations. The synthesized llm.call.completed
// event has tokens at the top level (extractLlmCall returns {promptTokens,
// completionTokens, totalTokens, ...}); other event types may carry them
// under tokenUsage, metadata.tokenUsage, etc.
const candidates = [
data,
data.tokenUsage,
(data.metadata as Record<string, unknown> | undefined)?.tokenUsage,
// Synthesized llm.call.completed events carry the call shape directly
// under data.decodedPayload (extractLlmCall return value).
data.decodedPayload as Record<string, unknown> | undefined,
(data.decodedPayload as Record<string, unknown> | undefined)?.tokenUsage,
(data.payloadDescriptor as Record<string, unknown> | undefined)?.tokenUsage
];
Expand Down Expand Up @@ -100,11 +107,20 @@ export class MetricsService {
let totalTokens = safeNumber(current.totalTokens);
let estimatedCostUsd = safeNumber(current.estimatedCostUsd);
let sessionState = current.sessionState as string | undefined;
// Once the run reaches terminal state, freeze lastEventAt so post-completion
// cleanup events (e.g. session.state.changed at TTL boundary) don't inflate
// durationMs.
let runTerminal =
sessionState === 'SESSION_STATE_RESOLVED' || sessionState === 'SESSION_STATE_EXPIRED';

for (const event of events) {
eventCount += 1;
firstEventAt ??= event.ts;
lastEventAt = event.ts;
if (!runTerminal) lastEventAt = event.ts;
if (event.type === 'run.completed' || event.type === 'run.failed' || event.type === 'run.cancelled') {
lastEventAt = event.ts;
runTerminal = true;
}
if (event.type.startsWith('message.')) messageCount += 1;
if (event.type === 'signal.emitted') signalCount += 1;
if (event.type.startsWith('proposal.')) proposalCount += 1;
Expand Down
92 changes: 79 additions & 13 deletions src/projection/projection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ export const PROJECTION_SCHEMA_VERSION = 3;
@Injectable()
export class ProjectionService {
private readonly logger = new Logger(ProjectionService.name);
// Per-run serialization to prevent concurrent stream-consumer + signal-consumer
// updates from racing on the read-merge-write cycle. Without this, the
// optimistic version-check in projection.repository.upsert can drop a
// signal-consumer's signal when a higher-version stream-consumer write
// lands later (the second writer reads stale state, doesn't include the
// signal, and overwrites the version-7 update with version-15).
private readonly applyLocks = new Map<string, Promise<unknown>>();

constructor(private readonly projectionRepository: ProjectionRepository) {}

Expand Down Expand Up @@ -46,7 +53,7 @@ export class ProjectionService {
accepted: 0,
rejected: 0
},
policy: ((row as unknown as Record<string, unknown>).policy as RunStateProjection['policy']) ?? {
policy: (row.policy as unknown as RunStateProjection['policy']) ?? {
policyVersion: '',
commitmentEvaluations: []
},
Expand All @@ -58,15 +65,27 @@ export class ProjectionService {
}

async applyAndPersist(runId: string, events: CanonicalEvent[], tx?: unknown): Promise<RunStateProjection> {
const current = (await this.get(runId)) ?? this.empty(runId);
const next = this.applyEvents(current, events);
const version = (events.at(-1)?.seq ?? current.timeline.latestSeq) || 0;
await this.projectionRepository.upsert(
// Chain on the per-run lock so concurrent updates serialize.
const prior = this.applyLocks.get(runId) ?? Promise.resolve();
const next = prior.then(async () => {
const current = (await this.get(runId)) ?? this.empty(runId);
const merged = this.applyEvents(current, events);
const version = (events.at(-1)?.seq ?? current.timeline.latestSeq) || 0;
await this.projectionRepository.upsert(
runId,
merged,
version,
PROJECTION_SCHEMA_VERSION,
tx as Parameters<typeof this.projectionRepository.upsert>[4]
);
return merged;
});
this.applyLocks.set(
runId,
next,
version,
PROJECTION_SCHEMA_VERSION,
tx as Parameters<typeof this.projectionRepository.upsert>[4]
next.finally(() => {
// Release if still latest; otherwise leave the chain head intact.
if (this.applyLocks.get(runId) === next) this.applyLocks.delete(runId);
})
);
return next;
}
Expand Down Expand Up @@ -247,10 +266,16 @@ export class ProjectionService {
const proposalPayload = event.data.decodedPayload as Record<string, unknown> | undefined;
const messageType = String(event.data.messageType ?? '');
const sender = String(event.data.sender ?? '');
const explicitConfidence = safeOptionalNumber(proposalPayload?.confidence);
// Vote envelopes don't carry confidence in their payload schema, but a
// Vote IS by definition a confident decision — default to 1.0 so the
// per-contributor table doesn't render "—" for every voter.
const contributionConfidence =
explicitConfidence ?? (messageType === 'Vote' ? 1.0 : undefined);
const contribution: DecisionProposalContribution = {
participantId: sender,
action: inferContributionAction(messageType, proposalPayload),
confidence: safeOptionalNumber(proposalPayload?.confidence),
confidence: contributionConfidence,
reasons: extractReasons(proposalPayload),
ts: event.ts,
vote: inferContributionVote(messageType, proposalPayload),
Expand All @@ -260,16 +285,26 @@ export class ProjectionService {
const proposalId = String(
proposalPayload?.proposalId ?? proposalPayload?.requestId ?? event.subject?.id ?? ''
);
// Aggregate confidence: when proposals contain Votes, prefer the
// approve-ratio (so 3-of-3 approves = 100%; 2-of-4 = 50%). Falls
// back to the explicit payload value or any prior aggregate.
const allContributions = [...existingProposals, contribution];
const voteContributions = allContributions.filter((p) => p.vote === 'allow' || p.vote === 'deny');
const approveCount = voteContributions.filter((p) => p.vote === 'allow').length;
const aggregateConfidence =
voteContributions.length > 0
? approveCount / voteContributions.length
: (explicitConfidence ?? next.decision.current?.confidence);
next.decision.current = {
...(next.decision.current ?? { finalized: false }),
action: proposalId || String(event.subject?.id ?? 'proposal'),
confidence: safeOptionalNumber(proposalPayload?.confidence) ?? next.decision.current?.confidence,
confidence: aggregateConfidence,
reasons: [
String(proposalPayload?.reason ?? proposalPayload?.summary ?? proposalPayload?.rationale ?? event.type)
].filter(Boolean),
finalized: false,
proposalId,
proposals: [...existingProposals, contribution].slice(-50)
proposals: allContributions.slice(-50)
};

// Update voteTally if this is a vote-bearing contribution
Expand All @@ -285,10 +320,17 @@ export class ProjectionService {
const outcomePositive: boolean | null =
explicitOutcome != null ? Boolean(explicitOutcome) : inferOutcomePositiveFromAction(action);
const sender = (event.data.sender as string | undefined) ?? undefined;
// Final aggregate confidence: prefer explicit, otherwise compute
// from the vote tally we accumulated during proposal.updated.
const priorProposals = next.decision.current?.proposals ?? [];
const priorVotes = priorProposals.filter((p) => p.vote === 'allow' || p.vote === 'deny');
const priorApprove = priorVotes.filter((p) => p.vote === 'allow').length;
const computedAggregate = priorVotes.length > 0 ? priorApprove / priorVotes.length : undefined;
next.decision.current = {
...(next.decision.current ?? { finalized: false }),
action,
confidence: safeOptionalNumber(payload?.confidence) ?? next.decision.current?.confidence,
confidence:
safeOptionalNumber(payload?.confidence) ?? computedAggregate ?? next.decision.current?.confidence,
reasons: [String(payload?.reason ?? 'Commitment observed')],
finalized: true,
proposalId: String(payload?.commitmentId ?? next.decision.current?.proposalId ?? ''),
Expand All @@ -299,6 +341,17 @@ export class ProjectionService {
next.run.status = 'completed';
// Propagate outcomePositive to policy projection
next.policy.outcomePositive = outcomePositive;
// Derive policy.resolved from a successful commit. The runtime does
// not emit PolicyResolved / PolicyCommitmentEvaluated envelopes
// (RFC-MACP-0012 forward-compat surface that's not yet implemented),
// but the runtime's policy evaluator HAS approved the commit by the
// time decision.finalized fires — so we can mark the policy as
// resolved here. This flips the PolicyPanel header from "pending"
// to "resolved" for committed runs.
next.policy.resolvedAt = next.policy.resolvedAt ?? event.ts;
if (next.policy.quorumStatus === undefined || next.policy.quorumStatus === 'pending') {
next.policy.quorumStatus = outcomePositive === false ? 'failed' : 'reached';
}
break;
}
case 'progress.reported': {
Expand Down Expand Up @@ -488,6 +541,19 @@ export class ProjectionService {
projection.participants.push(participant);
projection.graph.nodes.push({ id: participantId, kind: 'participant', status: 'idle' });
}

// If the run already reached a terminal state, do not re-activate a
// participant that's been swept to a terminal status. Late-arriving
// envelopes (e.g. Commitment events normalized after run.completed) would
// otherwise flip risk-agent back to 'active' even though the run is done.
const runTerminal = projection.run.status === 'completed' || projection.run.status === 'failed' || projection.run.status === 'cancelled';
const participantTerminal = participant.status === 'completed' || participant.status === 'failed' || participant.status === 'skipped';
if (runTerminal && participantTerminal) {
participant.latestActivityAt = ts;
if (summary) participant.latestSummary = summary;
return;
}

participant.status = status;
participant.latestActivityAt = ts;
if (summary) participant.latestSummary = summary;
Expand Down
22 changes: 20 additions & 2 deletions src/runs/run-manager.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BadRequestException, Injectable, Logger, NotFoundException } from '@nestjs/common';
import { BadRequestException, ConflictException, Injectable, Logger, NotFoundException } from '@nestjs/common';
import { randomUUID } from 'node:crypto';
import { RunDescriptor, RunStateProjection } from '../contracts/control-plane';
import { AuditService } from '../audit/audit.service';
Expand Down Expand Up @@ -129,7 +129,25 @@ export class RunManagerService {
session: { runtimeSessionId: string; initiator: string; ack: { sessionState: string } },
capabilities?: Record<string, unknown>
) {
const run = await this.runRepository.markBindingSession(runId, session.runtimeSessionId);
let run: Awaited<ReturnType<RunRepository['markBindingSession']>>;
try {
run = await this.runRepository.markBindingSession(runId, session.runtimeSessionId);
} catch (err) {
// Run may have already advanced past binding_session (e.g., SessionDiscovery
// racing with RunExecutor, or a re-emitted session.created event). Swallow
// the ConflictException so the caller — often an unawaited void — doesn't
// become an unhandled rejection and crash the process. Side-effects below
// are skipped because they already fired on the first successful bind.
if (err instanceof ConflictException) {
const current = await this.runRepository.findById(runId);
this.logger.warn(
`bindSession no-op for run ${runId}: ${err.message} (current status=${current?.status ?? 'missing'})`
);
if (!current) throw new NotFoundException(`run ${runId} not found`);
return current;
}
throw err;
}
await this.runtimeSessionRepository.upsert({
runId,
runtimeKind: request.runtime.kind,
Expand Down
25 changes: 18 additions & 7 deletions src/runs/session-discovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,24 @@ export class SessionDiscoveryService implements OnModuleInit, OnModuleDestroy {
`Auto-discovered session ${session.sessionId} → run ${run.id} (mode=${session.mode}, initiator=${session.initiator})`
);

void this.runManager.markStarted(run.id, descriptor);
void this.runManager.bindSession(run.id, descriptor, {
runtimeSessionId: session.sessionId,
initiator: session.initiator ?? '',
ack: { sessionState: session.state }
});
void this.runManager.markRunning(run.id, session.sessionId);
try {
await this.runManager.markStarted(run.id, descriptor);
await this.runManager.bindSession(run.id, descriptor, {
runtimeSessionId: session.sessionId,
initiator: session.initiator ?? '',
ack: { sessionState: session.state }
});
await this.runManager.markRunning(run.id, session.sessionId);
} catch (err) {
// Keep the WatchSessions loop alive. Subscribing and consuming the stream
// below is the point of session discovery — state-machine drift for a
// single run must not abort discovery for every other session.
this.logger.warn(
`Failed to sync run state for discovered session ${session.sessionId}: ${
err instanceof Error ? err.message : String(err)
}`
);
}

const handle = provider.subscribeSession({
runId: run.id,
Expand Down
Loading
Loading