diff --git a/.claude/event-processor.md b/.claude/event-processor.md
new file mode 100644
index 000000000..5eadb2e91
--- /dev/null
+++ b/.claude/event-processor.md
@@ -0,0 +1,183 @@
+# Event Processor

## Purpose

The event processor is a queue-based system that transforms engine events into Lightning websocket events while guaranteeing sequential ordering and enabling batching. It bridges the multi-threaded Runtime Engine and the websocket channel to Lightning.

## Core Problem

The processor solves a fundamental mismatch:

- **Runtime Engine**: Emits events asynchronously from worker threads, potentially in rapid bursts
- **Lightning websocket**: Requires sequential transmission with acknowledgment before sending the next message
- **Solution**: Queue all events and process one at a time, with optional batching for high-frequency events

## Architecture

### Event Flow

1. **Registration**: Processor registers listeners for all engine events (workflow start/complete, job start/complete/error, logs)
2. **Enqueueing**: When the engine emits an event, the processor immediately pushes `{name, event}` onto the queue
3. **Processing**: Only the first queue item processes at a time; completion triggers the next item automatically
4. **Transformation**: Event handlers convert engine payloads to Lightning payloads
5. **Transmission**: `sendEvent` wraps the Phoenix channel push with error handling and acknowledgment

### Sequential Guarantee

Events are sent in the exact order they were emitted:

- Process function awaits send operation
- Send operation awaits websocket acknowledgment
- Only after acknowledgment does the queue shift and process the next event
- Exception: batched events are sent as an atomic unit at the position of the first event in the batch

## Batching System

### Why Batching Exists

Without batching, workflows producing hundreds of logs would create hundreds of websocket messages, each requiring a round-trip acknowledgment. This is a severe performance penalty: we see on production servers that it can introduce noticeable latency on log messages.

### How It Works

**Opening a Batch**:

- When processing an event with batching enabled, the processor opens a batch for that event type
- The event is added to an accumulating array instead of being sent immediately
- **Peek-ahead optimization**: Processor scans forward in the queue, pulling all subsequent matching events into the batch until the limit is reached or the queue is exhausted

**Closing a Batch**:

Three conditions trigger a send:

1. Batch reaches size limit (default: 10 events)
2. Different event type arrives (batch sent before processing new event)
3. Timeout expires (default: 10ms)

**Ordering with Batches**:

- If a non-batchable event arrives while a batch is open, the batch immediately closes and sends
- Maintains strict ordering: events arriving after batch must be sent after batch

### Configuration

Only `WORKFLOW_LOG` events are currently batched, and only when the `batchLogs` option is enabled. Tunable parameters:

- `batchLimit`: Maximum events per batch (default: 10)
- `batchInterval`: Maximum time a batch stays open (default: 10ms)
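### Illustrative Sketch

To make the queue and batch mechanics above concrete, here is a minimal, self-contained sketch of the flow. It is illustrative only: `createProcessor` and the `send` callback signature are invented for the example (real handlers receive a context object plus a payload), and the defaults of 10 events / 10ms mirror the documented ones. The actual implementation lives in `packages/ws-worker/src/api/process-events.ts`.

```
// Minimal sketch of the queue + batch flow (illustrative, not the real code)
type QueueItem = { name: string; event: any };

function createProcessor(
  send: (name: string, events: any[]) => Promise<void>,
  opts: {
    batch?: Record<string, boolean>;
    batchLimit?: number;
    batchInterval?: number;
  } = {}
) {
  const { batchLimit = 10, batchInterval = 10 } = opts;
  const queue: QueueItem[] = [];
  let activeBatch: string | null = null;
  let batch: any[] = [];
  let batchTimeout: NodeJS.Timeout | undefined;

  const sendBatch = async (name: string) => {
    clearTimeout(batchTimeout);
    const events = batch;
    batch = [];
    activeBatch = null;
    batchTimeout = undefined;
    await send(name, events);
  };

  const process = async (name: string, event: any) => {
    // A different event type closes any open batch before it is handled,
    // preserving ordering
    if (activeBatch && activeBatch !== name) {
      await sendBatch(activeBatch);
    }

    if (opts.batch?.[name]) {
      // Batch mode: accumulate instead of sending immediately
      activeBatch = name;
      batch.push(event);

      // Peek ahead: pull matching events already sitting in the queue
      while (queue.length > 1 && queue[1].name === name) {
        batch.push(queue.splice(1, 1)[0].event);
        if (batch.length >= batchLimit) return sendBatch(name);
      }

      // Otherwise wait a short interval before flushing
      if (!batchTimeout) {
        batchTimeout = setTimeout(() => sendBatch(name), batchInterval);
      }
      return;
    }

    await send(name, [event]);
  };

  const next = async (): Promise<void> => {
    const item = queue[0];
    if (!item) return;
    await process(item.name, item.event);
    // Keep the item on the queue until processing has finished,
    // then move on to the next one
    queue.shift();
    next();
  };

  return {
    enqueue(name: string, event: any) {
      queue.push({ name, event });
      if (queue.length === 1) next(); // queue was idle: start processing
    },
  };
}
```

Note how the recursive `next()` call after `queue.shift()` is what provides the sequential guarantee: nothing is dequeued until the previous send has been awaited.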
## Event Handler Contract

**Inputs**:

- Full context object (run ID, channel, state, logger, engine)
- Raw engine event payload

**Responsibilities**:

- Transform engine payload to Lightning payload format
- Call `sendEvent` to transmit via websocket
- Can be sync or async (processor awaits either way)

**Special Cases**:

- `run-start`: Loads version info, sends additional log events synchronously
- `run-log`: Dual-mode behavior; either batches logs into an array or sends them individually, depending on config

## Error Handling

### Handler Errors

- All handler execution wrapped in try-catch
- If handler throws: catch, check if already reported to Sentry, report if not, log, continue to next event
- Failed event removed from queue, processing continues
- No retry by processor (sendEvent has its own retry logic for timeouts, currently disabled)

### Websocket Errors

- `sendEvent` wraps Phoenix channel with promise
- Resolves on `ok`, rejects on `error` or `timeout`
- Errors reported to Sentry with run context
- No circuit breaker: if channel fails, each subsequent event fails individually

### Isolation

Bugs in one handler don't cascade to other events, but partial state may be visible in Lightning if critical events fail (e.g., step-start succeeds but step-complete fails).

## Integration Points

### Initialization (execute.ts)

```
eventProcessor(engine, context, {
  [WORKFLOW_START]: handleRunStart,
  [JOB_START]: handleStepStart,
  [JOB_COMPLETE]: handleStepComplete,
  [JOB_ERROR]: onJobError,
  [WORKFLOW_LOG]: handleRunLog,
  [WORKFLOW_COMPLETE]: handleRunComplete,
  [WORKFLOW_ERROR]: handleRunError,
}, {
  batch: options.batchLogs ? { [WORKFLOW_LOG]: true } : {},
  batchInterval: options.batchInterval,
  batchLimit: options.batchLimit,
})
```

The processor is set up before the workflow starts, ensuring no events are missed.

### Websocket Layer (send-event.ts)

- Wraps Phoenix channel push with promise interface
- Handles `ok`, `error`, `timeout` responses
- Retry-on-timeout infrastructure exists but is disabled (duplication concerns on Lightning side)
- Reports errors to Sentry with run context and rejection reasons

### Sentry Integration

- Adds breadcrumb for each processed event (except workflow-log to reduce noise)
- Breadcrumbs use engine event names (documenting what the processor received)
- Creates trail showing event sequence leading up to errors
- Handler errors reported with run context and event name

## Performance Characteristics

**Without Batching**:

- Every event incurs full websocket round-trip cost
- High-frequency logs create bottleneck
- Queue grows during waiting periods

**With Batching**:

- Hundreds of logs/second → tens of messages
- Order of magnitude reduction in network overhead
- Peek-ahead optimization maximizes batch size even for slightly staggered events

**Bottlenecks**:

- Synchronous processing means slow handlers (e.g., loading versions) block the queue
- In practice, websocket latency dominates
- No parallelization by design (ordering guarantee more important than throughput)

## Key Design Decisions

1. **Single-threaded sequential processing**: Sacrifices parallel throughput for ordering guarantees (Lightning cannot reconstruct order from timestamps due to clock skew)

2. 
**Peek-ahead optimization**: Aggressively accumulates queued events into batch, making batch size predictable and reducing latency + +3. **Continue on error**: Single failed event doesn't halt subsequent events (robustness over consistency) + +4. **Callback-based architecture**: Processor generic and testable, knows nothing about Lightning protocol, just queues/batches/invokes callbacks + +5. **No explicit teardown**: Relies on engine event emitter lifecycle; queue drains naturally when workflow completes + +## Critical Implementation Details + +- Active batch tracked with `activeBatch` variable (event name or null) +- Batch events stored in `batch` array, cleared after send +- Batch timeout stored in `batchTimeout`, cleared when batch sends +- Queue implemented as array, items are `{name, event}` objects +- Processing triggered by `enqueue` when queue length becomes 1 +- Recursive `next()` call after queue shift creates continuous flow +- Event handlers imported from events directory, mapped explicitly in execute function diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 9cf2511b3..11bb3186d 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,12 @@ # ws-worker +## 1.20.2 + +### Patch Changes + +- Add internal fallback timeout to events. Also adjust error handling to try and catch an uncaught error +- q + ## 1.20.1 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index cd675039f..2b71b2908 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.20.1", + "version": "1.20.2", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index 2df77ed0c..422c68f1f 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -83,6 +83,11 @@ export function execute( }, }); + const batch: any = {}; + if (options.batchLogs) { + batch[WORKFLOW_LOG] = true; + } + eventProcessor( engine, context, @@ -96,13 +101,11 @@ export function execute( [WORKFLOW_ERROR]: handleRunError, }, { - batch: options.batchLogs - ? { - [WORKFLOW_LOG]: true, - } - : {}, + batch, batchInterval: options.batchInterval, batchLimit: options.batchLimit, + timeout_ms: + (options.eventTimeoutSeconds ?? 
0) * 1000 * 1.1 /* grace period */, } ); diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index e7120855d..bf5aceca6 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -25,6 +25,7 @@ export type EventProcessorOptions = { batch?: Record; batchInterval?: number; batchLimit?: number; + timeout_ms?: number; }; const DEFAULT_BATCH_LIMIT = 10; @@ -65,25 +66,43 @@ export function eventProcessor( const { batchLimit: limit = DEFAULT_BATCH_LIMIT, batchInterval: interval = DEFAULT_BATCH_INTERVAL, + timeout_ms, } = options; const queue: any = []; + let activeBatch: string | null = null; + let batch: any = []; + let batchTimeout: NodeJS.Timeout; + let didFinish = false; + let timeoutHandle: NodeJS.Timeout; + const next = async () => { const evt = queue[0]; if (evt) { + didFinish = false; + + const finish = () => { + clearTimeout(timeoutHandle); + if (!didFinish) { + didFinish = true; + queue.shift(); + setImmediate(next); + } + }; + + if (timeout_ms) { + timeoutHandle = setTimeout(() => { + logger.error(`${planId} :: ${evt.name} :: timeout (fallback)`); + finish(); + }, timeout_ms); + } + await process(evt.name, evt.event); - queue.shift(); // keep the event on the queue until processing is finished - // setImmediate(next); - next(); + finish(); } }; - let activeBatch: string | null = null; - let batch: any = []; - let start: number = -1; - let batchTimeout: NodeJS.Timeout; - const sendBatch = async (name: string) => { clearTimeout(batchTimeout); // first clear the batch @@ -93,19 +112,29 @@ export function eventProcessor( }; const send = async (name: string, payload: any, batchSize?: number) => { - // @ts-ignore - const lightningEvent = eventMap[name] ?? name; - await callbacks[name](context, payload); - if (batchSize) { - logger.info( - `${planId} :: sent ${lightningEvent} (${batchSize}):: OK :: ${ - Date.now() - start - }ms` - ); - } else { - logger.info( - `${planId} :: sent ${lightningEvent} :: OK :: ${Date.now() - start}ms` - ); + try { + const start = Date.now(); + // @ts-ignore + const lightningEvent = eventMap[name] ?? name; + await callbacks[name](context, payload); + if (batchSize) { + logger.info( + `${planId} :: sent ${lightningEvent} (${batchSize}):: OK :: ${ + Date.now() - start + }ms` + ); + } else { + logger.info( + `${planId} :: sent ${lightningEvent} :: OK :: ${Date.now() - start}ms` + ); + } + } catch (e: any) { + if (!e.reportedToSentry) { + Sentry.captureException(e); + logger.error(e); + } + // Do nothing else here: the error should have been handled + // and life will go on } }; @@ -133,45 +162,33 @@ export function eventProcessor( } if (name in callbacks) { - try { - start = Date.now(); + if (options?.batch?.[name]) { + // batch mode is enabled! + activeBatch = name; - if (options?.batch?.[name]) { - // batch mode is enabled! 
- activeBatch = name; + // First, push this event to the batch + batch.push(event); - // First, push this event to the batch - batch.push(event); + // Next, peek ahead in the queue for more pending events + while (queue.length > 1 && queue[1].name === name) { + const [nextBatchItem] = queue.splice(1, 1); + batch.push(nextBatchItem.event); - // Next, peek ahead in the queue for more pending events - while (queue.length > 1 && queue[1].name === name) { - const [nextBatchItem] = queue.splice(1, 1); - batch.push(nextBatchItem.event); - - if (batch.length >= limit) { - // If we're at the batch limit, return right away - return sendBatch(name); - } - } - - // finally wait for a time before sending the batch - if (!batchTimeout) { - const batchName = activeBatch!; - batchTimeout = setTimeout(async () => { - sendBatch(batchName); - }, interval); + if (batch.length >= limit) { + // If we're at the batch limit, return right away + return sendBatch(name); } - return; } - await send(name, event); - } catch (e: any) { - if (!e.reportedToSentry) { - Sentry.captureException(e); - logger.error(e); + // finally wait for a time before sending the batch + if (!batchTimeout) { + const batchName = activeBatch!; + batchTimeout = setTimeout(async () => { + sendBatch(batchName); + }, interval); } - // Do nothing else here: the error should have been handled - // and life will go on + } else { + await send(name, event); } } else { logger.warn('no event bound for', name); diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index bfdc74980..6e8cd7c39 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -315,6 +315,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { options.timeoutRetryCount = app.options.timeoutRetryCount; options.timeoutRetryDelay = app.options.timeoutRetryDelayMs ?? 
app.options.socketTimeoutSeconds; + options.eventTimeoutSeconds = app.options.messageTimeoutSeconds; options.batchLogs = app.options.batchLogs; options.batchInterval = app.options.batchInterval; options.batchLimit = app.options.batchLimit; diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index bc51ae4de..e284c6cad 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -54,6 +54,7 @@ export type WorkerRunOptions = ExecuteOptions & { batchLogs?: boolean; batchInterval?: number; batchLimit?: number; + eventTimeoutSeconds?: number; }; type ConversionOptions = { diff --git a/packages/ws-worker/src/util/send-event.ts b/packages/ws-worker/src/util/send-event.ts index 8c3800706..f4cad446f 100644 --- a/packages/ws-worker/src/util/send-event.ts +++ b/packages/ws-worker/src/util/send-event.ts @@ -55,6 +55,11 @@ export const sendEvent = ( if (!allowRetryOntimeout || thisAttempt >= timeoutRetryCount) { report(new LightningTimeoutError(event)); } else { + // TODO at the moment, this retry logic all shares the same timeout, + // where the timeout is controlled by the event processor + // When we want to restore retries, we need to retry in the event + // processor - not here + // This actually feels cleaner and easier to test anyway logger.warn( `${runId} event ${event} timed out, will retry in ${timeoutRetryDelay}ms (attempt ${ thisAttempt + 1 diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 5c7dec997..6687ab53f 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -35,7 +35,7 @@ const createPlan = (...expressions: string[]) => adaptors: [], next: expressions[idx + 1] ? 
{ [`${idx + 1}`]: true } : {}, })) - : [{ expression: ['fn(s => s)'] }], + : [{ expression: 'fn(s => s)' }], }, options: {}, } as ExecutionPlan); @@ -640,6 +640,7 @@ test('queue events behind a slow event', async (t) => { await engine.execute(plan, {}); await waitForAsync(100); + // Should only be one event triggered t.is(events.length, 1); @@ -649,7 +650,7 @@ test('queue events behind a slow event', async (t) => { // This isn't the most watertight test - but I've debugged it closely and it seems // to do the right thing -test.only('queue events behind a slow event II', async (t) => { +test('queue events behind a slow event II', async (t) => { const engine = await createMockEngine(); const plan = createPlan( ` @@ -708,3 +709,61 @@ test.only('queue events behind a slow event II', async (t) => { t.is(events[0], 10); t.is(events[1], 10); }); + +test('should timeout and continue processing when event handler hangs', async (t) => { + const engine = await createMockEngine(); + const plan = createPlan(); + + const context = { + id: 'a', + plan, + options: {}, + logger, + }; + + const processedEvents: string[] = []; + + const callbacks = { + [WORKFLOW_START]: () => { + // Hang forever - don't resolve + return new Promise(() => {}); + }, + [JOB_START]: () => { + processedEvents.push('job-start'); + }, + [JOB_COMPLETE]: () => { + processedEvents.push('job-complete'); + }, + [WORKFLOW_COMPLETE]: () => { + processedEvents.push('workflow-complete'); + }, + }; + + const options = { + // If we disable the timeout, this test fails to process any event + timeout_ms: 50, + }; + + eventProcessor(engine, context as any, callbacks, options); + + await engine.execute(plan, {}); + + // Wait for timeout to fire and subsequent events to process + await waitForAsync(100); + + // Check that the timeout error was logged + const timeoutLog = logger._find('error', /timeout \(fallback\)/); + t.truthy(timeoutLog); + if (timeoutLog) { + t.true((timeoutLog.message as string).includes('workflow-start')); + } + + // Check that subsequent events were processed despite the timeout + t.true( + processedEvents.length > 0, + `Expected events to be processed, got: ${processedEvents.join(', ')}` + ); + t.true(processedEvents.includes('job-start')); + t.true(processedEvents.includes('job-complete')); + t.true(processedEvents.includes('workflow-complete')); +});