From 43dde20af2469e45758560ea88d5592c0d1019fa Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Sun, 22 Mar 2026 20:58:09 -0700 Subject: [PATCH] Align control plane with MACP spec d31ffb1 and runtime 168ab31 Remove SessionWatch/passive-attach reconnect path (spec forbids it), replace with getSession() polling fallback. Fix sender auth mismatch by setting empty sender on openSession() envelope. Add INVALID_SESSION_ID error code mapping (HTTP 400), instructions field from InitializeResponse, listChanged capability declaration, Contribute message type for ext.multi_round.v1 extension mode, and JSON decode fallback in proto registry for extension modes without proto definitions. --- src/config/app-config.service.ts | 2 + src/contracts/runtime.ts | 1 + src/errors/error-codes.ts | 3 +- src/events/event-normalizer.service.ts | 3 +- src/runs/run-executor.service.ts | 4 + src/runs/run-recovery.service.spec.ts | 3 +- src/runs/run-recovery.service.ts | 3 +- src/runs/stream-consumer.service.spec.ts | 49 ++-------- src/runs/stream-consumer.service.ts | 116 +++++++---------------- src/runtime/proto-registry.service.ts | 8 ++ src/runtime/rust-runtime.provider.ts | 109 +++++---------------- 11 files changed, 91 insertions(+), 210 deletions(-) diff --git a/src/config/app-config.service.ts b/src/config/app-config.service.ts index 41f1aca..cc70421 100644 --- a/src/config/app-config.service.ts +++ b/src/config/app-config.service.ts @@ -53,8 +53,10 @@ export class AppConfigService implements OnModuleInit { readonly runtimeUseDevHeader = readBoolean('RUNTIME_USE_DEV_HEADER', process.env.NODE_ENV === 'development'); readonly runtimeRequestTimeoutMs = readNumber('RUNTIME_REQUEST_TIMEOUT_MS', 30000); readonly runtimeDevAgentId = process.env.RUNTIME_DEV_AGENT_ID ?? 'control-plane'; + /** @deprecated SessionWatch is no longer part of the base protocol. Kept for backward compat. */ readonly runtimeStreamSubscriptionMessageType = process.env.RUNTIME_STREAM_SUBSCRIPTION_MESSAGE_TYPE ?? 'SessionWatch'; + /** @deprecated SessionWatch is no longer part of the base protocol. Kept for backward compat. */ readonly runtimeStreamSubscriberId = process.env.RUNTIME_STREAM_SUBSCRIBER_ID ?? this.runtimeDevAgentId; diff --git a/src/contracts/runtime.ts b/src/contracts/runtime.ts index b407ebf..6fbff5b 100644 --- a/src/contracts/runtime.ts +++ b/src/contracts/runtime.ts @@ -65,6 +65,7 @@ export interface RuntimeInitializeResult { }; supportedModes: string[]; capabilities?: RuntimeCapabilities; + instructions?: string; } export interface RuntimeStartSessionRequest { diff --git a/src/errors/error-codes.ts b/src/errors/error-codes.ts index fdd25f8..984f6eb 100644 --- a/src/errors/error-codes.ts +++ b/src/errors/error-codes.ts @@ -11,5 +11,6 @@ export enum ErrorCode { MODE_NOT_SUPPORTED = 'MODE_NOT_SUPPORTED', CIRCUIT_BREAKER_OPEN = 'CIRCUIT_BREAKER_OPEN', SIGNAL_DISPATCH_FAILED = 'SIGNAL_DISPATCH_FAILED', - CONTEXT_UPDATE_FAILED = 'CONTEXT_UPDATE_FAILED' + CONTEXT_UPDATE_FAILED = 'CONTEXT_UPDATE_FAILED', + INVALID_SESSION_ID = 'INVALID_SESSION_ID' } diff --git a/src/events/event-normalizer.service.ts b/src/events/event-normalizer.service.ts index 3b86a60..a33ee31 100644 --- a/src/events/event-normalizer.service.ts +++ b/src/events/event-normalizer.service.ts @@ -190,7 +190,8 @@ export class EventNormalizerService implements EventNormalizer { 'TaskFail', 'HandoffContext', 'HandoffAccept', - 'HandoffDecline' + 'HandoffDecline', + 'Contribute' ].includes(messageType) ) { return 'proposal.updated'; diff --git a/src/runs/run-executor.service.ts b/src/runs/run-executor.service.ts index 92763d2..984f574 100644 --- a/src/runs/run-executor.service.ts +++ b/src/runs/run-executor.service.ts @@ -267,6 +267,10 @@ export class RunExecutorService { } ); + if (initResult.instructions) { + this.logger.log(`runtime instructions: ${initResult.instructions}`); + } + if ( initResult.supportedModes.length > 0 && !initResult.supportedModes.includes(request.session.modeName) diff --git a/src/runs/run-recovery.service.spec.ts b/src/runs/run-recovery.service.spec.ts index 24391df..a3ba7d2 100644 --- a/src/runs/run-recovery.service.spec.ts +++ b/src/runs/run-recovery.service.spec.ts @@ -106,7 +106,8 @@ describe('RunRecoveryService', () => { runId: 'run-1', runtimeSessionId: 'sess-1', subscriberId: 'agent-1', - resumeFromSeq: 42 + resumeFromSeq: 42, + pollOnly: true }) ); }); diff --git a/src/runs/run-recovery.service.ts b/src/runs/run-recovery.service.ts index c0eef5c..60d46fe 100644 --- a/src/runs/run-recovery.service.ts +++ b/src/runs/run-recovery.service.ts @@ -123,7 +123,8 @@ export class RunRecoveryService implements OnApplicationBootstrap { runtimeKind: run.runtimeKind, runtimeSessionId, subscriberId, - resumeFromSeq + resumeFromSeq, + pollOnly: true }); this.logger.log(`recovered run ${run.id} from seq ${run.lastEventSeq}`); diff --git a/src/runs/stream-consumer.service.spec.ts b/src/runs/stream-consumer.service.spec.ts index f9243b5..d717220 100644 --- a/src/runs/stream-consumer.service.spec.ts +++ b/src/runs/stream-consumer.service.spec.ts @@ -119,18 +119,8 @@ describe('StreamConsumerService', () => { describe('start()', () => { it('should be idempotent — second call returns immediately without starting a new loop', async () => { - // Create an async iterable that never resolves so the loop stays active - const neverEndingIterable: AsyncIterable = { - [Symbol.asyncIterator]() { - return { - next: () => new Promise(() => {}), // never resolves - }; - }, - }; - const mockProvider = { - streamSession: jest.fn().mockReturnValue(neverEndingIterable), - getSession: jest.fn(), + getSession: jest.fn().mockReturnValue(new Promise(() => {})), // never resolves, keeps loop active }; runtimeRegistry.get.mockReturnValue(mockProvider as any); @@ -156,24 +146,15 @@ describe('StreamConsumerService', () => { // Second call should return immediately since 'run-1' is already active await service.start(params); - // streamSession should have been called only once (from the first start call) - expect(mockProvider.streamSession).toHaveBeenCalledTimes(1); + // getSession should have been called only once (from the first start call's poll loop) + expect(mockProvider.getSession).toHaveBeenCalledTimes(1); }); }); describe('stop()', () => { it('should set the aborted flag on the active stream marker', async () => { - const neverEndingIterable: AsyncIterable = { - [Symbol.asyncIterator]() { - return { - next: () => new Promise(() => {}), - }; - }, - }; - const mockProvider = { - streamSession: jest.fn().mockReturnValue(neverEndingIterable), - getSession: jest.fn(), + getSession: jest.fn().mockReturnValue(new Promise(() => {})), }; runtimeRegistry.get.mockReturnValue(mockProvider as any); @@ -214,17 +195,8 @@ describe('StreamConsumerService', () => { describe('onModuleDestroy()', () => { it('should abort all active streams', async () => { - const neverEndingIterable: AsyncIterable = { - [Symbol.asyncIterator]() { - return { - next: () => new Promise(() => {}), - }; - }, - }; - const mockProvider = { - streamSession: jest.fn().mockReturnValue(neverEndingIterable), - getSession: jest.fn(), + getSession: jest.fn().mockReturnValue(new Promise(() => {})), }; runtimeRegistry.get.mockReturnValue(mockProvider as any); @@ -269,17 +241,8 @@ describe('StreamConsumerService', () => { }); it('should return false when a stream is active but not connected', async () => { - const neverEndingIterable: AsyncIterable = { - [Symbol.asyncIterator]() { - return { - next: () => new Promise(() => {}), - }; - }, - }; - const mockProvider = { - streamSession: jest.fn().mockReturnValue(neverEndingIterable), - getSession: jest.fn(), + getSession: jest.fn().mockReturnValue(new Promise(() => {})), }; runtimeRegistry.get.mockReturnValue(mockProvider as any); diff --git a/src/runs/stream-consumer.service.ts b/src/runs/stream-consumer.service.ts index 97b6827..9ed73f8 100644 --- a/src/runs/stream-consumer.service.ts +++ b/src/runs/stream-consumer.service.ts @@ -47,6 +47,7 @@ export class StreamConsumerService implements OnModuleDestroy { subscriberId: string; resumeFromSeq?: number; sessionHandle?: RuntimeSessionHandle; + pollOnly?: boolean; }): Promise { if (this.active.has(params.runId)) return; const marker: ActiveStream = { @@ -116,6 +117,7 @@ export class StreamConsumerService implements OnModuleDestroy { runtimeSessionId: string; subscriberId: string; sessionHandle?: RuntimeSessionHandle; + pollOnly?: boolean; } ): Promise { const provider = this.runtimeRegistry.get(params.runtimeKind); @@ -125,31 +127,29 @@ export class StreamConsumerService implements OnModuleDestroy { runtimeSessionId: params.runtimeSessionId }; - let retries = 0; const maxRetries = this.config.streamMaxRetries; - let isFirstIteration = true; - while (!marker.aborted) { + // If we have a session handle and not poll-only, consume the stream first + if (params.sessionHandle && !params.pollOnly) { try { - // First iteration: use the session handle's events if provided - // Subsequent iterations (reconnection): fall back to streamSession() - const iterable = (isFirstIteration && params.sessionHandle) - ? params.sessionHandle.events - : provider.streamSession({ - runId: params.runId, - runtimeSessionId: params.runtimeSessionId, - modeName: params.execution.session.modeName, - subscriberId: params.subscriberId - }); - isFirstIteration = false; - - for await (const raw of this.withIdleTimeout(iterable, this.config.streamIdleTimeoutMs)) { + for await (const raw of this.withIdleTimeout(params.sessionHandle.events, this.config.streamIdleTimeoutMs)) { if (marker.aborted) return; await this.handleRawEvent(params.runId, raw, context, params.runtimeSessionId, marker); if (marker.finalized) return; - retries = 0; } + } catch (error) { + marker.connected = false; + this.logger.warn(`stream error for run ${params.runId}: ${error instanceof Error ? error.message : String(error)}`); + } + + // Stream ended — check if already finalized + if (marker.finalized || marker.aborted) return; + } + // Polling fallback: poll getSession() until terminal state or max retries + let retries = 0; + while (!marker.aborted && !marker.finalized) { + try { const snapshot = await provider.getSession({ runId: params.runId, runtimeSessionId: params.runtimeSessionId, @@ -172,72 +172,28 @@ export class StreamConsumerService implements OnModuleDestroy { await this.finalizeRun(params.runId, marker, 'failed', new Error('runtime session expired')); return; } + } catch (pollError) { + this.logger.warn( + `getSession poll failed for run ${params.runId}: ${pollError instanceof Error ? pollError.message : String(pollError)}` + ); + } - retries += 1; - if (retries > maxRetries) { - await this.finalizeRun(params.runId, marker, 'failed', new Error('stream ended without terminal session state')); - return; - } - - await this.eventService.emitControlPlaneEvents(params.runId, [ - { - ts: new Date().toISOString(), - type: 'session.stream.opened', - source: { kind: 'control-plane', name: 'stream-consumer' }, - subject: { kind: 'session', id: params.runtimeSessionId }, - data: { status: 'reconnecting', detail: 'stream ended before terminal state; retrying' } - } - ]); - await new Promise((resolve) => setTimeout(resolve, this.backoffMs(retries))); - } catch (error) { - marker.connected = false; - retries += 1; - this.logger.warn(`stream error for run ${params.runId}: ${error instanceof Error ? error.message : String(error)}`); - await this.eventService.emitControlPlaneEvents(params.runId, [ - { - ts: new Date().toISOString(), - type: 'session.stream.opened', - source: { kind: 'control-plane', name: 'stream-consumer' }, - subject: { kind: 'session', id: params.runtimeSessionId }, - data: { status: 'reconnecting', detail: error instanceof Error ? error.message : String(error) } - } - ]); - - if (retries > maxRetries) { - await this.finalizeRun(params.runId, marker, 'failed', error); - return; - } + retries += 1; + if (retries > maxRetries) { + await this.finalizeRun(params.runId, marker, 'failed', new Error('polling exhausted without terminal session state')); + return; + } - try { - const snapshot = await provider.getSession({ - runId: params.runId, - runtimeSessionId: params.runtimeSessionId, - requesterId: params.subscriberId - }); - await this.handleRawEvent( - params.runId, - { kind: 'session-snapshot', receivedAt: new Date().toISOString(), sessionSnapshot: snapshot }, - context, - params.runtimeSessionId, - marker - ); - if (marker.finalized) return; - if (snapshot.state === 'SESSION_STATE_RESOLVED') { - await this.finalizeRun(params.runId, marker, 'completed'); - return; - } - if (snapshot.state === 'SESSION_STATE_EXPIRED') { - await this.finalizeRun(params.runId, marker, 'failed', new Error('runtime session expired')); - return; - } - } catch (snapshotError) { - this.logger.warn( - `reconciliation failed for run ${params.runId}: ${snapshotError instanceof Error ? snapshotError.message : String(snapshotError)}` - ); + await this.eventService.emitControlPlaneEvents(params.runId, [ + { + ts: new Date().toISOString(), + type: 'session.stream.opened', + source: { kind: 'control-plane', name: 'stream-consumer' }, + subject: { kind: 'session', id: params.runtimeSessionId }, + data: { status: 'reconnecting', detail: 'polling getSession for terminal state' } } - - await new Promise((resolve) => setTimeout(resolve, this.backoffMs(retries))); - } + ]); + await new Promise((resolve) => setTimeout(resolve, this.backoffMs(retries))); } } diff --git a/src/runtime/proto-registry.service.ts b/src/runtime/proto-registry.service.ts index 5fd7141..1ab96b9 100644 --- a/src/runtime/proto-registry.service.ts +++ b/src/runtime/proto-registry.service.ts @@ -42,6 +42,9 @@ const MESSAGE_TYPE_MAP: Record> = { Approve: 'macp.modes.quorum.v1.ApprovePayload', Reject: 'macp.modes.quorum.v1.RejectPayload', Abstain: 'macp.modes.quorum.v1.AbstainPayload' + }, + 'ext.multi_round.v1': { + Contribute: '__json__' } }; @@ -73,6 +76,7 @@ export class ProtoRegistryService implements OnModuleInit { const missingTypes: string[] = []; for (const [mode, types] of Object.entries(MESSAGE_TYPE_MAP)) { for (const [msgType, typeName] of Object.entries(types)) { + if (typeName === '__json__') continue; // Extension modes use JSON, no proto type try { this.root.lookupType(typeName); } catch { @@ -120,6 +124,10 @@ export class ProtoRegistryService implements OnModuleInit { if (!typeName) { return this.tryDecodeUtf8(payload); } + // Extension modes using JSON payloads (no proto definition) + if (typeName === '__json__') { + return this.tryDecodeUtf8(payload); + } return this.decodeMessage(typeName, payload); } diff --git a/src/runtime/rust-runtime.provider.ts b/src/runtime/rust-runtime.provider.ts index dfd7f3f..fb8dd5b 100644 --- a/src/runtime/rust-runtime.provider.ts +++ b/src/runtime/rust-runtime.provider.ts @@ -103,8 +103,8 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { cancellation: { cancelSession: true }, progress: { progress: true }, manifest: { getManifest: true }, - modeRegistry: { listModes: true, listChanged: false }, - roots: { listRoots: true, listChanged: false }, + modeRegistry: { listModes: true, listChanged: true }, + roots: { listRoots: true, listChanged: true }, experimental: { features: {} } } }, undefined, opts); @@ -119,6 +119,7 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { websiteUrl: response.runtimeInfo?.websiteUrl }, supportedModes: response.supportedModes ?? [], + instructions: response.instructions || undefined, capabilities: response.capabilities ? { sessions: response.capabilities.sessions, cancellation: response.capabilities.cancellation, @@ -173,6 +174,13 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { const ack = this.fromAck(response.ack); if (!ack.ok && ack.error) { + if (ack.error.code === 'INVALID_SESSION_ID') { + throw new AppException( + ErrorCode.INVALID_SESSION_ID, + `Runtime rejected SessionStart: [${ack.error.code}] ${ack.error.message}`, + 400 + ); + } throw new AppException( ErrorCode.RUNTIME_UNAVAILABLE, `Runtime rejected SessionStart: [${ack.error.code}] ${ack.error.message}`, @@ -210,7 +218,7 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { messageType: 'SessionStart', messageId: randomUUID(), sessionId: runtimeSessionId, - sender: initiator, + sender: '', payload }); @@ -414,6 +422,13 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { const ack = this.fromAck(response.ack); if (!ack.ok && ack.error) { + if (ack.error.code === 'INVALID_SESSION_ID') { + throw new AppException( + ErrorCode.INVALID_SESSION_ID, + `Runtime rejected message: [${ack.error.code}] ${ack.error.message}`, + 400 + ); + } throw new AppException( ErrorCode.RUNTIME_UNAVAILABLE, `Runtime rejected message: [${ack.error.code}] ${ack.error.message}`, @@ -423,86 +438,14 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { return { ack, envelope }; } - async *streamSession(req: RuntimeStreamSessionRequest): AsyncIterable { - const creds = await this.credentialResolver.resolve({ - runtimeKind: this.kind, - fallbackSender: req.subscriberId || this.config.runtimeStreamSubscriberId - }); - const metadata = this.buildMetadata(creds.metadata); - const streamMethod = this.getClientMethod('StreamSession'); - const call = streamMethod.call(this.client, metadata); - - // Phase 1.6: Event-driven async queue instead of 50ms polling - const buffer: RawRuntimeEvent[] = []; - let resolve: (() => void) | null = null; - let ended = false; - let failure: Error | null = null; - - const notify = () => { - if (resolve) { - const r = resolve; - resolve = null; - r(); - } - }; - - const waitForItem = (): Promise => - new Promise((r) => { - if (buffer.length > 0 || ended) { - r(); - } else { - resolve = r; - } - }); - - call.on('data', (chunk: any) => { - buffer.push({ - kind: 'stream-envelope', - receivedAt: new Date().toISOString(), - envelope: this.fromEnvelope(chunk.envelope) - }); - notify(); - }); - call.on('error', (error: Error) => { - failure = error; - ended = true; - notify(); - }); - call.on('end', () => { - ended = true; - notify(); - }); - - const subscriptionEnvelope = this.buildEnvelope({ - mode: req.modeName, - messageType: this.config.runtimeStreamSubscriptionMessageType, - messageId: randomUUID(), - sessionId: req.runtimeSessionId, - sender: creds.sender, - payload: Buffer.from(JSON.stringify({ sessionId: req.runtimeSessionId }), 'utf8') - }); - call.write({ envelope: this.toGrpcEnvelope(subscriptionEnvelope) }); - call.end(); - - // Yield the initial stream-opened event - yield { - kind: 'stream-status', - receivedAt: new Date().toISOString(), - streamStatus: { status: 'opened' } - }; - - while (true) { - await waitForItem(); - while (buffer.length > 0) { - yield buffer.shift()!; - } - if (ended && buffer.length === 0) break; - } - - if (failure) { - this.logger.warn(`streamSession ended with error for ${req.runtimeSessionId}: ${(failure as Error).message}`); - throw failure; - } + async *streamSession(_req: RuntimeStreamSessionRequest): AsyncIterable { + // SessionWatch / passive attach is no longer part of the base protocol. + // Reconnection now uses getSession() polling in StreamConsumerService. + throw new AppException( + ErrorCode.INTERNAL_ERROR, + 'streamSession() is deprecated — reconnection uses getSession() polling', + 500 + ); } async getSession(req: RuntimeGetSessionRequest): Promise {