diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8bfc49c..93b5b35 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,9 +132,48 @@ jobs: fi echo "No leaked secrets detected." + conventions: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Enforce code conventions (see CLAUDE.md) + run: | + set -o pipefail + fail=0 + + echo "::group::throw new Error outside allowed files" + if grep -rn "throw new Error" src --include='*.ts' | grep -vE '(app-config|migrate|\.spec\.ts|circuit-breaker\.ts|proto-registry\.service\.ts|grpc-helpers\.ts|run\.repository\.ts|webhook\.service\.ts|run-recovery\.service\.ts)'; then + echo "::error::throw new Error found outside allowed files. Use AppException or a Nest built-in." + fail=1 + fi + echo "::endgroup::" + + echo "::group::console.* outside migrate / specs" + if grep -rn 'console\.' src --include='*.ts' | grep -vE '(migrate|\.spec\.ts)'; then + echo "::error::console.* found outside migrate / tests. Use Nest Logger instead." + fail=1 + fi + echo "::endgroup::" + + echo "::group::process.env outside allowed files" + if grep -rn 'process\.env' src --include='*.ts' | grep -vE '(app-config|main\.ts|telemetry/telemetry\.ts|db/migrate\.ts|\.spec\.ts)'; then + echo "::error::process.env read outside app-config/main/telemetry/migrate. Centralize in AppConfigService." + fail=1 + fi + echo "::endgroup::" + + echo "::group::Observer-only: no write-path endpoints reintroduced" + if grep -rn 'POST.*runs/:id/\(messages\|signal\|context\)' src --include='*.ts' | grep -vE '(gone\(|\.spec\.ts|REMOVED|deprecated)'; then + echo "::error::Write-path endpoint references found. Control-plane is observer-only (direct-agent-auth)." + fail=1 + fi + echo "::endgroup::" + + exit $fail + build: runs-on: ubuntu-latest - needs: [lint, typecheck, test, check-env-secrets] + needs: [lint, typecheck, test, check-env-secrets, conventions] permissions: contents: read packages: read diff --git a/docs/API.md b/docs/API.md index 37e83bc..43b2d88 100644 --- a/docs/API.md +++ b/docs/API.md @@ -414,12 +414,12 @@ Metrics summary including token usage and estimated cost: } ``` -**Token usage convention:** Agents include token data in message metadata: +**Token usage convention:** Agents include token data in envelope metadata when sending via `macp-sdk-*` directly to the runtime. The control-plane observes that envelope on its read-only stream: ```json -POST /runs/:id/messages +// Envelope emitted by the agent via the SDK (e.g. session.send(...)) { - "from": "fraud-agent", "messageType": "Evaluation", + "sender": "fraud-agent", "payload": { ... }, "metadata": { "tokenUsage": { @@ -805,7 +805,7 @@ Emitted for each commitment the runtime evaluates against the active policy. Emitted in two cases: 1. The runtime sends a `PolicyDenied` stream message. -2. A send-ack (`POST /runs/:id/messages`) returns `error.code = "POLICY_DENIED"`. The control-plane synthesizes the event so deny reasons are visible on the event stream even if the runtime doesn't echo them back. +2. A runtime-emitted send-ack observed on the stream carries `error.code = "POLICY_DENIED"` (the agent's `Send` RPC was rejected by policy). The control-plane synthesizes the event so deny reasons are visible on the event stream even if the runtime doesn't echo them back as a dedicated `PolicyDenied` envelope. ```json { diff --git a/docs/INTEGRATION.md b/docs/INTEGRATION.md index 7367c54..0de8c23 100644 --- a/docs/INTEGRATION.md +++ b/docs/INTEGRATION.md @@ -8,7 +8,7 @@ Key methods to implement (observer-only surface, post direct-agent-auth): - `initialize()` — protocol version negotiation. -- `subscribeSession({runId, runtimeSessionId})` — read-only `StreamSession` observer; returns `{events, abort}`. **Never writes envelopes.** +- `subscribeSession({runId, runtimeSessionId, afterSequence?})` — read-only `StreamSession` observer; returns `{events, abort}`. **Never writes envelopes.** Per RFC-MACP-0006 §3.2 the provider writes a single passive-subscribe frame (`{subscribeSessionId, afterSequence}`) and immediately half-closes the write side; the runtime then replays accepted history from `afterSequence` (default 0 = full replay) before switching to live broadcast. - `getSession()` — poll for session state (used by the observer's `pollForOpenSession` loop). - `cancelSession()` — only called when `run.metadata.cancellationDelegated === true` (Option B in direct-agent-auth §Cancellation design). - `getManifest()` / `listModes()` / `listRoots()` / `health()` — metadata. diff --git a/docs/TROUBLESHOOTING.md b/docs/TROUBLESHOOTING.md index c49f3de..fd4fb28 100644 --- a/docs/TROUBLESHOOTING.md +++ b/docs/TROUBLESHOOTING.md @@ -43,21 +43,21 @@ 4. Manually cancel: `POST /runs/{id}/cancel` 5. If recovery is enabled (`RUN_RECOVERY_ENABLED=true`), the system auto-recovers orphaned runs on startup -## Message Send Failures +## Legacy Write Endpoints Return 410 Gone -**Symptom:** `POST /runs/:id/messages` returns 400 or 502 +**Symptom:** `POST /runs/:id/messages`, `/signal`, or `/context` returns `410 Gone` with `errorCode: ENDPOINT_REMOVED`. -**Common causes:** -- Run not in `binding_session` or `running` state → check `GET /runs/:id` -- Invalid payload encoding → use `payloadEnvelope` with `encoding: "proto"` for real runtime -- Non-existent participant → `from` must match a registered participant ID -- Session expired → check session TTL +**Explanation:** The control-plane is observer-only as of the 2026-04-15 direct-agent-auth refactor. Agents authenticate to the runtime directly and emit their own envelopes via `macp-sdk-python` / `macp-sdk-typescript`. See `docs/API.md` § "Messages & Signals — emission is NOT via the control-plane" for migration guidance. -## Signals Not Appearing in Projection +## Agent Envelopes Not Appearing in Projection -**Symptom:** `POST /runs/:id/signal` succeeds but signals don't appear in `GET /runs/:id/state` +**Symptom:** Agents call `session.send(...)` via the SDK but events don't appear in `GET /runs/:id/state`. -**Explanation:** Signals are recorded as `message.sent` events by the control plane. The `signal.emitted` projection entries only appear when the runtime echoes signals back via the gRPC stream as `stream-envelope` events with `messageType: Signal`. With a mock runtime that doesn't echo signals, only `message.sent` events (with `subject.kind: signal`) appear. +**Checks:** +1. Confirm the run's `runtimeSessionId` matches the `session_id` the agent is writing to (`GET /runs/:id`). +2. Check stream consumer logs for `StreamSession` reconnection loops — the observer subscribes read-only and must be connected. +3. Confirm the runtime echoes envelopes back on the stream (some runtimes only echo certain message types). `signal.emitted` and `message.sent` canonical events require `stream-envelope` entries on the observer stream. +4. For session discovery, verify `SESSION_DISCOVERY_ENABLED=true` so externally-launched sessions auto-create runs. ## SSE Stream Drops @@ -101,11 +101,11 @@ | `CIRCUIT_BREAKER_OPEN` | 503 | Runtime circuit breaker is open | | `STREAM_EXHAUSTED` | 500 | Max stream reconnection retries reached | | `SESSION_EXPIRED` | 410 | Runtime session has expired | -| `KICKOFF_FAILED` | 502 | Kickoff message failed after retries | | `MODE_NOT_SUPPORTED` | 400 | Runtime does not support requested mode | | `VALIDATION_ERROR` | 400 | Request body validation failed | -| `MESSAGE_SEND_FAILED` | 502 | Runtime rejected a session message | -| `SIGNAL_DISPATCH_FAILED` | 502 | Runtime rejected a signal | -| `CONTEXT_UPDATE_FAILED` | 502 | Runtime rejected context update | | `INVALID_SESSION_ID` | 400 | Session ID not recognized by runtime | +| `UNKNOWN_POLICY_VERSION` | 400 | Policy version not found in registry | +| `POLICY_DENIED` | 403 | Commitment rejected by policy rules | +| `INVALID_POLICY_DEFINITION` | 400 | Policy rules fail schema validation | +| `SESSION_ALREADY_EXISTS` | 409 | Duplicate session start attempt | | `INTERNAL_ERROR` | 500 | Unexpected server error | diff --git a/package-lock.json b/package-lock.json index 24ca8de..60fbea7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,7 +11,7 @@ "dependencies": { "@grpc/grpc-js": "^1.14.0", "@grpc/proto-loader": "^0.8.0", - "@multiagentcoordinationprotocol/proto": "^0.1.0", + "@multiagentcoordinationprotocol/proto": "^0.1.2", "@nestjs/common": "^11.1.16", "@nestjs/core": "^11.1.16", "@nestjs/platform-express": "^11.1.12", @@ -2867,9 +2867,9 @@ "license": "MIT" }, "node_modules/@multiagentcoordinationprotocol/proto": { - "version": "0.1.0", - "resolved": "https://npm.pkg.github.com/download/@multiagentcoordinationprotocol/proto/0.1.0/dc212e958128ab97cb756d377b47ae5ecb83dc27", - "integrity": "sha512-7DgX2O1eiZIbcVPEQV7EMvUpPCjQxXXtqDieUHRyV+LQHNnxpOg683MWvLTn5cEvbb7Kbxd7AuiaVPHpKR6JrA==", + "version": "0.1.2", + "resolved": "https://npm.pkg.github.com/download/@multiagentcoordinationprotocol/proto/0.1.2/66f30edea984cb59dbff9fe0f71b6ac8ce5530e2", + "integrity": "sha512-ZDb8fcGllvN3q1DpUDrVhrYLbyhiDI7iEfCCl4RdSVdRM9STqCmxciii9Lwv1pwA8M/FOVhm93bAQk7jgMKq9A==", "license": "Apache-2.0" }, "node_modules/@nestjs/cli": { diff --git a/package.json b/package.json index 01f90fa..a947482 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ "dev:link-protos": "cd ../multiagentcoordinationprotocol/packages/proto-npm && npm link && cd - && npm link @multiagentcoordinationprotocol/proto" }, "dependencies": { - "@multiagentcoordinationprotocol/proto": "^0.1.0", + "@multiagentcoordinationprotocol/proto": "^0.1.2", "@grpc/grpc-js": "^1.14.0", "@grpc/proto-loader": "^0.8.0", "@nestjs/common": "^11.1.16", diff --git a/src/contracts/runtime.ts b/src/contracts/runtime.ts index 9d7d1a7..476dc0e 100644 --- a/src/contracts/runtime.ts +++ b/src/contracts/runtime.ts @@ -76,17 +76,6 @@ export interface RuntimeInitializeResult { instructions?: string; } -/** - * Result shape returned once the observer has confirmed the session is OPEN. - * `initiator` is learned from the runtime's session metadata (GetSession response), - * never chosen by the control-plane. - */ -export interface RuntimeSessionOpenResult { - runtimeSessionId: string; - initiator: string; - ack: RuntimeAck; -} - export interface RuntimeGetSessionRequest { runId: string; runtimeSessionId: string; @@ -156,6 +145,11 @@ export interface RuntimeCallOptions { export interface RuntimeSubscribeSessionRequest { runId: string; runtimeSessionId: string; + /** + * RFC-MACP-0006 §3.2: replay log sequence to start from. 0 (default) replays + * the full accepted history before switching to live broadcast. + */ + afterSequence?: number; } /** diff --git a/src/errors/error-codes.ts b/src/errors/error-codes.ts index e263557..1518a12 100644 --- a/src/errors/error-codes.ts +++ b/src/errors/error-codes.ts @@ -5,15 +5,11 @@ export enum ErrorCode { RUNTIME_TIMEOUT = 'RUNTIME_TIMEOUT', STREAM_EXHAUSTED = 'STREAM_EXHAUSTED', SESSION_EXPIRED = 'SESSION_EXPIRED', - KICKOFF_FAILED = 'KICKOFF_FAILED', VALIDATION_ERROR = 'VALIDATION_ERROR', INTERNAL_ERROR = 'INTERNAL_ERROR', MODE_NOT_SUPPORTED = 'MODE_NOT_SUPPORTED', CIRCUIT_BREAKER_OPEN = 'CIRCUIT_BREAKER_OPEN', - SIGNAL_DISPATCH_FAILED = 'SIGNAL_DISPATCH_FAILED', - CONTEXT_UPDATE_FAILED = 'CONTEXT_UPDATE_FAILED', INVALID_SESSION_ID = 'INVALID_SESSION_ID', - MESSAGE_SEND_FAILED = 'MESSAGE_SEND_FAILED', UNKNOWN_POLICY_VERSION = 'UNKNOWN_POLICY_VERSION', POLICY_DENIED = 'POLICY_DENIED', INVALID_POLICY_DEFINITION = 'INVALID_POLICY_DEFINITION', diff --git a/src/metrics/metrics.service.ts b/src/metrics/metrics.service.ts index adaa6a6..2cfcdc0 100644 --- a/src/metrics/metrics.service.ts +++ b/src/metrics/metrics.service.ts @@ -15,7 +15,7 @@ function safeNumber(val: unknown, fallback = 0): number { * { "tokenUsage": { "promptTokens": N, "completionTokens": N, "model": "..." } } * * This can appear in: - * - event.data.metadata.tokenUsage (sent via POST /runs/:id/messages metadata) + * - event.data.metadata.tokenUsage (envelope metadata emitted by agents via the macp-sdk) * - event.data.decodedPayload.tokenUsage (embedded in proto payload) * - event.data.payloadDescriptor.tokenUsage (from payload descriptor) * - event.data.tokenUsage (direct) diff --git a/src/runs/session-discovery.service.spec.ts b/src/runs/session-discovery.service.spec.ts new file mode 100644 index 0000000..5d8d85b --- /dev/null +++ b/src/runs/session-discovery.service.spec.ts @@ -0,0 +1,237 @@ +import { SessionDiscoveryService } from './session-discovery.service'; +import { AppConfigService } from '../config/app-config.service'; +import { RuntimeProviderRegistry } from '../runtime/runtime-provider.registry'; +import { RunManagerService } from './run-manager.service'; +import { StreamConsumerService } from './stream-consumer.service'; +import { InstrumentationService } from '../telemetry/instrumentation.service'; +import { SessionLifecycleEvent } from '../contracts/runtime'; + +function makeLifecycleEvent( + type: 'created' | 'resolved' | 'expired', + sessionId: string, + overrides: Partial = {} +): SessionLifecycleEvent { + return { + eventType: type, + observedAtUnixMs: Date.now(), + session: { + sessionId, + mode: 'decision', + state: 'SESSION_STATE_OPEN', + initiator: 'agent-1', + modeVersion: '1.0.0', + configurationVersion: 'cfg.default', + policyVersion: 'policy.default', + startedAtUnixMs: 1_000, + expiresAtUnixMs: 301_000, + ...overrides + } + }; +} + +async function* scriptedStream(events: SessionLifecycleEvent[]) { + for (const ev of events) yield ev; +} + +describe('SessionDiscoveryService', () => { + let service: SessionDiscoveryService; + let mockConfig: Partial; + let mockRegistry: { get: jest.Mock }; + let mockRunManager: { + findBySessionId: jest.Mock; + createRun: jest.Mock; + markStarted: jest.Mock; + bindSession: jest.Mock; + markRunning: jest.Mock; + markCompleted: jest.Mock; + markFailed: jest.Mock; + }; + let mockStreamConsumer: { start: jest.Mock }; + let mockProvider: { watchSessions: jest.Mock; subscribeSession: jest.Mock }; + let mockInstrumentation: Partial; + + beforeEach(() => { + mockConfig = { sessionDiscoveryEnabled: true }; + mockProvider = { + watchSessions: jest.fn(), + subscribeSession: jest.fn().mockReturnValue({ events: (async function* () {})(), abort: jest.fn() }) + }; + mockRegistry = { get: jest.fn().mockReturnValue(mockProvider) }; + mockRunManager = { + findBySessionId: jest.fn().mockResolvedValue(null), + createRun: jest.fn(async (_desc, _sid, runId) => ({ id: runId ?? 'run-x', status: 'queued' })), + markStarted: jest.fn().mockResolvedValue({}), + bindSession: jest.fn().mockResolvedValue({}), + markRunning: jest.fn().mockResolvedValue({}), + markCompleted: jest.fn().mockResolvedValue({}), + markFailed: jest.fn().mockResolvedValue({}) + }; + mockStreamConsumer = { start: jest.fn().mockResolvedValue(undefined) }; + mockInstrumentation = {}; + + service = new SessionDiscoveryService( + mockRegistry as unknown as RuntimeProviderRegistry, + mockRunManager as unknown as RunManagerService, + mockStreamConsumer as unknown as StreamConsumerService, + mockInstrumentation as InstrumentationService, + mockConfig as AppConfigService + ); + }); + + afterEach(() => { + service.onModuleDestroy(); + }); + + it('skips discovery loop when SESSION_DISCOVERY_ENABLED=false', async () => { + const disabled = new SessionDiscoveryService( + mockRegistry as unknown as RuntimeProviderRegistry, + mockRunManager as unknown as RunManagerService, + mockStreamConsumer as unknown as StreamConsumerService, + mockInstrumentation as InstrumentationService, + { sessionDiscoveryEnabled: false } as AppConfigService + ); + await disabled.onModuleInit(); + expect(mockRegistry.get).not.toHaveBeenCalled(); + expect(mockProvider.watchSessions).not.toHaveBeenCalled(); + }); + + it('auto-creates a run for a newly created session and starts stream consumer', async () => { + mockProvider.watchSessions.mockReturnValue( + scriptedStream([makeLifecycleEvent('created', 'session-abc')]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.createRun).toHaveBeenCalledWith( + expect.objectContaining({ + mode: 'live', + runtime: { kind: 'rust' }, + session: expect.objectContaining({ + sessionId: 'session-abc', + modeName: 'decision', + metadata: expect.objectContaining({ source: 'session-discovery', initiator: 'agent-1' }) + }) + }), + 'session-abc', + 'session-abc' + ); + expect(mockRunManager.markStarted).toHaveBeenCalled(); + expect(mockRunManager.bindSession).toHaveBeenCalled(); + expect(mockRunManager.markRunning).toHaveBeenCalledWith(expect.any(String), 'session-abc'); + expect(mockProvider.subscribeSession).toHaveBeenCalledWith( + expect.objectContaining({ runtimeSessionId: 'session-abc' }) + ); + expect(mockStreamConsumer.start).toHaveBeenCalledWith( + expect.objectContaining({ + runtimeSessionId: 'session-abc', + subscriberId: expect.stringMatching(/^discovery-/) + }) + ); + }); + + it('skips duplicate created events for the same session', async () => { + mockProvider.watchSessions.mockReturnValue( + scriptedStream([ + makeLifecycleEvent('created', 'session-dup'), + makeLifecycleEvent('created', 'session-dup') + ]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.createRun).toHaveBeenCalledTimes(1); + }); + + it('skips created event when a run for that session already exists', async () => { + mockRunManager.findBySessionId.mockResolvedValue({ id: 'preexisting-run', status: 'running' }); + mockProvider.watchSessions.mockReturnValue( + scriptedStream([makeLifecycleEvent('created', 'session-existing')]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.createRun).not.toHaveBeenCalled(); + expect(mockStreamConsumer.start).not.toHaveBeenCalled(); + }); + + it('marks the run completed on a resolved event', async () => { + mockRunManager.findBySessionId.mockResolvedValue({ id: 'run-done', status: 'running' }); + mockProvider.watchSessions.mockReturnValue( + scriptedStream([makeLifecycleEvent('resolved', 'session-r')]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.markCompleted).toHaveBeenCalledWith('run-done'); + expect(mockRunManager.markFailed).not.toHaveBeenCalled(); + }); + + it('marks the run failed on an expired event', async () => { + mockRunManager.findBySessionId.mockResolvedValue({ id: 'run-expire', status: 'running' }); + mockProvider.watchSessions.mockReturnValue( + scriptedStream([makeLifecycleEvent('expired', 'session-e')]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.markFailed).toHaveBeenCalledWith('run-expire', expect.any(Error)); + expect(mockRunManager.markCompleted).not.toHaveBeenCalled(); + }); + + it('ignores terminal lifecycle events when the run is already in a terminal state', async () => { + mockRunManager.findBySessionId.mockResolvedValue({ id: 'run-term', status: 'completed' }); + mockProvider.watchSessions.mockReturnValue( + scriptedStream([ + makeLifecycleEvent('resolved', 'session-t'), + makeLifecycleEvent('expired', 'session-t') + ]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.markCompleted).not.toHaveBeenCalled(); + expect(mockRunManager.markFailed).not.toHaveBeenCalled(); + }); + + it('ignores terminal events for unknown sessions', async () => { + mockRunManager.findBySessionId.mockResolvedValue(null); + mockProvider.watchSessions.mockReturnValue( + scriptedStream([makeLifecycleEvent('resolved', 'session-unknown')]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.markCompleted).not.toHaveBeenCalled(); + expect(mockRunManager.markFailed).not.toHaveBeenCalled(); + }); + + it('ignores events missing a sessionId', async () => { + mockProvider.watchSessions.mockReturnValue( + scriptedStream([ + { + eventType: 'created', + observedAtUnixMs: Date.now(), + session: { sessionId: '', mode: 'decision', state: 'SESSION_STATE_OPEN' } + } as SessionLifecycleEvent + ]) + ); + + await service.onModuleInit(); + await flushAsync(); + + expect(mockRunManager.createRun).not.toHaveBeenCalled(); + }); +}); + +async function flushAsync(): Promise { + for (let i = 0; i < 10; i++) { + await Promise.resolve(); + } +} diff --git a/src/runtime/rust-runtime.provider.spec.ts b/src/runtime/rust-runtime.provider.spec.ts new file mode 100644 index 0000000..0e0e23a --- /dev/null +++ b/src/runtime/rust-runtime.provider.spec.ts @@ -0,0 +1,226 @@ +import { RustRuntimeProvider } from './rust-runtime.provider'; +import { AppConfigService } from '../config/app-config.service'; +import { RuntimeCredentialResolverService } from './runtime-credential-resolver.service'; +import { InstrumentationService } from '../telemetry/instrumentation.service'; +import { RawRuntimeEvent, RuntimeSubscribeSessionRequest } from '../contracts/runtime'; + +/** + * Focused unit tests for the RFC-MACP-0006 §3.2 passive-subscribe behavior + * added to RustRuntimeProvider.subscribeSession(): the control-plane writes + * exactly one frame ({subscribeSessionId, afterSequence}) on the bidi stream + * and then half-closes the write side. Full gRPC plumbing (proto loading, + * real channel) is bypassed by stubbing the gRPC client directly. + */ + +interface FakeStream { + on: jest.Mock; + write: jest.Mock; + end: jest.Mock; + cancel: jest.Mock; + emit: (event: 'data' | 'error' | 'end', payload?: unknown) => void; +} + +function makeFakeStream(): FakeStream { + const handlers: Record void>> = {}; + const stream: FakeStream = { + on: jest.fn((event: string, cb: (p: unknown) => void) => { + (handlers[event] ||= []).push(cb); + return stream; + }), + write: jest.fn(), + end: jest.fn(), + cancel: jest.fn(), + emit: (event: 'data' | 'error' | 'end', payload?: unknown) => { + for (const h of handlers[event] ?? []) h(payload); + } + }; + return stream; +} + +function makeProvider(streamFactory: () => unknown): { + provider: RustRuntimeProvider; + resolver: RuntimeCredentialResolverService; +} { + const config = { + runtimeDevAgentId: 'control-plane', + runtimeBearerToken: 'obs-token', + runtimeUseDevHeader: false, + runtimeCircuitBreakerThreshold: 5, + runtimeCircuitBreakerResetMs: 30_000 + } as unknown as AppConfigService; + + const resolver = new RuntimeCredentialResolverService(config); + const instrumentation = {} as InstrumentationService; + const provider = new RustRuntimeProvider(config, resolver, instrumentation); + + // Bypass onModuleInit() — proto loading is unnecessary for these tests. + // Stub the gRPC client so getClientMethod(client, 'StreamSession') returns + // a function that yields our fake bidi stream. + const fakeStreamSession = jest.fn(() => streamFactory()); + (provider as unknown as { client: unknown }).client = { + StreamSession: fakeStreamSession + }; + + return { provider, resolver }; +} + +async function drain(events: AsyncIterable, max = 10): Promise { + const collected: RawRuntimeEvent[] = []; + let i = 0; + for await (const ev of events) { + collected.push(ev); + if (++i >= max) break; + } + return collected; +} + +describe('RustRuntimeProvider.subscribeSession — passive-subscribe frame (RFC-MACP-0006 §3.2)', () => { + const baseReq: RuntimeSubscribeSessionRequest = { + runId: 'run-1', + runtimeSessionId: 'sess-abc' + }; + + it('writes a single passive-subscribe frame with afterSequence=0 (default) then half-closes', async () => { + const stream = makeFakeStream(); + const { provider } = makeProvider(() => stream); + + const handle = provider.subscribeSession(baseReq); + + // Allow the launch microtask (credentials resolve + write/end) to settle. + await new Promise((r) => setImmediate(r)); + + expect(stream.write).toHaveBeenCalledTimes(1); + expect(stream.write).toHaveBeenCalledWith({ + subscribeSessionId: 'sess-abc', + afterSequence: 0 + }); + expect(stream.end).toHaveBeenCalledTimes(1); + + // Verify call order: write must precede end so the runtime sees the + // subscription metadata before the half-close. + const writeOrder = stream.write.mock.invocationCallOrder[0]; + const endOrder = stream.end.mock.invocationCallOrder[0]; + expect(writeOrder).toBeLessThan(endOrder); + + handle.abort(); + }); + + it('forwards the caller-supplied afterSequence for replay resume', async () => { + const stream = makeFakeStream(); + const { provider } = makeProvider(() => stream); + + const handle = provider.subscribeSession({ ...baseReq, afterSequence: 42 }); + await new Promise((r) => setImmediate(r)); + + expect(stream.write).toHaveBeenCalledWith({ + subscribeSessionId: 'sess-abc', + afterSequence: 42 + }); + handle.abort(); + }); + + it('never emits an envelope frame (Send is forbidden — observer-only)', async () => { + const stream = makeFakeStream(); + const { provider } = makeProvider(() => stream); + + const handle = provider.subscribeSession(baseReq); + await new Promise((r) => setImmediate(r)); + + for (const call of stream.write.mock.calls) { + const arg = call[0] as Record; + expect(arg).not.toHaveProperty('envelope'); + expect(arg).not.toHaveProperty('messageType'); + expect(arg).not.toHaveProperty('payload'); + } + handle.abort(); + }); + + it('surfaces an iterator failure when the subscribe-frame write throws synchronously', async () => { + const stream = makeFakeStream(); + stream.write.mockImplementation(() => { + throw new Error('write failed: channel closed'); + }); + const { provider } = makeProvider(() => stream); + + const handle = provider.subscribeSession(baseReq); + const it = handle.events[Symbol.asyncIterator](); + + // First yielded event is always the synthetic 'opened' status frame. + const opened = await it.next(); + expect(opened.done).toBe(false); + expect((opened.value as RawRuntimeEvent).kind).toBe('stream-status'); + + await expect(it.next()).rejects.toThrow(/write failed/); + expect(stream.end).not.toHaveBeenCalled(); + }); + + it('tolerates end() throwing after a successful subscribe-frame write', async () => { + const stream = makeFakeStream(); + stream.end.mockImplementation(() => { + throw new Error('end failed: half-closed'); + }); + const { provider } = makeProvider(() => stream); + + const handle = provider.subscribeSession(baseReq); + await new Promise((r) => setImmediate(r)); + + // The provider swallows end() failures (some gRPC impls no-op on + // half-closed streams). The write must still have happened. + expect(stream.write).toHaveBeenCalledTimes(1); + handle.abort(); + }); + + it('emits a synthetic stream-status "opened" event before any data frames', async () => { + const stream = makeFakeStream(); + const { provider } = makeProvider(() => stream); + + const handle = provider.subscribeSession(baseReq); + const it = handle.events[Symbol.asyncIterator](); + + const first = await it.next(); + expect(first.done).toBe(false); + expect(first.value).toMatchObject({ + kind: 'stream-status', + streamStatus: { status: 'opened' } + }); + + handle.abort(); + }); + + it('filters incoming envelopes whose sessionId differs from the subscriber', async () => { + const stream = makeFakeStream(); + const { provider } = makeProvider(() => stream); + + const handle = provider.subscribeSession(baseReq); + await new Promise((r) => setImmediate(r)); + + // Other-session envelope (must be dropped). + stream.emit('data', { + envelope: { + sessionId: 'other-session', + messageType: 'Decision', + messageId: 'm1', + sender: 'agent-1', + payload: Buffer.from(''), + timestampUnixMs: 1 + } + }); + // Same-session envelope (must be delivered). + stream.emit('data', { + envelope: { + sessionId: 'sess-abc', + messageType: 'Decision', + messageId: 'm2', + sender: 'agent-1', + payload: Buffer.from(''), + timestampUnixMs: 2 + } + }); + stream.emit('end'); + + const events = await drain(handle.events); + const envelopeEvents = events.filter((e) => e.kind === 'stream-envelope'); + expect(envelopeEvents).toHaveLength(1); + expect(envelopeEvents[0].envelope?.messageId).toBe('m2'); + }); +}); diff --git a/src/runtime/rust-runtime.provider.ts b/src/runtime/rust-runtime.provider.ts index 963c3da..37e0b34 100644 --- a/src/runtime/rust-runtime.provider.ts +++ b/src/runtime/rust-runtime.provider.ts @@ -44,8 +44,11 @@ export interface GrpcCallOptions { * - Never calls `Send`. Agents emit their own envelopes directly against the runtime. * - Never allocates a sessionId. The control-plane allocates at POST /runs; the initiator * agent calls SessionStart with its own Bearer token. - * - `subscribeSession()` attaches a read-only bidi `StreamSession` — the control-plane - * only reads; it does not write the first frame (no SessionStart, no SessionWatch). + * - `subscribeSession()` attaches a read-only bidi `StreamSession`. Per RFC-MACP-0006 §3.2 + * the control-plane writes exactly one passive-subscribe frame + * (`{subscribeSessionId, afterSequence}`, no envelope) to bind the stream to the + * session's broadcast channel and request history replay, then closes the write side. + * It never writes an envelope (no SessionStart, no SessionWatch). * * The previously-shipped `openSession()` / `startSession()` / `send()` / `chooseInitiator()` * paths were deleted in CP-3 because they violated §2, §3, and §5 of the plan's invariants. @@ -246,12 +249,25 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { notify(); }); - // Observer stream: end the write side immediately — we only read. - // This tells the runtime the client is a passive subscriber. + // RFC-MACP-0006 §3.2: write a passive-subscribe frame so the runtime binds + // this stream to the session's broadcast channel and replays accepted + // history from `afterSequence` onwards. Then end the write side — we + // only read from here on. + try { + grpcCall.write({ + subscribeSessionId: req.runtimeSessionId, + afterSequence: req.afterSequence ?? 0 + }); + } catch (error) { + streamFailure = error instanceof Error ? error : new Error(String(error)); + ended = true; + notify(); + return; + } try { grpcCall.end(); } catch { - /* some gRPC impls no-op on empty streams */ + /* some gRPC impls no-op on half-closed streams */ } } catch (error) { streamFailure = error instanceof Error ? error : new Error(String(error)); diff --git a/test/integration/removed-endpoints.integration.spec.ts b/test/integration/removed-endpoints.integration.spec.ts new file mode 100644 index 0000000..3266d6f --- /dev/null +++ b/test/integration/removed-endpoints.integration.spec.ts @@ -0,0 +1,65 @@ +import { createTestApp, TestAppContext } from '../helpers/test-app'; +import { decisionHappyScript, decisionModeRequest } from '../fixtures/decision-mode'; + +/** + * End-to-end verification that the envelope-emission endpoints removed in the + * 2026-04-15 direct-agent-auth refactor continue to return 410 Gone with a + * stable error-code contract. Complements the controller unit test by exercising + * the full exception-filter → HTTP wire path. + */ +describe('Removed endpoints (integration)', () => { + let ctx: TestAppContext; + + beforeAll(async () => { + ctx = await createTestApp(decisionHappyScript()); + }); + + afterAll(async () => { + await ctx.app.close(); + }); + + beforeEach(async () => { + await ctx.cleanup(); + }); + + const cases: Array<{ path: (runId: string) => string; body: Record }> = [ + { + path: (runId) => `/runs/${runId}/messages`, + body: { from: 'agent-1', messageType: 'Evaluation', payload: {} } + }, + { + path: (runId) => `/runs/${runId}/signal`, + body: { signalType: 'progress.reported', payload: { percent: 10 } } + }, + { + path: (runId) => `/runs/${runId}/context`, + body: { contextUpdate: {} } + } + ]; + + for (const c of cases) { + it(`POST ${c.path(':id')} returns 410 with errorCode ENDPOINT_REMOVED`, async () => { + const { runId } = await ctx.client.createRun(decisionModeRequest()); + + const res = await ctx.client.requestNoAuth('POST', c.path(runId), { + headers: { Authorization: 'Bearer test-key-integration' }, + body: c.body + }); + + expect(res.status).toBe(410); + const body = res.body as Record; + expect(body.statusCode).toBe(410); + expect(body.errorCode).toBe('ENDPOINT_REMOVED'); + expect(typeof body.message).toBe('string'); + expect(body.message).toMatch(/macp-sdk/); + }); + } + + it('rejects invalid UUID on removed endpoints before the 410 (ParseUUIDPipe runs first)', async () => { + const res = await ctx.client.requestNoAuth('POST', '/runs/not-a-uuid/messages', { + headers: { Authorization: 'Bearer test-key-integration' }, + body: { from: 'agent-1', messageType: 'Evaluation', payload: {} } + }); + expect(res.status).toBe(400); + }); +});