From f9bdafa118698a89cd095c6702fcec7fa2a4406c Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Fri, 20 Mar 2026 19:16:34 -0700 Subject: [PATCH] v0.4.0: Fix correctness bugs, add lossless reconnect, outbound tracking, and operational resilience MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix premature run finalization where Commitment messages synthesized SESSION_STATE_RESOLVED without runtime confirmation, and decision.finalized events auto-completed runs. Runs now only finalize via runtime authority (session-snapshot or GetSession reconciliation). Phase 0 — Critical fixes: - Read clientVersion from package.json instead of hardcoded '0.2.0' - Remove synthetic session.state.changed from Commitment normalization - Remove decision.finalized auto-finalize in stream consumer - Add finalizingPromise to prevent race conditions in concurrent finalization - findByIdOrThrow now throws NotFoundException instead of plain Error - Replace console.error with NestJS Logger in bootstrap Phase 1 — Runtime integration: - Implement POST /runs/:id/context endpoint - Implement POST /runs/:id/projection/rebuild endpoint - Persist runtime capabilities from Initialize response - Add schema_version column to canonical events - Fix webhook active column from integer to boolean Phase 2 — Lossless reconnect & outbound tracking: - Persist stream cursor after each event for lossless reconnect - Recovery reads persisted cursor for accurate resume position - Add run_outbound_messages table and repository - Add GET /runs/:id/messages endpoint - Add outboundMessages summary to RunStateProjection Phase 3 — Signal & progress enrichment: - Add signalType and severity fields to SendSignalDto - Emit progress.reported for TaskUpdate, TaskComplete, TaskFail Phase 4 — Operational resilience: - Recovery returns batch result summary {recovered, failed} - Durable webhook outbox with delivery tracking table - Distributed recovery locking via PostgreSQL advisory locks - Add 6 new Prometheus counters for observability Phase 5 — New features & scalability: - POST /runs/validate preflight endpoint - Redis StreamHub strategy for horizontal scaling - Streaming JSONL export via async generator - Typed gRPC interfaces to reduce any casts --- drizzle/0006_capabilities_and_stream.sql | 5 + drizzle/0007_canonical_schema_version.sql | 2 + drizzle/0008_webhook_active_boolean.sql | 3 + drizzle/0009_outbound_messages.sql | 21 ++++ drizzle/0010_webhook_deliveries.sql | 18 +++ package-lock.json | 4 +- src/app.module.ts | 4 + src/config/app-config.service.spec.ts | 7 ++ src/config/app-config.service.ts | 13 ++ src/contracts/control-plane.ts | 8 ++ src/contracts/runtime.ts | 1 + src/controllers/run-insights.controller.ts | 21 ++++ src/controllers/runs.controller.spec.ts | 55 +++++++++ src/controllers/runs.controller.ts | 40 ++++++- src/db/database.service.ts | 14 +++ src/db/schema.ts | 57 ++++++++- src/dto/run-responses.dto.ts | 3 + src/dto/send-signal.dto.ts | 10 ++ src/events/event-normalizer.service.spec.ts | 77 ++++++++++-- src/events/event-normalizer.service.ts | 46 ++++--- src/events/redis-stream-hub.strategy.ts | 101 ++++++++++++++++ src/events/run-event.service.spec.ts | 1 + src/events/stream-hub.service.spec.ts | 1 + src/insights/run-insights.service.ts | 55 +++++++++ src/main.ts | 7 +- src/projection/projection.service.spec.ts | 1 + src/projection/projection.service.ts | 35 +++++- src/replay/replay.service.spec.ts | 1 + src/runs/run-executor.service.ts | 111 ++++++++++++++++- src/runs/run-manager.service.spec.ts | 1 + src/runs/run-manager.service.ts | 4 +- src/runs/run-recovery.service.spec.ts | 8 ++ src/runs/run-recovery.service.ts | 37 ++++-- src/runs/stream-consumer.service.ts | 33 +++-- src/runtime/grpc-types.ts | 126 ++++++++++++++++++++ src/runtime/rust-runtime.provider.ts | 10 +- src/storage/event.repository.ts | 21 +++- src/storage/outbound-message.repository.ts | 82 +++++++++++++ src/storage/run.repository.spec.ts | 5 +- src/storage/run.repository.ts | 4 +- src/storage/runtime-session.repository.ts | 21 ++++ src/telemetry/instrumentation.service.ts | 35 ++++++ src/webhooks/webhook-delivery.repository.ts | 73 ++++++++++++ src/webhooks/webhook.repository.ts | 4 +- src/webhooks/webhook.service.ts | 91 ++++++++++---- 45 files changed, 1187 insertions(+), 90 deletions(-) create mode 100644 drizzle/0006_capabilities_and_stream.sql create mode 100644 drizzle/0007_canonical_schema_version.sql create mode 100644 drizzle/0008_webhook_active_boolean.sql create mode 100644 drizzle/0009_outbound_messages.sql create mode 100644 drizzle/0010_webhook_deliveries.sql create mode 100644 src/events/redis-stream-hub.strategy.ts create mode 100644 src/runtime/grpc-types.ts create mode 100644 src/storage/outbound-message.repository.ts create mode 100644 src/webhooks/webhook-delivery.repository.ts diff --git a/drizzle/0006_capabilities_and_stream.sql b/drizzle/0006_capabilities_and_stream.sql new file mode 100644 index 0000000..f1f21a6 --- /dev/null +++ b/drizzle/0006_capabilities_and_stream.sql @@ -0,0 +1,5 @@ +-- P1.3: Add capabilities, stream cursor, and stream timestamps to runtime_sessions +ALTER TABLE "runtime_sessions" ADD COLUMN "capabilities" JSONB NOT NULL DEFAULT '{}'; +ALTER TABLE "runtime_sessions" ADD COLUMN "last_stream_cursor" INTEGER; +ALTER TABLE "runtime_sessions" ADD COLUMN "stream_connected_at" TIMESTAMP WITH TIME ZONE; +ALTER TABLE "runtime_sessions" ADD COLUMN "stream_disconnected_at" TIMESTAMP WITH TIME ZONE; diff --git a/drizzle/0007_canonical_schema_version.sql b/drizzle/0007_canonical_schema_version.sql new file mode 100644 index 0000000..0c369d6 --- /dev/null +++ b/drizzle/0007_canonical_schema_version.sql @@ -0,0 +1,2 @@ +-- P1.4: Add schema_version to canonical events +ALTER TABLE "run_events_canonical" ADD COLUMN "schema_version" INTEGER NOT NULL DEFAULT 3; diff --git a/drizzle/0008_webhook_active_boolean.sql b/drizzle/0008_webhook_active_boolean.sql new file mode 100644 index 0000000..ad78eaf --- /dev/null +++ b/drizzle/0008_webhook_active_boolean.sql @@ -0,0 +1,3 @@ +-- P1.5: Fix webhook active column from integer to boolean +ALTER TABLE "webhooks" ALTER COLUMN "active" TYPE BOOLEAN USING (active = 1); +ALTER TABLE "webhooks" ALTER COLUMN "active" SET DEFAULT true; diff --git a/drizzle/0009_outbound_messages.sql b/drizzle/0009_outbound_messages.sql new file mode 100644 index 0000000..bd1c4dd --- /dev/null +++ b/drizzle/0009_outbound_messages.sql @@ -0,0 +1,21 @@ +-- P2.2: Outbound message tracking table +CREATE TABLE "run_outbound_messages" ( + "id" UUID PRIMARY KEY, + "run_id" UUID NOT NULL REFERENCES "runs"("id") ON DELETE CASCADE, + "runtime_session_id" VARCHAR(255) NOT NULL, + "message_id" VARCHAR(255) NOT NULL, + "message_type" VARCHAR(128) NOT NULL, + "category" VARCHAR(32) NOT NULL, + "sender" VARCHAR(255) NOT NULL, + "recipients" JSONB NOT NULL DEFAULT '[]', + "status" VARCHAR(32) NOT NULL DEFAULT 'queued', + "payload_descriptor" JSONB NOT NULL DEFAULT '{}', + "ack" JSONB, + "error_message" TEXT, + "created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + "accepted_at" TIMESTAMP WITH TIME ZONE, + "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now() +); +CREATE UNIQUE INDEX ON "run_outbound_messages" ("message_id"); +CREATE INDEX ON "run_outbound_messages" ("run_id"); +CREATE INDEX ON "run_outbound_messages" ("status"); diff --git a/drizzle/0010_webhook_deliveries.sql b/drizzle/0010_webhook_deliveries.sql new file mode 100644 index 0000000..ca2357c --- /dev/null +++ b/drizzle/0010_webhook_deliveries.sql @@ -0,0 +1,18 @@ +-- P4.2: Durable webhook delivery tracking +CREATE TABLE "webhook_deliveries" ( + "id" UUID PRIMARY KEY, + "webhook_id" UUID NOT NULL REFERENCES "webhooks"("id") ON DELETE CASCADE, + "event" VARCHAR(128) NOT NULL, + "run_id" UUID NOT NULL, + "payload" JSONB NOT NULL, + "status" VARCHAR(32) NOT NULL DEFAULT 'pending', + "attempts" INTEGER NOT NULL DEFAULT 0, + "last_attempt_at" TIMESTAMP WITH TIME ZONE, + "response_status" INTEGER, + "error_message" TEXT, + "created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + "delivered_at" TIMESTAMP WITH TIME ZONE +); +CREATE INDEX ON "webhook_deliveries" ("webhook_id"); +CREATE INDEX ON "webhook_deliveries" ("status"); +CREATE INDEX ON "webhook_deliveries" ("run_id"); diff --git a/package-lock.json b/package-lock.json index fc3d6a5..70cc442 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "macp-control-plane", - "version": "0.1.0", + "version": "0.3.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "macp-control-plane", - "version": "0.1.0", + "version": "0.3.0", "license": "MIT", "dependencies": { "@grpc/grpc-js": "^1.14.0", diff --git a/src/app.module.ts b/src/app.module.ts index bb14026..3172950 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -32,6 +32,7 @@ import { EventRepository } from './storage/event.repository'; import { ArtifactRepository } from './storage/artifact.repository'; import { MetricsRepository } from './storage/metrics.repository'; import { ProjectionRepository } from './storage/projection.repository'; +import { OutboundMessageRepository } from './storage/outbound-message.repository'; import { RunRepository } from './storage/run.repository'; import { RuntimeSessionRepository } from './storage/runtime-session.repository'; import { InstrumentationService } from './telemetry/instrumentation.service'; @@ -42,6 +43,7 @@ import { RunManagerService } from './runs/run-manager.service'; import { RunRecoveryService } from './runs/run-recovery.service'; import { StreamConsumerService } from './runs/stream-consumer.service'; import { WebhookController } from './controllers/webhook.controller'; +import { WebhookDeliveryRepository } from './webhooks/webhook-delivery.repository'; import { WebhookRepository } from './webhooks/webhook.repository'; import { WebhookService } from './webhooks/webhook.service'; @@ -68,6 +70,7 @@ import { WebhookService } from './webhooks/webhook.service'; ProjectionRepository, ArtifactRepository, MetricsRepository, + OutboundMessageRepository, StreamHubService, EventNormalizerService, ProjectionService, @@ -82,6 +85,7 @@ import { WebhookService } from './webhooks/webhook.service'; RunRecoveryService, RunInsightsService, WebhookRepository, + WebhookDeliveryRepository, WebhookService ] }) diff --git a/src/config/app-config.service.spec.ts b/src/config/app-config.service.spec.ts index f33fed8..222183f 100644 --- a/src/config/app-config.service.spec.ts +++ b/src/config/app-config.service.spec.ts @@ -202,6 +202,13 @@ describe('AppConfigService', () => { }); }); + describe('clientVersion', () => { + it('should read version from package.json', () => { + const config = new AppConfigService(); + expect(config.clientVersion).toBe('0.3.0'); + }); + }); + describe('readNumber edge cases', () => { it('should handle a valid number string for STREAM_BACKOFF_BASE_MS', () => { process.env.STREAM_BACKOFF_BASE_MS = '500'; diff --git a/src/config/app-config.service.ts b/src/config/app-config.service.ts index ff1fc20..41f1aca 100644 --- a/src/config/app-config.service.ts +++ b/src/config/app-config.service.ts @@ -26,6 +26,16 @@ export class AppConfigService implements OnModuleInit { readonly nodeEnv = process.env.NODE_ENV ?? 'development'; readonly isDevelopment = this.nodeEnv === 'development'; + /** Read from package.json at startup */ + readonly clientVersion: string = (() => { + try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + return require('../../package.json').version as string; + } catch { + return '0.0.0'; + } + })(); + readonly port = readNumber('PORT', 3001); readonly host = process.env.HOST ?? '0.0.0.0'; readonly corsOrigin = process.env.CORS_ORIGIN ?? 'http://localhost:3000'; @@ -68,6 +78,9 @@ export class AppConfigService implements OnModuleInit { readonly dbPoolIdleTimeout = readNumber('DB_POOL_IDLE_TIMEOUT', 30000); readonly dbPoolConnectionTimeout = readNumber('DB_POOL_CONNECTION_TIMEOUT', 5000); + readonly streamHubStrategy = process.env.STREAM_HUB_STRATEGY ?? 'memory'; + readonly redisUrl = process.env.REDIS_URL ?? ''; + readonly logLevel = process.env.LOG_LEVEL ?? 'info'; readonly otelEnabled = readBoolean('OTEL_ENABLED', false); readonly otelServiceName = process.env.OTEL_SERVICE_NAME ?? 'macp-control-plane'; diff --git a/src/contracts/control-plane.ts b/src/contracts/control-plane.ts index 95df6db..84a1310 100644 --- a/src/contracts/control-plane.ts +++ b/src/contracts/control-plane.ts @@ -236,6 +236,13 @@ export interface ProgressProjection { }>; } +export interface OutboundMessageSummary { + total: number; + queued: number; + accepted: number; + rejected: number; +} + export interface RunStateProjection { run: RunSummaryProjection; participants: ParticipantProjection[]; @@ -245,6 +252,7 @@ export interface RunStateProjection { progress: ProgressProjection; timeline: TimelineProjection; trace: TraceSummary; + outboundMessages: OutboundMessageSummary; } export interface ReplayRequest { diff --git a/src/contracts/runtime.ts b/src/contracts/runtime.ts index 5b2856c..b407ebf 100644 --- a/src/contracts/runtime.ts +++ b/src/contracts/runtime.ts @@ -64,6 +64,7 @@ export interface RuntimeInitializeResult { websiteUrl?: string; }; supportedModes: string[]; + capabilities?: RuntimeCapabilities; } export interface RuntimeStartSessionRequest { diff --git a/src/controllers/run-insights.controller.ts b/src/controllers/run-insights.controller.ts index 85bc2fc..786e233 100644 --- a/src/controllers/run-insights.controller.ts +++ b/src/controllers/run-insights.controller.ts @@ -2,12 +2,15 @@ import { Body, Controller, Get, + Header, Param, ParseUUIDPipe, Post, Query, + Res, ValidationPipe } from '@nestjs/common'; +import type { Response } from 'express'; import { ApiBody, ApiOkResponse, ApiOperation, ApiTags } from '@nestjs/swagger'; import { CompareRunsDto } from '../dto/compare-runs.dto'; import { ExportRunQueryDto } from '../dto/export-run-query.dto'; @@ -45,6 +48,24 @@ export class RunInsightsController { }); } + @Get(':id/export/stream') + @ApiOperation({ summary: 'Stream export as JSONL (newline-delimited JSON).' }) + @Header('Content-Type', 'application/x-ndjson') + async streamExport( + @Param('id', new ParseUUIDPipe()) id: string, + @Query(new ValidationPipe({ transform: true, whitelist: true })) query: ExportRunQueryDto, + @Res() res: Response + ) { + res.setHeader('Content-Type', 'application/x-ndjson'); + res.setHeader('Transfer-Encoding', 'chunked'); + for await (const line of this.insightsService.exportRunStream(id, { + includeRaw: query.includeRaw + })) { + res.write(line); + } + res.end(); + } + @Post('compare') @ApiOperation({ summary: 'Compare two runs side-by-side.' }) @ApiBody({ type: CompareRunsDto }) diff --git a/src/controllers/runs.controller.spec.ts b/src/controllers/runs.controller.spec.ts index 62ef409..3e5516e 100644 --- a/src/controllers/runs.controller.spec.ts +++ b/src/controllers/runs.controller.spec.ts @@ -5,6 +5,8 @@ import { EventRepository } from '../storage/event.repository'; import { ReplayService } from '../replay/replay.service'; import { StreamHubService } from '../events/stream-hub.service'; import { AppConfigService } from '../config/app-config.service'; +import { ProjectionService } from '../projection/projection.service'; +import { OutboundMessageRepository } from '../storage/outbound-message.repository'; describe('RunsController', () => { let controller: RunsController; @@ -12,6 +14,7 @@ describe('RunsController', () => { launch: jest.Mock; cancel: jest.Mock; sendSignal: jest.Mock; + updateContext: jest.Mock; }; let mockRunManager: { listRuns: jest.Mock; @@ -26,12 +29,19 @@ describe('RunsController', () => { let mockReplayService: Partial; let mockStreamHub: Partial; let mockConfig: Partial; + let mockProjectionService: { + rebuild: jest.Mock; + }; + let mockOutboundMessageRepository: { + listByRunId: jest.Mock; + }; beforeEach(() => { mockRunExecutor = { launch: jest.fn(), cancel: jest.fn(), sendSignal: jest.fn(), + updateContext: jest.fn(), }; mockRunManager = { listRuns: jest.fn(), @@ -48,6 +58,12 @@ describe('RunsController', () => { mockConfig = { streamSseHeartbeatMs: 15000, }; + mockProjectionService = { + rebuild: jest.fn(), + }; + mockOutboundMessageRepository = { + listByRunId: jest.fn(), + }; controller = new RunsController( mockRunExecutor as unknown as RunExecutorService, @@ -56,6 +72,8 @@ describe('RunsController', () => { mockReplayService as unknown as ReplayService, mockStreamHub as unknown as StreamHubService, mockConfig as unknown as AppConfigService, + mockProjectionService as unknown as ProjectionService, + mockOutboundMessageRepository as unknown as OutboundMessageRepository, ); }); @@ -249,4 +267,41 @@ describe('RunsController', () => { }); }); + // =========================================================================== + // updateContext + // =========================================================================== + describe('updateContext', () => { + it('delegates to runExecutor.updateContext', async () => { + const contextResult = { messageId: 'msg-ctx', ack: { ok: true } }; + mockRunExecutor.updateContext.mockResolvedValue(contextResult); + + const body = { from: 'agent-1', context: { key: 'value' } }; + const result = await controller.updateContext('run-1', body as any); + + expect(mockRunExecutor.updateContext).toHaveBeenCalledWith('run-1', body); + expect(result).toEqual(contextResult); + }); + }); + + // =========================================================================== + // rebuildProjection + // =========================================================================== + describe('rebuildProjection', () => { + it('fetches events and delegates to projectionService.rebuild', async () => { + const fakeRun = { id: 'run-1', status: 'completed' }; + mockRunManager.getRun.mockResolvedValue(fakeRun); + const events = [{ id: 'e1', seq: 1, type: 'run.created' }]; + mockEventRepository.listCanonicalByRun.mockResolvedValue(events); + const projection = { run: { runId: 'run-1', status: 'completed' } }; + mockProjectionService.rebuild.mockResolvedValue(projection); + + const result = await controller.rebuildProjection('run-1'); + + expect(mockRunManager.getRun).toHaveBeenCalledWith('run-1'); + expect(mockEventRepository.listCanonicalByRun).toHaveBeenCalledWith('run-1', 0, 100000); + expect(mockProjectionService.rebuild).toHaveBeenCalledWith('run-1', events); + expect(result).toEqual(projection); + }); + }); + }); diff --git a/src/controllers/runs.controller.ts b/src/controllers/runs.controller.ts index 41b5706..5bcee1d 100644 --- a/src/controllers/runs.controller.ts +++ b/src/controllers/runs.controller.ts @@ -31,6 +31,9 @@ import { ReplayRequestDto } from '../dto/replay-request.dto'; import { CloneRunDto } from '../dto/clone-run.dto'; import { SendSignalDto } from '../dto/send-signal.dto'; import { StreamRunQueryDto } from '../dto/stream-run-query.dto'; +import { UpdateContextDto } from '../dto/update-context.dto'; +import { ProjectionService } from '../projection/projection.service'; +import { OutboundMessageRepository } from '../storage/outbound-message.repository'; import { CanonicalEventDto, CreateRunResponseDto, @@ -52,9 +55,20 @@ export class RunsController { private readonly eventRepository: EventRepository, private readonly replayService: ReplayService, private readonly streamHub: StreamHubService, - private readonly config: AppConfigService + private readonly config: AppConfigService, + private readonly projectionService: ProjectionService, + private readonly outboundMessageRepository: OutboundMessageRepository ) {} + @Post('validate') + @ApiOperation({ summary: 'Preflight validation of an execution request without creating a run.' }) + @ApiBody({ type: ExecutionRequestDto }) + async validateRequest( + @Body(new ValidationPipe({ transform: true, whitelist: true })) body: ExecutionRequestDto + ) { + return this.runExecutor.validate(body); + } + @Get() @ApiOperation({ summary: 'List runs with optional filtering and pagination.' }) async listRuns( @@ -293,6 +307,30 @@ export class RunsController { return this.runManager.deleteRun(id); } + @Get(':id/messages') + @ApiOperation({ summary: 'List outbound messages for a run.' }) + async getRunMessages(@Param('id', new ParseUUIDPipe()) id: string) { + return this.outboundMessageRepository.listByRunId(id); + } + + @Post(':id/context') + @ApiOperation({ summary: 'Update context during a running session.' }) + @ApiBody({ type: UpdateContextDto }) + async updateContext( + @Param('id', new ParseUUIDPipe()) id: string, + @Body(new ValidationPipe({ transform: true, whitelist: true })) body: UpdateContextDto + ) { + return this.runExecutor.updateContext(id, body); + } + + @Post(':id/projection/rebuild') + @ApiOperation({ summary: 'Rebuild the projection from persisted canonical events.' }) + async rebuildProjection(@Param('id', new ParseUUIDPipe()) id: string) { + await this.runManager.getRun(id); + const events = await this.eventRepository.listCanonicalByRun(id, 0, 100000); + return this.projectionService.rebuild(id, events as any); + } + @Post(':id/archive') @ApiOperation({ summary: 'Archive a run, excluding it from default listings.' }) async archiveRun(@Param('id', new ParseUUIDPipe()) id: string) { diff --git a/src/db/database.service.ts b/src/db/database.service.ts index 9aaae88..dd7f03a 100644 --- a/src/db/database.service.ts +++ b/src/db/database.service.ts @@ -1,4 +1,5 @@ import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; +import { sql } from 'drizzle-orm'; import { drizzle, NodePgDatabase } from 'drizzle-orm/node-postgres'; import { Pool } from 'pg'; import { AppConfigService } from '../config/app-config.service'; @@ -28,4 +29,17 @@ export class DatabaseService implements OnModuleDestroy { async onModuleDestroy(): Promise { await this.pool.end(); } + + async tryAdvisoryLock(key: string): Promise { + const result = await this.db.execute( + sql`SELECT pg_try_advisory_lock(hashtext(${key})) AS acquired` + ); + return (result.rows[0] as { acquired: boolean })?.acquired === true; + } + + async advisoryUnlock(key: string): Promise { + await this.db.execute( + sql`SELECT pg_advisory_unlock(hashtext(${key}))` + ); + } } diff --git a/src/db/schema.ts b/src/db/schema.ts index f6546c7..25dc0e8 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -1,4 +1,5 @@ import { + boolean, index, integer, jsonb, @@ -56,6 +57,10 @@ export const runtimeSessions = pgTable( sessionState: varchar('session_state', { length: 64 }).notNull().default('SESSION_STATE_UNSPECIFIED'), expiresAt: timestamp('expires_at', { withTimezone: true, mode: 'string' }), lastSeenAt: timestamp('last_seen_at', { withTimezone: true, mode: 'string' }), + capabilities: jsonb('capabilities').$type>().notNull().default({}), + lastStreamCursor: integer('last_stream_cursor'), + streamConnectedAt: timestamp('stream_connected_at', { withTimezone: true, mode: 'string' }), + streamDisconnectedAt: timestamp('stream_disconnected_at', { withTimezone: true, mode: 'string' }), metadata: jsonb('metadata').$type>().notNull().default({}), createdAt: timestamp('created_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow(), updatedAt: timestamp('updated_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow() @@ -102,6 +107,7 @@ export const runEventsCanonical = pgTable( traceId: varchar('trace_id', { length: 255 }), spanId: varchar('span_id', { length: 255 }), parentSpanId: varchar('parent_span_id', { length: 255 }), + schemaVersion: integer('schema_version').notNull().default(3), data: jsonb('data').$type>().notNull(), createdAt: timestamp('created_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow() }, @@ -193,6 +199,32 @@ export const runMetrics = pgTable( }) ); +export const runOutboundMessages = pgTable( + 'run_outbound_messages', + { + id: uuid('id').primaryKey(), + runId: uuid('run_id').notNull().references(() => runs.id, { onDelete: 'cascade' }), + runtimeSessionId: varchar('runtime_session_id', { length: 255 }).notNull(), + messageId: varchar('message_id', { length: 255 }).notNull(), + messageType: varchar('message_type', { length: 128 }).notNull(), + category: varchar('category', { length: 32 }).notNull(), + sender: varchar('sender', { length: 255 }).notNull(), + recipients: jsonb('recipients').$type().notNull().default([]), + status: varchar('status', { length: 32 }).notNull().default('queued'), + payloadDescriptor: jsonb('payload_descriptor').$type>().notNull().default({}), + ack: jsonb('ack').$type>(), + errorMessage: text('error_message'), + createdAt: timestamp('created_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow(), + acceptedAt: timestamp('accepted_at', { withTimezone: true, mode: 'string' }), + updatedAt: timestamp('updated_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow() + }, + (table) => ({ + messageIdIdx: uniqueIndex('run_outbound_messages_message_id_unique').on(table.messageId), + runIdx: index('run_outbound_messages_run_idx').on(table.runId), + statusIdx: index('run_outbound_messages_status_idx').on(table.status) + }) +); + export const webhooks = pgTable( 'webhooks', { @@ -200,7 +232,7 @@ export const webhooks = pgTable( url: text('url').notNull(), events: jsonb('events').$type().notNull().default([]), secret: varchar('secret', { length: 255 }).notNull(), - active: integer('active').notNull().default(1), + active: boolean('active').notNull().default(true), createdAt: timestamp('created_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow(), updatedAt: timestamp('updated_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow() }, @@ -208,3 +240,26 @@ export const webhooks = pgTable( activeIdx: index('webhooks_active_idx').on(table.active) }) ); + +export const webhookDeliveries = pgTable( + 'webhook_deliveries', + { + id: uuid('id').primaryKey(), + webhookId: uuid('webhook_id').notNull().references(() => webhooks.id, { onDelete: 'cascade' }), + event: varchar('event', { length: 128 }).notNull(), + runId: uuid('run_id').notNull(), + payload: jsonb('payload').$type>().notNull(), + status: varchar('status', { length: 32 }).notNull().default('pending'), + attempts: integer('attempts').notNull().default(0), + lastAttemptAt: timestamp('last_attempt_at', { withTimezone: true, mode: 'string' }), + responseStatus: integer('response_status'), + errorMessage: text('error_message'), + createdAt: timestamp('created_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow(), + deliveredAt: timestamp('delivered_at', { withTimezone: true, mode: 'string' }) + }, + (table) => ({ + webhookIdx: index('webhook_deliveries_webhook_idx').on(table.webhookId), + statusIdx: index('webhook_deliveries_status_idx').on(table.status), + runIdx: index('webhook_deliveries_run_idx').on(table.runId) + }) +); diff --git a/src/dto/run-responses.dto.ts b/src/dto/run-responses.dto.ts index c8ff62d..5969acc 100644 --- a/src/dto/run-responses.dto.ts +++ b/src/dto/run-responses.dto.ts @@ -70,6 +70,9 @@ export class RunStateResponseDto implements RunStateProjection { @ApiProperty({ type: 'object', additionalProperties: true }) trace!: RunStateProjection['trace']; + + @ApiProperty({ type: 'object', additionalProperties: true }) + outboundMessages!: RunStateProjection['outboundMessages']; } export class MetricsSummaryDto implements MetricsSummary { diff --git a/src/dto/send-signal.dto.ts b/src/dto/send-signal.dto.ts index 4ff6afd..2801427 100644 --- a/src/dto/send-signal.dto.ts +++ b/src/dto/send-signal.dto.ts @@ -21,4 +21,14 @@ export class SendSignalDto { @IsOptional() @IsObject() payload?: Record; + + @ApiPropertyOptional({ description: 'Signal type classification (e.g., anomaly, alert)' }) + @IsOptional() + @IsString() + signalType?: string; + + @ApiPropertyOptional({ description: 'Signal severity (e.g., low, medium, high, critical)' }) + @IsOptional() + @IsString() + severity?: string; } diff --git a/src/events/event-normalizer.service.spec.ts b/src/events/event-normalizer.service.spec.ts index 933ceba..6f0ef12 100644 --- a/src/events/event-normalizer.service.spec.ts +++ b/src/events/event-normalizer.service.spec.ts @@ -197,7 +197,7 @@ describe('EventNormalizerService', () => { ); }); - it('should produce decision.finalized AND session.state.changed for Commitment messageType', () => { + it('should produce decision.finalized but NOT session.state.changed for Commitment messageType', () => { const decoded = { commitmentId: 'commit-1', action: 'approve' }; protoRegistry.decodeKnown.mockReturnValue(decoded); @@ -215,17 +215,9 @@ describe('EventNormalizerService', () => { expect(decisionFinalized).toBeDefined(); expect(decisionFinalized!.subject).toEqual({ kind: 'decision', id: 'commit-1' }); + // Commitment should NOT synthesize session.state.changed — only runtime authority can do that const stateChanged = events.find((e) => e.type === 'session.state.changed'); - expect(stateChanged).toBeDefined(); - expect(stateChanged!.subject).toEqual({ kind: 'session', id: 'session-1' }); - expect(stateChanged!.data).toEqual( - expect.objectContaining({ - state: 'SESSION_STATE_RESOLVED', - reason: 'Commitment observed on stream', - commitmentId: 'commit-1', - action: 'approve', - }), - ); + expect(stateChanged).toBeUndefined(); }); it('should produce proposal.created derived event for Proposal messageType', () => { @@ -261,6 +253,69 @@ describe('EventNormalizerService', () => { }); }); + describe('progress from task lifecycle', () => { + it('should emit additional progress.reported for TaskUpdate with progress field', () => { + const decoded = { progress: 50, status: 'halfway done' }; + protoRegistry.decodeKnown.mockReturnValue(decoded); + + const envelope = makeEnvelope({ messageType: 'TaskUpdate' }); + const raw: RawRuntimeEvent = { + kind: 'stream-envelope', + receivedAt: '2026-01-01T00:00:00.000Z', + envelope, + }; + const ctx = makeContext({ knownParticipants: new Set(['agent-a']) }); + + const events = service.normalize('run-1', raw, ctx); + + const progressEvents = events.filter((e) => e.type === 'progress.reported'); + expect(progressEvents).toHaveLength(1); + expect(progressEvents[0].data.decodedPayload).toEqual( + expect.objectContaining({ percentage: 50, message: 'halfway done' }), + ); + }); + + it('should emit progress.reported at 100% for TaskComplete', () => { + protoRegistry.decodeKnown.mockReturnValue({}); + + const envelope = makeEnvelope({ messageType: 'TaskComplete' }); + const raw: RawRuntimeEvent = { + kind: 'stream-envelope', + receivedAt: '2026-01-01T00:00:00.000Z', + envelope, + }; + const ctx = makeContext({ knownParticipants: new Set(['agent-a']) }); + + const events = service.normalize('run-1', raw, ctx); + + const progressEvents = events.filter((e) => e.type === 'progress.reported'); + expect(progressEvents).toHaveLength(1); + expect(progressEvents[0].data.decodedPayload).toEqual( + expect.objectContaining({ percentage: 100, message: 'completed' }), + ); + }); + + it('should emit progress.reported with failure status for TaskFail', () => { + protoRegistry.decodeKnown.mockReturnValue({ reason: 'out of memory' }); + + const envelope = makeEnvelope({ messageType: 'TaskFail' }); + const raw: RawRuntimeEvent = { + kind: 'stream-envelope', + receivedAt: '2026-01-01T00:00:00.000Z', + envelope, + }; + const ctx = makeContext({ knownParticipants: new Set(['agent-a']) }); + + const events = service.normalize('run-1', raw, ctx); + + const progressEvents = events.filter((e) => e.type === 'progress.reported'); + expect(progressEvents).toHaveLength(1); + expect(progressEvents[0].data.decodedPayload).toEqual( + expect.objectContaining({ message: 'out of memory' }), + ); + }); + }); + describe('unknown event kinds', () => { it('should produce message.sent for send-ack kind', () => { const raw: RawRuntimeEvent = { diff --git a/src/events/event-normalizer.service.ts b/src/events/event-normalizer.service.ts index 535e32a..3b86a60 100644 --- a/src/events/event-normalizer.service.ts +++ b/src/events/event-normalizer.service.ts @@ -127,22 +127,38 @@ export class EventNormalizerService implements EventNormalizer { ); } - if (envelope.messageType === 'Commitment') { - const commitment = decoded ?? {}; + // Emit additional progress.reported for TaskUpdate/TaskComplete/TaskFail + if (envelope.messageType === 'TaskUpdate' && decoded) { + const progress = (decoded as Record).progress; + if (progress !== undefined) { + canonical.push( + this.makeEvent(runId, ts, 'progress.reported', { kind: 'message', id: envelope.messageId }, { + modeName: envelope.mode, + messageType: envelope.messageType, + sender: envelope.sender, + decodedPayload: { percentage: progress, message: (decoded as Record).status ?? '' } + }, envelope.messageType) + ); + } + } + if (envelope.messageType === 'TaskComplete') { canonical.push( - this.makeEvent( - runId, - ts, - 'session.state.changed', - { kind: 'session', id: envelope.sessionId }, - { - state: 'SESSION_STATE_RESOLVED', - reason: 'Commitment observed on stream', - commitmentId: commitment.commitmentId, - action: commitment.action - }, - envelope.messageType - ) + this.makeEvent(runId, ts, 'progress.reported', { kind: 'message', id: envelope.messageId }, { + modeName: envelope.mode, + messageType: envelope.messageType, + sender: envelope.sender, + decodedPayload: { percentage: 100, message: 'completed' } + }, envelope.messageType) + ); + } + if (envelope.messageType === 'TaskFail') { + canonical.push( + this.makeEvent(runId, ts, 'progress.reported', { kind: 'message', id: envelope.messageId }, { + modeName: envelope.mode, + messageType: envelope.messageType, + sender: envelope.sender, + decodedPayload: { percentage: undefined, message: (decoded as Record | undefined)?.reason ?? 'failed' } + }, envelope.messageType) ); } diff --git a/src/events/redis-stream-hub.strategy.ts b/src/events/redis-stream-hub.strategy.ts new file mode 100644 index 0000000..9f11606 --- /dev/null +++ b/src/events/redis-stream-hub.strategy.ts @@ -0,0 +1,101 @@ +import { Logger } from '@nestjs/common'; +import { Observable, Subject } from 'rxjs'; +import { filter } from 'rxjs/operators'; +import { CanonicalEvent, RunStateProjection } from '../contracts/control-plane'; +import { StreamHubStrategy } from './stream-hub.interface'; +import { StreamHubMessage } from './stream-hub.service'; + +interface RedisHubMessage extends StreamHubMessage { + _runId: string; +} + +/** + * Redis pub/sub StreamHub strategy for horizontal scaling. + * Publishes events to a Redis channel and subscribes to receive + * events from other control-plane instances. + * + * Requires `ioredis` as an optional peer dependency. + */ +export class RedisStreamHubStrategy implements StreamHubStrategy { + private readonly logger = new Logger(RedisStreamHubStrategy.name); + private readonly localSubject = new Subject(); + private publisher: { publish: (channel: string, message: string) => Promise } | null = null; + private subscriber: { subscribe: (channel: string, cb?: (err: Error | null) => void) => void; on: (event: string, cb: (...args: unknown[]) => void) => void } | null = null; + private readonly channel = 'macp:stream-hub'; + + constructor(redisUrl: string) { + void this.connect(redisUrl); + } + + private async connect(redisUrl: string): Promise { + try { + // Dynamic import — ioredis is an optional peer dependency + // eslint-disable-next-line @typescript-eslint/no-require-imports + const Redis = require('ioredis'); + this.publisher = new Redis(redisUrl); + this.subscriber = new Redis(redisUrl); + + this.subscriber!.subscribe(this.channel, (err: Error | null) => { + if (err) { + this.logger.error(`Failed to subscribe to Redis channel: ${err.message}`); + } else { + this.logger.log('Connected to Redis stream hub'); + } + }); + + this.subscriber!.on('message', (_channel: unknown, message: unknown) => { + try { + const parsed = JSON.parse(message as string) as RedisHubMessage; + this.localSubject.next(parsed); + } catch (err) { + this.logger.warn(`Failed to parse Redis message: ${err instanceof Error ? err.message : String(err)}`); + } + }); + } catch (err) { + this.logger.error(`Failed to connect to Redis: ${err instanceof Error ? err.message : String(err)}`); + } + } + + publishEvent(event: CanonicalEvent): void { + const msg: RedisHubMessage = { + _runId: event.runId, + event: 'canonical_event', + data: event + }; + this.publish(msg); + } + + publishSnapshot(runId: string, snapshot: RunStateProjection): void { + const msg: RedisHubMessage = { + _runId: runId, + event: 'snapshot', + data: snapshot + }; + this.publish(msg); + } + + complete(runId: string): void { + const msg: RedisHubMessage = { + _runId: runId, + event: 'complete', + data: { runId } + }; + this.publish(msg); + } + + stream(runId: string): Observable { + return this.localSubject.asObservable().pipe( + filter((msg) => msg._runId === runId) + ); + } + + private publish(msg: RedisHubMessage): void { + if (this.publisher) { + this.publisher.publish(this.channel, JSON.stringify(msg)).catch((err: Error) => { + this.logger.warn(`Failed to publish to Redis: ${err.message}`); + }); + } + // Also emit locally for same-instance subscribers + this.localSubject.next(msg); + } +} diff --git a/src/events/run-event.service.spec.ts b/src/events/run-event.service.spec.ts index 9423ac6..3320cf8 100644 --- a/src/events/run-event.service.spec.ts +++ b/src/events/run-event.service.spec.ts @@ -27,6 +27,7 @@ describe('RunEventService', () => { progress: { entries: [] }, timeline: { latestSeq: 1, totalEvents: 1, recent: [] }, trace: { spanCount: 0, linkedArtifacts: [] }, + outboundMessages: { total: 0, queued: 0, accepted: 0, rejected: 0 }, }; beforeEach(() => { diff --git a/src/events/stream-hub.service.spec.ts b/src/events/stream-hub.service.spec.ts index 3e4e322..b55f911 100644 --- a/src/events/stream-hub.service.spec.ts +++ b/src/events/stream-hub.service.spec.ts @@ -36,6 +36,7 @@ describe('StreamHubService', () => { progress: { entries: [] }, timeline: { latestSeq: 0, totalEvents: 0, recent: [] }, trace: { spanCount: 0, linkedArtifacts: [] }, + outboundMessages: { total: 0, queued: 0, accepted: 0, rejected: 0 }, }); describe('publishEvent()', () => { diff --git a/src/insights/run-insights.service.ts b/src/insights/run-insights.service.ts index 3149735..b42427c 100644 --- a/src/insights/run-insights.service.ts +++ b/src/insights/run-insights.service.ts @@ -113,6 +113,61 @@ export class RunInsightsService { return lines.join('\n') + '\n'; } + async *exportRunStream( + runId: string, + options: { includeRaw?: boolean } + ): AsyncGenerator { + const run = await this.runRepository.findById(runId); + if (!run) throw new NotFoundException(`run ${runId} not found`); + + const [session, projection, metrics, artifacts] = await Promise.all([ + this.runtimeSessionRepository.findByRunId(runId), + this.projectionService.get(runId), + this.metricsRepository.get(runId), + this.artifactRepository.listByRunId(runId) + ]); + + // Emit header line + yield JSON.stringify({ + type: 'header', + run: this.toRun(run), + session: session as Record | null, + projection, + metrics: metrics ? { + runId: metrics.runId, + eventCount: metrics.eventCount, + messageCount: metrics.messageCount, + signalCount: metrics.signalCount, + proposalCount: metrics.proposalCount, + toolCallCount: metrics.toolCallCount, + decisionCount: metrics.decisionCount, + streamReconnectCount: metrics.streamReconnectCount, + firstEventAt: metrics.firstEventAt ?? undefined, + lastEventAt: metrics.lastEventAt ?? undefined, + durationMs: metrics.durationMs ?? undefined, + sessionState: (metrics.sessionState as MetricsSummary['sessionState']) ?? undefined + } : null, + artifacts: artifacts.map((a) => ({ + id: a.id, runId: a.runId, kind: a.kind, label: a.label, + uri: a.uri ?? undefined, inline: a.inline ?? undefined, createdAt: a.createdAt + })), + exportedAt: new Date().toISOString() + }) + '\n'; + + // Stream canonical events + for await (const event of this.eventRepository.streamCanonicalByRun(runId)) { + yield JSON.stringify({ ...event, type: 'canonical_event' }) + '\n'; + } + + // Stream raw events if requested + if (options.includeRaw) { + const rawEvents = await this.eventRepository.listRawByRun(runId, 0, 100000); + for (const event of rawEvents) { + yield JSON.stringify({ ...event, type: 'raw_event' }) + '\n'; + } + } + } + async compareRuns(leftRunId: string, rightRunId: string): Promise { const [leftRun, rightRun] = await Promise.all([ this.runRepository.findById(leftRunId), diff --git a/src/main.ts b/src/main.ts index 1403946..a047b26 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,5 +1,5 @@ import 'reflect-metadata'; -import { ValidationPipe } from '@nestjs/common'; +import { Logger, ValidationPipe } from '@nestjs/common'; import { NestFactory } from '@nestjs/core'; import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger'; import * as express from 'express'; @@ -52,6 +52,9 @@ async function bootstrap() { } bootstrap().catch((err) => { - console.error('bootstrap failed', err); + new Logger('Bootstrap').error( + `bootstrap failed: ${err instanceof Error ? err.message : String(err)}`, + err instanceof Error ? err.stack : undefined + ); process.exit(1); }); diff --git a/src/projection/projection.service.spec.ts b/src/projection/projection.service.spec.ts index 1acd64e..36cf30f 100644 --- a/src/projection/projection.service.spec.ts +++ b/src/projection/projection.service.spec.ts @@ -58,6 +58,7 @@ describe('ProjectionService', () => { progress: { entries: [] }, timeline: { latestSeq: 0, totalEvents: 0, recent: [] }, trace: { spanCount: 0, linkedArtifacts: [] }, + outboundMessages: { total: 0, queued: 0, accepted: 0, rejected: 0 }, }); }); }); diff --git a/src/projection/projection.service.ts b/src/projection/projection.service.ts index c80785f..58170fc 100644 --- a/src/projection/projection.service.ts +++ b/src/projection/projection.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { CanonicalEvent, GraphProjection, + OutboundMessageSummary, ParticipantProjection, ProgressProjection, RunStateProjection, @@ -28,7 +29,8 @@ export class ProjectionService { signals: row.signals as unknown as RunStateProjection['signals'], progress: row.progress as unknown as ProgressProjection ?? { entries: [] }, timeline: row.timeline as unknown as RunStateProjection['timeline'], - trace: row.traceSummary as unknown as RunStateProjection['trace'] + trace: row.traceSummary as unknown as RunStateProjection['trace'], + outboundMessages: (row as any).outboundMessages as OutboundMessageSummary ?? { total: 0, queued: 0, accepted: 0, rejected: 0 } }; } @@ -103,7 +105,33 @@ export class ProjectionService { } break; } - case 'message.sent': + case 'message.sent': { + const sender = String(event.data.sender ?? event.data.from ?? ''); + const recipients = (event.data.to as string[] | undefined) ?? []; + this.touchParticipant(next, sender, event.ts, 'active', String(event.data.messageType ?? event.type)); + recipients.forEach((recipient) => this.touchParticipant(next, recipient, event.ts, 'waiting', undefined)); + recipients.forEach((recipient) => { + if (sender && recipient) { + next.graph.edges.push({ from: sender, to: recipient, kind: event.type, ts: event.ts }); + } + }); + next.graph.edges = next.graph.edges.slice(-200); + // Track outbound message stats + if (!next.outboundMessages) { + next.outboundMessages = { total: 0, queued: 0, accepted: 0, rejected: 0 }; + } + next.outboundMessages.total += 1; + next.outboundMessages.accepted += 1; + break; + } + case 'message.send_failed': { + if (!next.outboundMessages) { + next.outboundMessages = { total: 0, queued: 0, accepted: 0, rejected: 0 }; + } + next.outboundMessages.total += 1; + next.outboundMessages.rejected += 1; + break; + } case 'message.received': { const sender = String(event.data.sender ?? event.data.from ?? ''); const recipients = (event.data.to as string[] | undefined) ?? []; @@ -203,7 +231,8 @@ export class ProjectionService { signals: { signals: [] }, progress: { entries: [] }, timeline: { latestSeq: 0, totalEvents: 0, recent: [] }, - trace: { spanCount: 0, linkedArtifacts: [] } + trace: { spanCount: 0, linkedArtifacts: [] }, + outboundMessages: { total: 0, queued: 0, accepted: 0, rejected: 0 } }; } diff --git a/src/replay/replay.service.spec.ts b/src/replay/replay.service.spec.ts index a67fc58..408af24 100644 --- a/src/replay/replay.service.spec.ts +++ b/src/replay/replay.service.spec.ts @@ -233,6 +233,7 @@ describe('ReplayService', () => { progress: { entries: [] }, timeline: { latestSeq: 2, totalEvents: 2, recent: [] }, trace: { spanCount: 0, linkedArtifacts: [] }, + outboundMessages: { total: 0, queued: 0, accepted: 0, rejected: 0 }, }; mockProjectionService.replayStateAt.mockResolvedValue(fakeProjection); diff --git a/src/runs/run-executor.service.ts b/src/runs/run-executor.service.ts index 4a92cab..92763d2 100644 --- a/src/runs/run-executor.service.ts +++ b/src/runs/run-executor.service.ts @@ -33,6 +33,67 @@ export class RunExecutorService { private readonly config: AppConfigService ) {} + async validate(request: ExecutionRequest) { + const errors: string[] = []; + const warnings: string[] = []; + + if (!request.session.participants || request.session.participants.length === 0) { + errors.push('session.participants must contain at least one participant'); + } + + if (!request.session.modeName) { + errors.push('session.modeName is required'); + } + + if (request.kickoff) { + for (const msg of request.kickoff) { + if (!msg.messageType) { + errors.push('kickoff message is missing messageType'); + } + if (!msg.from) { + errors.push('kickoff message is missing from'); + } + } + } + + let runtimeInfo: { reachable: boolean; supportedModes: string[]; capabilities?: unknown } = { + reachable: false, + supportedModes: [] + }; + + try { + const provider = this.runtimeRegistry.get(request.runtime.kind); + const deadlineMs = this.config.runtimeRequestTimeoutMs; + const initResult = await provider.initialize( + { clientName: 'macp-control-plane', clientVersion: this.config.clientVersion }, + { deadline: new Date(Date.now() + deadlineMs) } + ); + runtimeInfo = { + reachable: true, + supportedModes: initResult.supportedModes, + capabilities: initResult.capabilities + }; + + if ( + initResult.supportedModes.length > 0 && + !initResult.supportedModes.includes(request.session.modeName) + ) { + errors.push( + `Runtime does not support mode '${request.session.modeName}'. Supported: ${initResult.supportedModes.join(', ')}` + ); + } + } catch (error) { + warnings.push(`Runtime not reachable: ${error instanceof Error ? error.message : String(error)}`); + } + + return { + valid: errors.length === 0, + errors, + warnings, + runtime: runtimeInfo + }; + } + async launch(request: ExecutionRequest) { if (request.mode === 'replay') { throw new BadRequestException('Use /runs/:id/replay for replay mode. POST /runs launches live or sandbox executions.'); @@ -116,6 +177,52 @@ export class RunExecutorService { return { messageId: sendResult.envelope.messageId, ack: sendResult.ack }; } + async updateContext(runId: string, dto: { from: string; context: Record }) { + const run = await this.runManager.getRun(runId); + if (!run.runtimeSessionId || run.status !== 'running') { + throw new BadRequestException('run is not in running state'); + } + const provider = this.runtimeRegistry.get(run.runtimeKind); + + const sendResult = await provider.send({ + runId, + runtimeSessionId: '', + modeName: '', + from: dto.from, + to: [], + messageType: 'ContextUpdate', + payload: Buffer.from(JSON.stringify(dto.context), 'utf8'), + payloadDescriptor: dto.context + }); + + if (!sendResult.ack.ok && sendResult.ack.error) { + throw new AppException( + ErrorCode.CONTEXT_UPDATE_FAILED, + `Runtime rejected context update: [${sendResult.ack.error.code}] ${sendResult.ack.error.message}`, + 502 + ); + } + + await this.eventService.emitControlPlaneEvents(runId, [ + { + ts: new Date().toISOString(), + type: 'message.sent', + source: { kind: 'control-plane', name: 'run-executor' }, + subject: { kind: 'message', id: sendResult.envelope.messageId }, + data: { + sessionId: run.runtimeSessionId, + sender: dto.from, + to: [], + messageType: 'ContextUpdate', + ack: sendResult.ack, + payloadDescriptor: dto.context + } + } + ]); + + return { messageId: sendResult.envelope.messageId, ack: sendResult.ack }; + } + async clone(runId: string, overrides?: { tags?: string[]; context?: Record }) { const run = await this.runManager.getRun(runId); const executionRequest = run.metadata?.executionRequest as ExecutionRequest | undefined; @@ -154,7 +261,7 @@ export class RunExecutorService { }, async () => { return provider.initialize( - { clientName: 'macp-control-plane', clientVersion: '0.2.0' }, + { clientName: 'macp-control-plane', clientVersion: this.config.clientVersion }, { deadline: new Date(Date.now() + deadlineMs) } ); } @@ -185,7 +292,7 @@ export class RunExecutorService { async () => handle.sessionAck ); - await this.runManager.bindSession(runId, request, session); + await this.runManager.bindSession(runId, request, session, initResult.capabilities as unknown as Record); // Send kickoff messages through the bidirectional stream for (const message of request.kickoff ?? []) { diff --git a/src/runs/run-manager.service.spec.ts b/src/runs/run-manager.service.spec.ts index 4274a7c..43f8f8e 100644 --- a/src/runs/run-manager.service.spec.ts +++ b/src/runs/run-manager.service.spec.ts @@ -51,6 +51,7 @@ function makeEmptyProjection(runId: string): RunStateProjection { progress: { entries: [] }, timeline: { latestSeq: 0, totalEvents: 0, recent: [] }, trace: { spanCount: 0, linkedArtifacts: [] }, + outboundMessages: { total: 0, queued: 0, accepted: 0, rejected: 0 }, }; } diff --git a/src/runs/run-manager.service.ts b/src/runs/run-manager.service.ts index 4a3f06c..465fd7c 100644 --- a/src/runs/run-manager.service.ts +++ b/src/runs/run-manager.service.ts @@ -102,7 +102,8 @@ export class RunManagerService { async bindSession( runId: string, request: ExecutionRequest, - session: { runtimeSessionId: string; initiator: string; ack: { sessionState: string } } + session: { runtimeSessionId: string; initiator: string; ack: { sessionState: string } }, + capabilities?: Record ) { const run = await this.runRepository.update(runId, { status: 'binding_session', @@ -119,6 +120,7 @@ export class RunManagerService { initiatorParticipantId: session.initiator, sessionState: session.ack.sessionState, lastSeenAt: new Date().toISOString(), + capabilities: (capabilities ?? {}) as Record, metadata: { participants: request.session.participants, roots: request.session.roots ?? [] diff --git a/src/runs/run-recovery.service.spec.ts b/src/runs/run-recovery.service.spec.ts index 117b9e8..24391df 100644 --- a/src/runs/run-recovery.service.spec.ts +++ b/src/runs/run-recovery.service.spec.ts @@ -1,5 +1,6 @@ import { RunRecoveryService } from './run-recovery.service'; import { AppConfigService } from '../config/app-config.service'; +import { DatabaseService } from '../db/database.service'; import { RunEventService } from '../events/run-event.service'; import { RunRepository } from '../storage/run.repository'; import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; @@ -9,6 +10,7 @@ import { StreamConsumerService } from './stream-consumer.service'; describe('RunRecoveryService', () => { let service: RunRecoveryService; let mockConfig: Partial; + let mockDatabase: { tryAdvisoryLock: jest.Mock; advisoryUnlock: jest.Mock }; let mockRunRepo: { listActiveRuns: jest.Mock }; let mockSessionRepo: { findByRunId: jest.Mock }; let mockRunManager: { markRunning: jest.Mock; markFailed: jest.Mock }; @@ -17,6 +19,10 @@ describe('RunRecoveryService', () => { beforeEach(() => { mockConfig = { runRecoveryEnabled: true }; + mockDatabase = { + tryAdvisoryLock: jest.fn().mockResolvedValue(true), + advisoryUnlock: jest.fn().mockResolvedValue(undefined) + }; mockRunRepo = { listActiveRuns: jest.fn().mockResolvedValue([]) }; mockSessionRepo = { findByRunId: jest.fn().mockResolvedValue(null) }; mockRunManager = { @@ -28,6 +34,7 @@ describe('RunRecoveryService', () => { service = new RunRecoveryService( mockConfig as AppConfigService, + mockDatabase as unknown as DatabaseService, mockRunRepo as unknown as RunRepository, mockSessionRepo as unknown as RuntimeSessionRepository, mockRunManager as unknown as RunManagerService, @@ -39,6 +46,7 @@ describe('RunRecoveryService', () => { it('skips recovery when disabled', async () => { const disabledService = new RunRecoveryService( { runRecoveryEnabled: false } as AppConfigService, + mockDatabase as unknown as DatabaseService, mockRunRepo as unknown as RunRepository, mockSessionRepo as unknown as RuntimeSessionRepository, mockRunManager as unknown as RunManagerService, diff --git a/src/runs/run-recovery.service.ts b/src/runs/run-recovery.service.ts index 48ecc71..c0eef5c 100644 --- a/src/runs/run-recovery.service.ts +++ b/src/runs/run-recovery.service.ts @@ -1,6 +1,7 @@ import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; import { ExecutionRequest } from '../contracts/control-plane'; import { AppConfigService } from '../config/app-config.service'; +import { DatabaseService } from '../db/database.service'; import { RunEventService } from '../events/run-event.service'; import { RunRepository } from '../storage/run.repository'; import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; @@ -13,6 +14,7 @@ export class RunRecoveryService implements OnApplicationBootstrap { constructor( private readonly config: AppConfigService, + private readonly database: DatabaseService, private readonly runRepository: RunRepository, private readonly runtimeSessionRepository: RuntimeSessionRepository, private readonly runManager: RunManagerService, @@ -28,21 +30,36 @@ export class RunRecoveryService implements OnApplicationBootstrap { await this.recoverActiveRuns(); } - async recoverActiveRuns(): Promise { + async recoverActiveRuns(): Promise<{ recovered: string[]; failed: Array<{ runId: string; error: string }> }> { const activeRuns = await this.runRepository.listActiveRuns(); if (activeRuns.length === 0) { this.logger.log('no active runs to recover'); - return; + return { recovered: [], failed: [] }; } this.logger.log(`recovering ${activeRuns.length} active run(s)`); + const recovered: string[] = []; + const failed: Array<{ runId: string; error: string }> = []; + for (const run of activeRuns) { try { - await this.recoverRun(run); + // Distributed lock: prevent multiple CP instances from recovering the same run + const lockKey = `run-recovery:${run.id}`; + const acquired = await this.database.tryAdvisoryLock(lockKey); + if (!acquired) { + this.logger.log(`skipping run ${run.id} — another instance holds the recovery lock`); + continue; + } + try { + await this.recoverRun(run); + recovered.push(run.id); + } finally { + await this.database.advisoryUnlock(lockKey); + } } catch (error) { - this.logger.error( - `failed to recover run ${run.id}: ${error instanceof Error ? error.message : String(error)}` - ); + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`failed to recover run ${run.id}: ${errorMessage}`); + failed.push({ runId: run.id, error: errorMessage }); try { await this.runManager.markFailed(run.id, error); } catch (markError) { @@ -52,6 +69,9 @@ export class RunRecoveryService implements OnApplicationBootstrap { } } } + + this.logger.log(`recovery summary: ${recovered.length} recovered, ${failed.length} failed`); + return { recovered, failed }; } private async recoverRun(run: { @@ -94,13 +114,16 @@ export class RunRecoveryService implements OnApplicationBootstrap { } ]); + // Use the persisted stream cursor if available, otherwise fall back to run's lastEventSeq + const resumeFromSeq = Math.max(session?.lastStreamCursor ?? 0, run.lastEventSeq); + await this.streamConsumer.start({ runId: run.id, execution: executionRequest, runtimeKind: run.runtimeKind, runtimeSessionId, subscriberId, - resumeFromSeq: run.lastEventSeq + resumeFromSeq }); this.logger.log(`recovered run ${run.id} from seq ${run.lastEventSeq}`); diff --git a/src/runs/stream-consumer.service.ts b/src/runs/stream-consumer.service.ts index d93dcf3..97b6827 100644 --- a/src/runs/stream-consumer.service.ts +++ b/src/runs/stream-consumer.service.ts @@ -14,6 +14,7 @@ interface ActiveStream { finalized: boolean; connected: boolean; lastProcessedSeq: number; + finalizingPromise?: Promise; } @Injectable() @@ -80,14 +81,22 @@ export class StreamConsumerService implements OnModuleDestroy { error?: unknown ): Promise { if (marker.finalized) return; - marker.finalized = true; - marker.aborted = true; - if (status === 'completed') { - await this.runManager.markCompleted(runId); - } else { - await this.runManager.markFailed(runId, error ?? new Error('unknown failure')); + if (marker.finalizingPromise) { + await marker.finalizingPromise; + return; } - this.streamHub.complete(runId); + const doFinalize = async () => { + marker.finalized = true; + marker.aborted = true; + if (status === 'completed') { + await this.runManager.markCompleted(runId); + } else { + await this.runManager.markFailed(runId, error ?? new Error('unknown failure')); + } + this.streamHub.complete(runId); + }; + marker.finalizingPromise = doFinalize(); + await marker.finalizingPromise; } private backoffMs(retries: number): number { @@ -279,6 +288,11 @@ export class StreamConsumerService implements OnModuleDestroy { marker.lastProcessedSeq = event.seq; } + // Persist stream cursor for lossless reconnect + if (marker.lastProcessedSeq > 0) { + await this.runtimeSessionRepository.updateStreamCursor(runId, marker.lastProcessedSeq); + } + const sessionStateChange = emitted.find((event) => event.type === 'session.state.changed'); if (sessionStateChange && typeof sessionStateChange.data.state === 'string') { await this.runtimeSessionRepository.updateState( @@ -296,10 +310,5 @@ export class StreamConsumerService implements OnModuleDestroy { } } - const finalized = emitted.find((event) => event.type === 'decision.finalized'); - if (finalized) { - await this.runtimeSessionRepository.updateState(runId, 'SESSION_STATE_RESOLVED', new Date().toISOString()); - await this.finalizeRun(runId, marker, 'completed'); - } } } diff --git a/src/runtime/grpc-types.ts b/src/runtime/grpc-types.ts new file mode 100644 index 0000000..8c19533 --- /dev/null +++ b/src/runtime/grpc-types.ts @@ -0,0 +1,126 @@ +/** + * TypeScript interfaces for gRPC request/response shapes. + * Reduces `any` casts in RustRuntimeProvider. + */ + +export interface GrpcClientInfo { + name: string; + title?: string; + version?: string; + description?: string; + websiteUrl?: string; +} + +export interface GrpcCapabilities { + sessions?: { stream?: boolean }; + cancellation?: { cancelSession?: boolean }; + progress?: { progress?: boolean }; + manifest?: { getManifest?: boolean }; + modeRegistry?: { listModes?: boolean; listChanged?: boolean }; + roots?: { listRoots?: boolean; listChanged?: boolean }; + experimental?: { features?: Record }; +} + +export interface GrpcInitializeRequest { + supportedProtocolVersions: string[]; + clientInfo: GrpcClientInfo; + capabilities: GrpcCapabilities; +} + +export interface GrpcInitializeResponse { + selectedProtocolVersion: string; + runtimeInfo?: GrpcClientInfo; + supportedModes?: string[]; + capabilities?: GrpcCapabilities; +} + +export interface GrpcEnvelope { + macpVersion: string; + mode: string; + messageType: string; + messageId: string; + sessionId: string; + sender: string; + timestampUnixMs: string; + payload: Buffer; +} + +export interface GrpcAck { + ok?: boolean; + duplicate?: boolean; + messageId?: string; + sessionId?: string; + acceptedAtUnixMs?: string; + sessionState?: string; + error?: { + code: string; + message: string; + sessionId?: string; + messageId?: string; + details?: Buffer; + }; +} + +export interface GrpcSendResponse { + ack: GrpcAck; +} + +export interface GrpcStreamChunk { + envelope: GrpcEnvelope; +} + +export interface GrpcSessionMetadata { + sessionId?: string; + mode?: string; + state?: string; + startedAtUnixMs?: string; + expiresAtUnixMs?: string; + modeVersion?: string; + configurationVersion?: string; + policyVersion?: string; +} + +export interface GrpcGetSessionResponse { + metadata: GrpcSessionMetadata; +} + +export interface GrpcCancelSessionResponse { + ack: GrpcAck; +} + +export interface GrpcManifest { + agentId?: string; + title?: string; + description?: string; + supportedModes?: string[]; + metadata?: Record; +} + +export interface GrpcGetManifestResponse { + manifest?: GrpcManifest; +} + +export interface GrpcModeDescriptor { + mode: string; + modeVersion: string; + title?: string; + description?: string; + determinismClass?: string; + participantModel?: string; + messageTypes?: string[]; + terminalMessageTypes?: string[]; + schemaUris?: Record; +} + +export interface GrpcListModesResponse { + modes?: GrpcModeDescriptor[]; +} + +export interface GrpcRootDescriptor { + uri: string; + name?: string; +} + +export interface GrpcListRootsResponse { + roots?: GrpcRootDescriptor[]; +} diff --git a/src/runtime/rust-runtime.provider.ts b/src/runtime/rust-runtime.provider.ts index 546d508..dfd7f3f 100644 --- a/src/runtime/rust-runtime.provider.ts +++ b/src/runtime/rust-runtime.provider.ts @@ -118,7 +118,15 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { description: response.runtimeInfo?.description, websiteUrl: response.runtimeInfo?.websiteUrl }, - supportedModes: response.supportedModes ?? [] + supportedModes: response.supportedModes ?? [], + capabilities: response.capabilities ? { + sessions: response.capabilities.sessions, + cancellation: response.capabilities.cancellation, + progress: response.capabilities.progress, + manifest: response.capabilities.manifest, + modeRegistry: response.capabilities.modeRegistry, + roots: response.capabilities.roots + } : undefined }; } diff --git a/src/storage/event.repository.ts b/src/storage/event.repository.ts index 6acf23f..f468acb 100644 --- a/src/storage/event.repository.ts +++ b/src/storage/event.repository.ts @@ -45,7 +45,8 @@ export class EventRepository { traceId: event.trace?.traceId, spanId: event.trace?.spanId, parentSpanId: event.trace?.parentSpanId, - data: event.data + data: event.data, + schemaVersion: event.schemaVersion ?? 3 })) ).onConflictDoNothing(); } @@ -83,6 +84,24 @@ export class EventRepository { .limit(limit); } + async *streamCanonicalByRun(runId: string, afterSeq = 0, batchSize = 500): AsyncGenerator { + let cursor = afterSeq; + while (true) { + const batch = await this.database.db + .select() + .from(runEventsCanonical) + .where(and(eq(runEventsCanonical.runId, runId), gt(runEventsCanonical.seq, cursor))) + .orderBy(asc(runEventsCanonical.seq)) + .limit(batchSize); + if (batch.length === 0) return; + for (const event of batch) { + yield event; + } + cursor = batch[batch.length - 1].seq; + if (batch.length < batchSize) return; + } + } + async listCanonicalUpTo(runId: string, seq?: number) { const where = seq === undefined ? eq(runEventsCanonical.runId, runId) diff --git a/src/storage/outbound-message.repository.ts b/src/storage/outbound-message.repository.ts new file mode 100644 index 0000000..46b253a --- /dev/null +++ b/src/storage/outbound-message.repository.ts @@ -0,0 +1,82 @@ +import { Injectable } from '@nestjs/common'; +import { and, asc, eq } from 'drizzle-orm'; +import { randomUUID } from 'node:crypto'; +import { DatabaseService } from '../db/database.service'; +import { runOutboundMessages } from '../db/schema'; + +export interface NewOutboundMessage { + runId: string; + runtimeSessionId: string; + messageId: string; + messageType: string; + category: 'kickoff' | 'signal' | 'context_update'; + sender: string; + recipients: string[]; + payloadDescriptor?: Record; +} + +@Injectable() +export class OutboundMessageRepository { + constructor(private readonly database: DatabaseService) {} + + async create(input: NewOutboundMessage) { + const id = randomUUID(); + const now = new Date().toISOString(); + await this.database.db.insert(runOutboundMessages).values({ + id, + runId: input.runId, + runtimeSessionId: input.runtimeSessionId, + messageId: input.messageId, + messageType: input.messageType, + category: input.category, + sender: input.sender, + recipients: input.recipients, + status: 'queued', + payloadDescriptor: input.payloadDescriptor ?? {}, + createdAt: now, + updatedAt: now + }); + return this.findByMessageId(input.messageId); + } + + async markAccepted(messageId: string, ack: Record) { + const now = new Date().toISOString(); + await this.database.db + .update(runOutboundMessages) + .set({ status: 'accepted', ack, acceptedAt: now, updatedAt: now }) + .where(eq(runOutboundMessages.messageId, messageId)); + } + + async markRejected(messageId: string, errorMessage: string, ack?: Record) { + const now = new Date().toISOString(); + await this.database.db + .update(runOutboundMessages) + .set({ status: 'rejected', ack, errorMessage, updatedAt: now }) + .where(eq(runOutboundMessages.messageId, messageId)); + } + + async findByMessageId(messageId: string) { + const rows = await this.database.db + .select() + .from(runOutboundMessages) + .where(eq(runOutboundMessages.messageId, messageId)) + .limit(1); + return rows[0] ?? null; + } + + async listByRunId(runId: string) { + return this.database.db + .select() + .from(runOutboundMessages) + .where(eq(runOutboundMessages.runId, runId)) + .orderBy(asc(runOutboundMessages.createdAt)); + } + + async listByRunIdAndStatus(runId: string, status: string) { + return this.database.db + .select() + .from(runOutboundMessages) + .where(and(eq(runOutboundMessages.runId, runId), eq(runOutboundMessages.status, status))) + .orderBy(asc(runOutboundMessages.createdAt)); + } +} diff --git a/src/storage/run.repository.spec.ts b/src/storage/run.repository.spec.ts index dc0fa8d..019def3 100644 --- a/src/storage/run.repository.spec.ts +++ b/src/storage/run.repository.spec.ts @@ -1,4 +1,4 @@ -import { ConflictException } from '@nestjs/common'; +import { ConflictException, NotFoundException } from '@nestjs/common'; import { RunRepository } from './run.repository'; import { DatabaseService } from '../db/database.service'; @@ -117,9 +117,10 @@ describe('RunRepository', () => { // ------ findByIdOrThrow ------ describe('findByIdOrThrow', () => { - it('throws when the run is not found', async () => { + it('throws NotFoundException when the run is not found', async () => { mockDb._select.limit.mockResolvedValue([]); + await expect(repo.findByIdOrThrow('missing-id')).rejects.toThrow(NotFoundException); await expect(repo.findByIdOrThrow('missing-id')).rejects.toThrow( 'run missing-id not found' ); diff --git a/src/storage/run.repository.ts b/src/storage/run.repository.ts index db274e2..5774a5c 100644 --- a/src/storage/run.repository.ts +++ b/src/storage/run.repository.ts @@ -1,4 +1,4 @@ -import { ConflictException, Injectable } from '@nestjs/common'; +import { ConflictException, Injectable, NotFoundException } from '@nestjs/common'; import { and, asc, desc, eq, gt, inArray, lt, sql, SQL } from 'drizzle-orm'; import { randomUUID } from 'node:crypto'; import { RunStatus } from '../contracts/control-plane'; @@ -61,7 +61,7 @@ export class RunRepository { async findByIdOrThrow(id: string) { const row = await this.findById(id); - if (!row) throw new Error(`run ${id} not found`); + if (!row) throw new NotFoundException(`run ${id} not found`); return row; } diff --git a/src/storage/runtime-session.repository.ts b/src/storage/runtime-session.repository.ts index 59b8439..8058d6d 100644 --- a/src/storage/runtime-session.repository.ts +++ b/src/storage/runtime-session.repository.ts @@ -47,4 +47,25 @@ export class RuntimeSessionRepository { .where(eq(runtimeSessions.runId, runId)); return this.findByRunId(runId); } + + async updateStreamCursor(runId: string, cursor: number) { + await this.database.db + .update(runtimeSessions) + .set({ lastStreamCursor: cursor, updatedAt: new Date().toISOString() }) + .where(eq(runtimeSessions.runId, runId)); + } + + async updateStreamConnected(runId: string) { + await this.database.db + .update(runtimeSessions) + .set({ streamConnectedAt: new Date().toISOString(), updatedAt: new Date().toISOString() }) + .where(eq(runtimeSessions.runId, runId)); + } + + async updateStreamDisconnected(runId: string) { + await this.database.db + .update(runtimeSessions) + .set({ streamDisconnectedAt: new Date().toISOString(), updatedAt: new Date().toISOString() }) + .where(eq(runtimeSessions.runId, runId)); + } } diff --git a/src/telemetry/instrumentation.service.ts b/src/telemetry/instrumentation.service.ts index 743e637..9b41c4c 100644 --- a/src/telemetry/instrumentation.service.ts +++ b/src/telemetry/instrumentation.service.ts @@ -54,6 +54,41 @@ export class InstrumentationService implements OnModuleInit { help: 'Total circuit breaker success count' }); + readonly outboundMessagesTotal = new client.Counter({ + name: 'macp_outbound_messages_total', + help: 'Total outbound messages by category and status', + labelNames: ['category', 'status'] as const + }); + + readonly inboundMessagesTotal = new client.Counter({ + name: 'macp_inbound_messages_total', + help: 'Total inbound messages by mode and message type', + labelNames: ['mode', 'message_type'] as const + }); + + readonly signalsTotal = new client.Counter({ + name: 'macp_signals_total', + help: 'Total signals by signal type', + labelNames: ['signal_type'] as const + }); + + readonly streamReconnectsTotal = new client.Counter({ + name: 'macp_stream_reconnects_total', + help: 'Total stream reconnection attempts' + }); + + readonly recoveryTotal = new client.Counter({ + name: 'macp_recovery_total', + help: 'Total recovery attempts by status', + labelNames: ['status'] as const + }); + + readonly webhookDeliveriesTotal = new client.Counter({ + name: 'macp_webhook_deliveries_total', + help: 'Total webhook deliveries by status', + labelNames: ['status'] as const + }); + onModuleInit(): void { client.collectDefaultMetrics(); } diff --git a/src/webhooks/webhook-delivery.repository.ts b/src/webhooks/webhook-delivery.repository.ts new file mode 100644 index 0000000..2fa75c3 --- /dev/null +++ b/src/webhooks/webhook-delivery.repository.ts @@ -0,0 +1,73 @@ +import { Injectable } from '@nestjs/common'; +import { eq } from 'drizzle-orm'; +import { randomUUID } from 'node:crypto'; +import { DatabaseService } from '../db/database.service'; +import { webhookDeliveries } from '../db/schema'; + +@Injectable() +export class WebhookDeliveryRepository { + constructor(private readonly database: DatabaseService) {} + + async create(input: { + webhookId: string; + event: string; + runId: string; + payload: Record; + }) { + const id = randomUUID(); + const now = new Date().toISOString(); + await this.database.db.insert(webhookDeliveries).values({ + id, + webhookId: input.webhookId, + event: input.event, + runId: input.runId, + payload: input.payload, + status: 'pending', + attempts: 0, + createdAt: now + }); + return { id, ...input, status: 'pending', attempts: 0, createdAt: now }; + } + + async markDelivered(id: string, responseStatus: number) { + const now = new Date().toISOString(); + await this.database.db + .update(webhookDeliveries) + .set({ + status: 'delivered', + responseStatus, + lastAttemptAt: now, + deliveredAt: now, + attempts: 1 + }) + .where(eq(webhookDeliveries.id, id)); + } + + async markFailed(id: string, attempt: number, errorMessage: string, responseStatus?: number) { + const now = new Date().toISOString(); + await this.database.db + .update(webhookDeliveries) + .set({ + status: attempt >= 3 ? 'failed' : 'pending', + attempts: attempt, + lastAttemptAt: now, + errorMessage, + responseStatus + }) + .where(eq(webhookDeliveries.id, id)); + } + + async listPending() { + return this.database.db + .select() + .from(webhookDeliveries) + .where(eq(webhookDeliveries.status, 'pending')); + } + + async listByWebhookId(webhookId: string) { + return this.database.db + .select() + .from(webhookDeliveries) + .where(eq(webhookDeliveries.webhookId, webhookId)); + } +} diff --git a/src/webhooks/webhook.repository.ts b/src/webhooks/webhook.repository.ts index 681977f..471aa16 100644 --- a/src/webhooks/webhook.repository.ts +++ b/src/webhooks/webhook.repository.ts @@ -15,7 +15,7 @@ export class WebhookRepository { url: input.url, events: input.events, secret: input.secret, - active: 1, + active: true, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString() }); @@ -28,7 +28,7 @@ export class WebhookRepository { } async listActive() { - return this.database.db.select().from(webhooks).where(eq(webhooks.active, 1)); + return this.database.db.select().from(webhooks).where(eq(webhooks.active, true)); } async list() { diff --git a/src/webhooks/webhook.service.ts b/src/webhooks/webhook.service.ts index 4e10908..3b75bb8 100644 --- a/src/webhooks/webhook.service.ts +++ b/src/webhooks/webhook.service.ts @@ -1,5 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { createHmac } from 'node:crypto'; +import { WebhookDeliveryRepository } from './webhook-delivery.repository'; import { WebhookRepository } from './webhook.repository'; export interface WebhookPayload { @@ -14,7 +15,10 @@ export interface WebhookPayload { export class WebhookService { private readonly logger = new Logger(WebhookService.name); - constructor(private readonly webhookRepository: WebhookRepository) {} + constructor( + private readonly webhookRepository: WebhookRepository, + private readonly deliveryRepository: WebhookDeliveryRepository + ) {} async register(input: { url: string; events: string[]; secret: string }) { return this.webhookRepository.create(input); @@ -35,38 +39,75 @@ export class WebhookService { ); for (const webhook of matching) { - void this.deliver(webhook.url, webhook.secret, payload); + // Outbox pattern: insert delivery record first, then attempt delivery + const delivery = await this.deliveryRepository.create({ + webhookId: webhook.id, + event: payload.event, + runId: payload.runId, + payload: payload as unknown as Record + }); + void this.deliverWithTracking(delivery.id, webhook.url, webhook.secret, payload); + } + } + + async retryPending(): Promise { + const pending = await this.deliveryRepository.listPending(); + let retried = 0; + for (const delivery of pending) { + const webhook = await this.webhookRepository.findById(delivery.webhookId); + if (!webhook) continue; + void this.deliverWithTracking( + delivery.id, + webhook.url, + webhook.secret, + delivery.payload as unknown as WebhookPayload, + delivery.attempts + ); + retried++; } + return retried; } - private async deliver(url: string, secret: string, payload: WebhookPayload, attempt = 1): Promise { + private async deliverWithTracking( + deliveryId: string, + url: string, + secret: string, + payload: WebhookPayload, + startAttempt = 0 + ): Promise { const maxAttempts = 3; const body = JSON.stringify(payload); const signature = createHmac('sha256', secret).update(body).digest('hex'); - try { - const response = await fetch(url, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'X-MACP-Signature': signature, - 'X-MACP-Event': payload.event - }, - body, - signal: AbortSignal.timeout(10_000) - }); + for (let attempt = startAttempt + 1; attempt <= maxAttempts; attempt++) { + try { + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-MACP-Signature': signature, + 'X-MACP-Event': payload.event + }, + body, + signal: AbortSignal.timeout(10_000) + }); - if (!response.ok) { - throw new Error(`webhook returned ${response.status}`); - } - } catch (error) { - this.logger.warn( - `webhook delivery to ${url} failed (attempt ${attempt}/${maxAttempts}): ${error instanceof Error ? error.message : String(error)}` - ); - if (attempt < maxAttempts) { - const backoffMs = 1000 * 2 ** (attempt - 1); - await new Promise((resolve) => setTimeout(resolve, backoffMs)); - return this.deliver(url, secret, payload, attempt + 1); + if (!response.ok) { + throw new Error(`webhook returned ${response.status}`); + } + + await this.deliveryRepository.markDelivered(deliveryId, response.status); + return; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.warn( + `webhook delivery to ${url} failed (attempt ${attempt}/${maxAttempts}): ${errorMessage}` + ); + await this.deliveryRepository.markFailed(deliveryId, attempt, errorMessage); + if (attempt < maxAttempts) { + const backoffMs = 1000 * 2 ** (attempt - 1); + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + } } } }