Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/config/app-config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ export class AppConfigService implements OnModuleInit {
readonly runtimeUseDevHeader = readBoolean('RUNTIME_USE_DEV_HEADER', process.env.NODE_ENV === 'development');
readonly runtimeRequestTimeoutMs = readNumber('RUNTIME_REQUEST_TIMEOUT_MS', 30000);
readonly runtimeDevAgentId = process.env.RUNTIME_DEV_AGENT_ID ?? 'control-plane';
/** @deprecated SessionWatch is no longer part of the base protocol. Kept for backward compat. */
readonly runtimeStreamSubscriptionMessageType =
process.env.RUNTIME_STREAM_SUBSCRIPTION_MESSAGE_TYPE ?? 'SessionWatch';
/** @deprecated SessionWatch is no longer part of the base protocol. Kept for backward compat. */
readonly runtimeStreamSubscriberId =
process.env.RUNTIME_STREAM_SUBSCRIBER_ID ?? this.runtimeDevAgentId;

Expand Down
1 change: 1 addition & 0 deletions src/contracts/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export interface RuntimeInitializeResult {
};
supportedModes: string[];
capabilities?: RuntimeCapabilities;
instructions?: string;
}

export interface RuntimeStartSessionRequest {
Expand Down
3 changes: 2 additions & 1 deletion src/errors/error-codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ export enum ErrorCode {
MODE_NOT_SUPPORTED = 'MODE_NOT_SUPPORTED',
CIRCUIT_BREAKER_OPEN = 'CIRCUIT_BREAKER_OPEN',
SIGNAL_DISPATCH_FAILED = 'SIGNAL_DISPATCH_FAILED',
CONTEXT_UPDATE_FAILED = 'CONTEXT_UPDATE_FAILED'
CONTEXT_UPDATE_FAILED = 'CONTEXT_UPDATE_FAILED',
INVALID_SESSION_ID = 'INVALID_SESSION_ID'
}
3 changes: 2 additions & 1 deletion src/events/event-normalizer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ export class EventNormalizerService implements EventNormalizer {
'TaskFail',
'HandoffContext',
'HandoffAccept',
'HandoffDecline'
'HandoffDecline',
'Contribute'
].includes(messageType)
) {
return 'proposal.updated';
Expand Down
4 changes: 4 additions & 0 deletions src/runs/run-executor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ export class RunExecutorService {
}
);

if (initResult.instructions) {
this.logger.log(`runtime instructions: ${initResult.instructions}`);
}

if (
initResult.supportedModes.length > 0 &&
!initResult.supportedModes.includes(request.session.modeName)
Expand Down
3 changes: 2 additions & 1 deletion src/runs/run-recovery.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ describe('RunRecoveryService', () => {
runId: 'run-1',
runtimeSessionId: 'sess-1',
subscriberId: 'agent-1',
resumeFromSeq: 42
resumeFromSeq: 42,
pollOnly: true
})
);
});
Expand Down
3 changes: 2 additions & 1 deletion src/runs/run-recovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ export class RunRecoveryService implements OnApplicationBootstrap {
runtimeKind: run.runtimeKind,
runtimeSessionId,
subscriberId,
resumeFromSeq
resumeFromSeq,
pollOnly: true
});

this.logger.log(`recovered run ${run.id} from seq ${run.lastEventSeq}`);
Expand Down
49 changes: 6 additions & 43 deletions src/runs/stream-consumer.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,8 @@ describe('StreamConsumerService', () => {

describe('start()', () => {
it('should be idempotent — second call returns immediately without starting a new loop', async () => {
// Create an async iterable that never resolves so the loop stays active
const neverEndingIterable: AsyncIterable<any> = {
[Symbol.asyncIterator]() {
return {
next: () => new Promise(() => {}), // never resolves
};
},
};

const mockProvider = {
streamSession: jest.fn().mockReturnValue(neverEndingIterable),
getSession: jest.fn(),
getSession: jest.fn().mockReturnValue(new Promise(() => {})), // never resolves, keeps loop active
};
runtimeRegistry.get.mockReturnValue(mockProvider as any);

Expand All @@ -156,24 +146,15 @@ describe('StreamConsumerService', () => {
// Second call should return immediately since 'run-1' is already active
await service.start(params);

// streamSession should have been called only once (from the first start call)
expect(mockProvider.streamSession).toHaveBeenCalledTimes(1);
// getSession should have been called only once (from the first start call's poll loop)
expect(mockProvider.getSession).toHaveBeenCalledTimes(1);
});
});

describe('stop()', () => {
it('should set the aborted flag on the active stream marker', async () => {
const neverEndingIterable: AsyncIterable<any> = {
[Symbol.asyncIterator]() {
return {
next: () => new Promise(() => {}),
};
},
};

const mockProvider = {
streamSession: jest.fn().mockReturnValue(neverEndingIterable),
getSession: jest.fn(),
getSession: jest.fn().mockReturnValue(new Promise(() => {})),
};
runtimeRegistry.get.mockReturnValue(mockProvider as any);

Expand Down Expand Up @@ -214,17 +195,8 @@ describe('StreamConsumerService', () => {

describe('onModuleDestroy()', () => {
it('should abort all active streams', async () => {
const neverEndingIterable: AsyncIterable<any> = {
[Symbol.asyncIterator]() {
return {
next: () => new Promise(() => {}),
};
},
};

const mockProvider = {
streamSession: jest.fn().mockReturnValue(neverEndingIterable),
getSession: jest.fn(),
getSession: jest.fn().mockReturnValue(new Promise(() => {})),
};
runtimeRegistry.get.mockReturnValue(mockProvider as any);

Expand Down Expand Up @@ -269,17 +241,8 @@ describe('StreamConsumerService', () => {
});

it('should return false when a stream is active but not connected', async () => {
const neverEndingIterable: AsyncIterable<any> = {
[Symbol.asyncIterator]() {
return {
next: () => new Promise(() => {}),
};
},
};

const mockProvider = {
streamSession: jest.fn().mockReturnValue(neverEndingIterable),
getSession: jest.fn(),
getSession: jest.fn().mockReturnValue(new Promise(() => {})),
};
runtimeRegistry.get.mockReturnValue(mockProvider as any);

Expand Down
116 changes: 36 additions & 80 deletions src/runs/stream-consumer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class StreamConsumerService implements OnModuleDestroy {
subscriberId: string;
resumeFromSeq?: number;
sessionHandle?: RuntimeSessionHandle;
pollOnly?: boolean;
}): Promise<void> {
if (this.active.has(params.runId)) return;
const marker: ActiveStream = {
Expand Down Expand Up @@ -116,6 +117,7 @@ export class StreamConsumerService implements OnModuleDestroy {
runtimeSessionId: string;
subscriberId: string;
sessionHandle?: RuntimeSessionHandle;
pollOnly?: boolean;
}
): Promise<void> {
const provider = this.runtimeRegistry.get(params.runtimeKind);
Expand All @@ -125,31 +127,29 @@ export class StreamConsumerService implements OnModuleDestroy {
runtimeSessionId: params.runtimeSessionId
};

let retries = 0;
const maxRetries = this.config.streamMaxRetries;
let isFirstIteration = true;

while (!marker.aborted) {
// If we have a session handle and not poll-only, consume the stream first
if (params.sessionHandle && !params.pollOnly) {
try {
// First iteration: use the session handle's events if provided
// Subsequent iterations (reconnection): fall back to streamSession()
const iterable = (isFirstIteration && params.sessionHandle)
? params.sessionHandle.events
: provider.streamSession({
runId: params.runId,
runtimeSessionId: params.runtimeSessionId,
modeName: params.execution.session.modeName,
subscriberId: params.subscriberId
});
isFirstIteration = false;

for await (const raw of this.withIdleTimeout(iterable, this.config.streamIdleTimeoutMs)) {
for await (const raw of this.withIdleTimeout(params.sessionHandle.events, this.config.streamIdleTimeoutMs)) {
if (marker.aborted) return;
await this.handleRawEvent(params.runId, raw, context, params.runtimeSessionId, marker);
if (marker.finalized) return;
retries = 0;
}
} catch (error) {
marker.connected = false;
this.logger.warn(`stream error for run ${params.runId}: ${error instanceof Error ? error.message : String(error)}`);
}

// Stream ended — check if already finalized
if (marker.finalized || marker.aborted) return;
}

// Polling fallback: poll getSession() until terminal state or max retries
let retries = 0;
while (!marker.aborted && !marker.finalized) {
try {
const snapshot = await provider.getSession({
runId: params.runId,
runtimeSessionId: params.runtimeSessionId,
Expand All @@ -172,72 +172,28 @@ export class StreamConsumerService implements OnModuleDestroy {
await this.finalizeRun(params.runId, marker, 'failed', new Error('runtime session expired'));
return;
}
} catch (pollError) {
this.logger.warn(
`getSession poll failed for run ${params.runId}: ${pollError instanceof Error ? pollError.message : String(pollError)}`
);
}

retries += 1;
if (retries > maxRetries) {
await this.finalizeRun(params.runId, marker, 'failed', new Error('stream ended without terminal session state'));
return;
}

await this.eventService.emitControlPlaneEvents(params.runId, [
{
ts: new Date().toISOString(),
type: 'session.stream.opened',
source: { kind: 'control-plane', name: 'stream-consumer' },
subject: { kind: 'session', id: params.runtimeSessionId },
data: { status: 'reconnecting', detail: 'stream ended before terminal state; retrying' }
}
]);
await new Promise((resolve) => setTimeout(resolve, this.backoffMs(retries)));
} catch (error) {
marker.connected = false;
retries += 1;
this.logger.warn(`stream error for run ${params.runId}: ${error instanceof Error ? error.message : String(error)}`);
await this.eventService.emitControlPlaneEvents(params.runId, [
{
ts: new Date().toISOString(),
type: 'session.stream.opened',
source: { kind: 'control-plane', name: 'stream-consumer' },
subject: { kind: 'session', id: params.runtimeSessionId },
data: { status: 'reconnecting', detail: error instanceof Error ? error.message : String(error) }
}
]);

if (retries > maxRetries) {
await this.finalizeRun(params.runId, marker, 'failed', error);
return;
}
retries += 1;
if (retries > maxRetries) {
await this.finalizeRun(params.runId, marker, 'failed', new Error('polling exhausted without terminal session state'));
return;
}

try {
const snapshot = await provider.getSession({
runId: params.runId,
runtimeSessionId: params.runtimeSessionId,
requesterId: params.subscriberId
});
await this.handleRawEvent(
params.runId,
{ kind: 'session-snapshot', receivedAt: new Date().toISOString(), sessionSnapshot: snapshot },
context,
params.runtimeSessionId,
marker
);
if (marker.finalized) return;
if (snapshot.state === 'SESSION_STATE_RESOLVED') {
await this.finalizeRun(params.runId, marker, 'completed');
return;
}
if (snapshot.state === 'SESSION_STATE_EXPIRED') {
await this.finalizeRun(params.runId, marker, 'failed', new Error('runtime session expired'));
return;
}
} catch (snapshotError) {
this.logger.warn(
`reconciliation failed for run ${params.runId}: ${snapshotError instanceof Error ? snapshotError.message : String(snapshotError)}`
);
await this.eventService.emitControlPlaneEvents(params.runId, [
{
ts: new Date().toISOString(),
type: 'session.stream.opened',
source: { kind: 'control-plane', name: 'stream-consumer' },
subject: { kind: 'session', id: params.runtimeSessionId },
data: { status: 'reconnecting', detail: 'polling getSession for terminal state' }
}

await new Promise((resolve) => setTimeout(resolve, this.backoffMs(retries)));
}
]);
await new Promise((resolve) => setTimeout(resolve, this.backoffMs(retries)));
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/runtime/proto-registry.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const MESSAGE_TYPE_MAP: Record<string, Record<string, string>> = {
Approve: 'macp.modes.quorum.v1.ApprovePayload',
Reject: 'macp.modes.quorum.v1.RejectPayload',
Abstain: 'macp.modes.quorum.v1.AbstainPayload'
},
'ext.multi_round.v1': {
Contribute: '__json__'
}
};

Expand Down Expand Up @@ -73,6 +76,7 @@ export class ProtoRegistryService implements OnModuleInit {
const missingTypes: string[] = [];
for (const [mode, types] of Object.entries(MESSAGE_TYPE_MAP)) {
for (const [msgType, typeName] of Object.entries(types)) {
if (typeName === '__json__') continue; // Extension modes use JSON, no proto type
try {
this.root.lookupType(typeName);
} catch {
Expand Down Expand Up @@ -120,6 +124,10 @@ export class ProtoRegistryService implements OnModuleInit {
if (!typeName) {
return this.tryDecodeUtf8(payload);
}
// Extension modes using JSON payloads (no proto definition)
if (typeName === '__json__') {
return this.tryDecodeUtf8(payload);
}
return this.decodeMessage(typeName, payload);
}

Expand Down
Loading
Loading