Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ SESSION_POLL_TIMEOUT_MS=60000
# HTTP timeout for proxying a UI cancel to the initiator agent's cancelCallback.
CANCEL_CALLBACK_TIMEOUT_MS=5000

# ── Session Discovery ─────────────────────────────
# When enabled, subscribes to WatchSessions on the runtime and auto-creates
# run records for sessions started by external launchers (not via POST /runs).
SESSION_DISCOVERY_ENABLED=true

# ── Circuit Breaker ──────────────────────────────
RUNTIME_CIRCUIT_BREAKER_THRESHOLD=5
RUNTIME_CIRCUIT_BREAKER_RESET_MS=30000
Expand Down
11 changes: 11 additions & 0 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ Runtime gRPC stream
→ StreamHubService.publishEvent (SSE → live UI subscribers)
```

## Session Discovery (WatchSessions)

When `SESSION_DISCOVERY_ENABLED=true` (default), the `SessionDiscoveryService` subscribes
to the runtime's `WatchSessions` gRPC stream and auto-creates run records for sessions
started by external launchers (not via `POST /runs`). For each `created` event, it creates
a run, binds the session, subscribes the observer stream, and begins projecting events.
Terminal events (`resolved`, `expired`) finalize the auto-discovered run.

This enables the control-plane to observe and project any session the runtime hosts, even
if the launching service doesn't use the control-plane's `POST /runs` endpoint.

## Message / Signal / Context — removed (direct-agent-auth CP-5/6/7)

The `POST /runs/:id/{messages,signal,context}` endpoints were removed 2026-04-15 and now
Expand Down
32 changes: 23 additions & 9 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import { RunExecutorService } from './runs/run-executor.service';
import { RunManagerService } from './runs/run-manager.service';
import { RunRecoveryService } from './runs/run-recovery.service';
import { StreamConsumerService } from './runs/stream-consumer.service';
import { SessionDiscoveryService } from './runs/session-discovery.service';
import { WebhookController } from './controllers/webhook.controller';
import { WebhookDeliveryRepository } from './webhooks/webhook-delivery.repository';
import { WebhookRepository } from './webhooks/webhook.repository';
Expand All @@ -63,13 +64,27 @@ import { WebhookService } from './webhooks/webhook.service';
ThrottlerModule.forRootAsync({
imports: [ConfigModule],
inject: [AppConfigService],
useFactory: (config: AppConfigService) => [{
ttl: config.throttleTtlMs,
limit: config.throttleLimit,
}],
useFactory: (config: AppConfigService) => [
{
ttl: config.throttleTtlMs,
limit: config.throttleLimit
}
]
})
],
controllers: [RunsController, RunInsightsController, RuntimeController, ObservabilityController, HealthController, MetricsController, AdminController, AuditController, WebhookController, DashboardController, EventsController],
controllers: [
RunsController,
RunInsightsController,
RuntimeController,
ObservabilityController,
HealthController,
MetricsController,
AdminController,
AuditController,
WebhookController,
DashboardController,
EventsController
],
providers: [
{ provide: APP_GUARD, useClass: AuthGuard },
{ provide: APP_GUARD, useClass: ThrottleByUserGuard },
Expand All @@ -95,7 +110,7 @@ import { WebhookService } from './webhooks/webhook.service';
}
return new MemoryStreamHubStrategy();
},
inject: [AppConfigService],
inject: [AppConfigService]
},
StreamHubService,
EventNormalizerService,
Expand All @@ -107,6 +122,7 @@ import { WebhookService } from './webhooks/webhook.service';
ReplayService,
RunManagerService,
StreamConsumerService,
SessionDiscoveryService,
RunExecutorService,
RunRecoveryService,
RunInsightsService,
Expand All @@ -119,8 +135,6 @@ import { WebhookService } from './webhooks/webhook.service';
})
export class AppModule implements NestModule {
configure(consumer: MiddlewareConsumer): void {
consumer
.apply(CorrelationIdMiddleware, RequestLoggerMiddleware)
.forRoutes('*');
consumer.apply(CorrelationIdMiddleware, RequestLoggerMiddleware).forRoutes('*');
}
}
36 changes: 26 additions & 10 deletions src/artifacts/artifact.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ describe('ArtifactService', () => {
beforeEach(() => {
mockRepo = {
create: jest.fn(),
listByRunId: jest.fn(),
listByRunId: jest.fn()
};
service = new ArtifactService(mockRepo as unknown as ArtifactRepository);
});
Expand All @@ -19,12 +19,12 @@ describe('ArtifactService', () => {
runId: 'run-1',
kind: 'json' as const,
label: 'test-artifact',
uri: 'https://example.com/artifact.json',
uri: 'https://example.com/artifact.json'
};
const expected = {
...input,
id: 'a1a1a1a1-b2b2-c3c3-d4d4-e5e5e5e5e5e5',
createdAt: '2026-04-07T00:00:00.000Z',
createdAt: '2026-04-07T00:00:00.000Z'
};
mockRepo.create.mockResolvedValue(expected);

Expand All @@ -40,12 +40,12 @@ describe('ArtifactService', () => {
runId: 'run-2',
kind: 'report' as const,
label: 'inline-report',
inline: { summary: 'all good', score: 42 },
inline: { summary: 'all good', score: 42 }
};
const expected = {
...input,
id: 'b1b1b1b1-c2c2-d3d3-e4e4-f5f5f5f5f5f5',
createdAt: '2026-04-07T00:00:01.000Z',
createdAt: '2026-04-07T00:00:01.000Z'
};
mockRepo.create.mockResolvedValue(expected);

Expand All @@ -58,17 +58,33 @@ describe('ArtifactService', () => {
it('propagates repository errors', async () => {
mockRepo.create.mockRejectedValue(new Error('db write failed'));

await expect(
service.register({ runId: 'run-1', kind: 'log' as const, label: 'x' }),
).rejects.toThrow('db write failed');
await expect(service.register({ runId: 'run-1', kind: 'log' as const, label: 'x' })).rejects.toThrow(
'db write failed'
);
});
});

describe('list', () => {
it('delegates to repository.listByRunId and returns the result', async () => {
const artifacts = [
{ id: 'a1', runId: 'run-1', kind: 'json', label: 'first', uri: null, inline: null, createdAt: '2026-01-01T00:00:00Z' },
{ id: 'a2', runId: 'run-1', kind: 'trace', label: 'second', uri: null, inline: null, createdAt: '2026-01-01T00:01:00Z' },
{
id: 'a1',
runId: 'run-1',
kind: 'json',
label: 'first',
uri: null,
inline: null,
createdAt: '2026-01-01T00:00:00Z'
},
{
id: 'a2',
runId: 'run-1',
kind: 'trace',
label: 'second',
uri: null,
inline: null,
createdAt: '2026-01-01T00:01:00Z'
}
];
mockRepo.listByRunId.mockResolvedValue(artifacts);

Expand Down
32 changes: 10 additions & 22 deletions src/audit/audit.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe('AuditService', () => {
resource: 'run',
resourceId: 'run-123',
details: { mode: 'decision' },
requestId: 'req-abc',
requestId: 'req-abc'
};

beforeEach(() => {
Expand All @@ -37,8 +37,8 @@ describe('AuditService', () => {
mockDatabase = {
db: {
insert: mockInsert,
select: mockSelect,
},
select: mockSelect
}
};
service = new AuditService(mockDatabase as unknown as DatabaseService);
});
Expand Down Expand Up @@ -67,7 +67,7 @@ describe('AuditService', () => {
actor: 'system',
actorType: 'system',
action: 'circuit_breaker.reset',
resource: 'circuit_breaker',
resource: 'circuit_breaker'
};

await service.record(entry);
Expand All @@ -92,19 +92,15 @@ describe('AuditService', () => {

describe('list', () => {
it('returns data and total from the database', async () => {
const fakeRows = [
{ id: 'a1', actor: 'user-1', action: 'run.create', createdAt: '2026-01-01T00:00:00Z' },
];
const fakeRows = [{ id: 'a1', actor: 'user-1', action: 'run.create', createdAt: '2026-01-01T00:00:00Z' }];
// First select call returns data rows
mockOffset.mockResolvedValueOnce(fakeRows);
// Second select call (count) — needs its own chain
const countWhere = jest.fn().mockResolvedValue([{ count: 42 }]);
const countFrom = jest.fn().mockReturnValue({ where: countWhere });

// Promise.all calls select twice: once for data, once for count
mockSelect
.mockReturnValueOnce({ from: mockFrom })
.mockReturnValueOnce({ from: countFrom });
mockSelect.mockReturnValueOnce({ from: mockFrom }).mockReturnValueOnce({ from: countFrom });

const result = await service.list({});

Expand All @@ -117,9 +113,7 @@ describe('AuditService', () => {
const countWhere = jest.fn().mockResolvedValue([]);
const countFrom = jest.fn().mockReturnValue({ where: countWhere });

mockSelect
.mockReturnValueOnce({ from: mockFrom })
.mockReturnValueOnce({ from: countFrom });
mockSelect.mockReturnValueOnce({ from: mockFrom }).mockReturnValueOnce({ from: countFrom });

const result = await service.list({});

Expand All @@ -132,9 +126,7 @@ describe('AuditService', () => {
const countWhere = jest.fn().mockResolvedValue([{ count: 0 }]);
const countFrom = jest.fn().mockReturnValue({ where: countWhere });

mockSelect
.mockReturnValueOnce({ from: mockFrom })
.mockReturnValueOnce({ from: countFrom });
mockSelect.mockReturnValueOnce({ from: mockFrom }).mockReturnValueOnce({ from: countFrom });

await service.list({});

Expand All @@ -147,9 +139,7 @@ describe('AuditService', () => {
const countWhere = jest.fn().mockResolvedValue([{ count: 0 }]);
const countFrom = jest.fn().mockReturnValue({ where: countWhere });

mockSelect
.mockReturnValueOnce({ from: mockFrom })
.mockReturnValueOnce({ from: countFrom });
mockSelect.mockReturnValueOnce({ from: mockFrom }).mockReturnValueOnce({ from: countFrom });

await service.list({ limit: 10, offset: 20 });

Expand All @@ -162,9 +152,7 @@ describe('AuditService', () => {
const countWhere = jest.fn().mockResolvedValue([{ count: 0 }]);
const countFrom = jest.fn().mockReturnValue({ where: countWhere });

mockSelect
.mockReturnValueOnce({ from: mockFrom })
.mockReturnValueOnce({ from: countFrom });
mockSelect.mockReturnValueOnce({ from: mockFrom }).mockReturnValueOnce({ from: countFrom });

await service.list({ actor: 'user-1', action: 'run.create' });

Expand Down
17 changes: 7 additions & 10 deletions src/auth/auth.guard.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,28 @@ describe('AuthGuard', () => {
switchToHttp: () => ({
getRequest: () => request,
getResponse: jest.fn(),
getNext: jest.fn(),
getNext: jest.fn()
}),
getArgs: jest.fn(),
getArgByIndex: jest.fn(),
switchToRpc: jest.fn(),
switchToWs: jest.fn(),
getType: jest.fn(),
getType: jest.fn()
} as unknown as ExecutionContext;
}

beforeEach(() => {
mockReflector = {
getAllAndOverride: jest.fn().mockReturnValue(false),
getAllAndOverride: jest.fn().mockReturnValue(false)
};
mockConfig = {
authApiKeys: ['test-api-key-12345678'],
authApiKeys: ['test-api-key-12345678']
};
mockRequest = {
headers: {},
headers: {}
};

guard = new AuthGuard(
mockReflector as unknown as Reflector,
mockConfig as AppConfigService,
);
guard = new AuthGuard(mockReflector as unknown as Reflector, mockConfig as AppConfigService);
});

// =========================================================================
Expand All @@ -56,7 +53,7 @@ describe('AuthGuard', () => {
expect(result).toBe(true);
expect(mockReflector.getAllAndOverride).toHaveBeenCalledWith(IS_PUBLIC_KEY, [
context.getHandler(),
context.getClass(),
context.getClass()
]);
});

Expand Down
4 changes: 1 addition & 3 deletions src/auth/auth.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ export class AuthGuard implements CanActivate {
}

// Support both "Bearer <token>" and raw API key
const token = authHeader.startsWith('Bearer ')
? authHeader.slice(7)
: authHeader;
const token = authHeader.startsWith('Bearer ') ? authHeader.slice(7) : authHeader;

if (!token) {
throw new UnauthorizedException('Empty authorization token');
Expand Down
4 changes: 2 additions & 2 deletions src/auth/throttle-by-user.guard.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ describe('ThrottleByUserGuard', () => {
const context = {
switchToHttp: () => ({
getRequest: () => mockReq,
getResponse: () => mockRes,
}),
getResponse: () => mockRes
})
} as unknown as ExecutionContext;

const result = (guard as any).getRequestResponse(context);
Expand Down
Loading
Loading