From fac5ae5e09564e4a2c2d9fcac5819f27329cba1f Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Mon, 11 May 2026 09:56:03 +0300 Subject: [PATCH 01/17] feat: forward state.webhookResponse on step:complete Job code sets state.webhookResponse = { status, body }; the worker forwards it on the step:complete payload as webhook_response. The key is stripped from the dataclip (worker) and from the next step's input state (runtime) so it cannot leak downstream. --- .changeset/webhook-response-step-complete.md | 15 ++ packages/lexicon/lightning.d.ts | 6 + packages/runtime/src/execute/step.ts | 11 + packages/runtime/test/execute/step.test.ts | 48 +++++ .../ws-worker/src/events/step-complete.ts | 26 ++- .../test/events/step-complete.test.ts | 201 ++++++++++++++++++ 6 files changed, 306 insertions(+), 1 deletion(-) create mode 100644 .changeset/webhook-response-step-complete.md diff --git a/.changeset/webhook-response-step-complete.md b/.changeset/webhook-response-step-complete.md new file mode 100644 index 000000000..a6ae173ba --- /dev/null +++ b/.changeset/webhook-response-step-complete.md @@ -0,0 +1,15 @@ +--- +'@openfn/ws-worker': minor +'@openfn/runtime': minor +'@openfn/lexicon': minor +--- + +Forward `state.webhookResponse` to Lightning on `step:complete`. + +If a job sets `state.webhookResponse = { status, body }` (camelCase in job +code), the worker extracts it from user state and attaches it to the +`step:complete` payload as `webhook_response` (snake_case, matching the rest +of the wire payload). The key is stripped from the persisted dataclip +(worker) and from the next step's input state (runtime), so it cannot leak. +Lightning decides when to flush the HTTP response based on the trigger +configuration. diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index 81d9ac5d3..bab1bdc9a 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -209,6 +209,11 @@ export type StepStartPayload = { }; export type StepStartReply = void; +export type WebhookResponse = { + status?: number; + body?: Record | any[]; +}; + export type StepCompletePayload = ExitReason & { run_id?: string; job_id: string; @@ -223,6 +228,7 @@ export type StepCompletePayload = ExitReason & { }; duration: number; timestamp: TimeInMicroSeconds; + webhook_response?: WebhookResponse; }; export type StepCompleteReply = void; diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index e5ef79a52..0e04c0bce 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -276,6 +276,17 @@ const executeStep = async ( next, mem, }); + + // webhookResponse is a platform contract between jobs and the worker. + // It's emitted on step:complete but must not propagate into the next + // step's input state. + if ( + result && + typeof result === 'object' && + 'webhookResponse' in result + ) { + delete result.webhookResponse; + } } } else { // calculate next for trigger nodes diff --git a/packages/runtime/test/execute/step.test.ts b/packages/runtime/test/execute/step.test.ts index b8517fb0e..e03bc59a7 100644 --- a/packages/runtime/test/execute/step.test.ts +++ b/packages/runtime/test/execute/step.test.ts @@ -440,6 +440,54 @@ test.serial( } ); +test.serial( + 'webhookResponse is emitted on NOTIFY_JOB_COMPLETE', + async (t) => { + const job = [ + async (s: State) => ({ + ...s, + webhookResponse: { status: 201, body: { ok: true } }, + }), + ]; + const step = { id: 'k', expression: job }; + + // Snapshot the state inside the notify callback because in real use + // notify crosses the worker thread boundary (postMessage structured-clones + // the payload), so consumers receive a copy. Here it's a shared reference, + // so we must capture before the runtime strips webhookResponse. + let snapshot: any; + const notify = (event: string, payload: any) => { + if (event === NOTIFY_JOB_COMPLETE) { + snapshot = JSON.parse(JSON.stringify(payload.state)); + } + }; + const context = createContext({ notify }); + + await execute(context, step, createState()); + t.deepEqual(snapshot.webhookResponse, { + status: 201, + body: { ok: true }, + }); + } +); + +test.serial( + 'webhookResponse is removed from result state so it does not propagate to next step', + async (t) => { + const job = [ + async (s: State) => ({ + ...s, + webhookResponse: { status: 201, body: { ok: true } }, + }), + ]; + const step = { id: 'k', expression: job }; + const context = createContext(); + + const result = await execute(context, step, createState()); + t.false('webhookResponse' in (result.state as any)); + } +); + test.serial( 'no props are removed from state if an empty array is passed to statePropsToRemove', async (t) => { diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 4c0640f87..d3f98e034 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -1,5 +1,8 @@ import crypto from 'node:crypto'; -import type { StepCompletePayload } from '@openfn/lexicon/lightning'; +import type { + StepCompletePayload, + WebhookResponse, +} from '@openfn/lexicon/lightning'; import type { JobCompletePayload } from '@openfn/engine-multi'; import { timestamp } from '@openfn/logger'; @@ -25,6 +28,23 @@ export default async function onStepComplete( if (!state.dataclips) { state.dataclips = {}; } + + // Extract webhookResponse off user state BEFORE the state is captured as a + // dataclip, so it doesn't persist in the dataclip and doesn't flow into + // downstream steps. This is the worker↔Lightning contract: user sets + // state.webhookResponse in job code (camelCase, JS-style), the worker + // forwards it on step:complete as `webhook_response` (snake_case, matching + // the rest of the payload), and Lightning decides when to flush the response. + let webhookResponse: WebhookResponse | undefined; + if ( + event.state && + typeof event.state === 'object' && + 'webhookResponse' in event.state + ) { + webhookResponse = event.state.webhookResponse; + delete event.state.webhookResponse; + } + const outputState = event.state || {}; state.dataclips[dataclipId] = event.state; @@ -77,6 +97,10 @@ export default async function onStepComplete( } } + if (webhookResponse !== undefined) { + evt.webhook_response = webhookResponse; + } + const reason = calculateJobExitReason(job_id, event.state, error); state.reasons[job_id] = reason; diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 7a74f872d..8b2b551b0 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -365,6 +365,207 @@ test('accumulate multiple leaf dataclips for branching workflow', async (t) => { t.not(state.leafDataclipIds[0], state.leafDataclipIds[1]); }); +test('includes webhook_response in step:complete payload when webhookResponse is set on state', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201, body: { ok: true } } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + status: 201, + body: { ok: true }, + }); +}); + +test('omits webhook_response from payload when webhookResponse is not set on state', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { state: { x: 10 } } as any; + await handleStepComplete({ channel, state } as any, event); + + t.is(lightningEvent.webhook_response, undefined); +}); + +test('strips webhookResponse from the stored dataclip', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201, body: {} } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + const [dataclip] = Object.values(state.dataclips); + t.deepEqual(dataclip, { x: 10 }); +}); + +test('strips webhookResponse from the serialized output_dataclip sent to Lightning', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201, body: {} } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(JSON.parse(lightningEvent.output_dataclip), { x: 10 }); +}); + +test('strips webhookResponse from event.state so it does not flow downstream', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201, body: {} } }, + next: ['job-2'], + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.false('webhookResponse' in event.state); +}); + +test('handles webhookResponse with only a body', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { webhookResponse: { body: { message: 'hello' } } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { body: { message: 'hello' } }); +}); + +test('handles webhookResponse body as a JSON array', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { webhookResponse: { status: 200, body: [{ id: 1 }, { id: 2 }] } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + status: 200, + body: [{ id: 1 }, { id: 2 }], + }); +}); + +test('does nothing with webhookResponse when event.state is empty', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + await handleStepComplete({ channel, state } as any, { state: {} } as any); + t.is(lightningEvent.webhook_response, undefined); +}); + +test('does nothing with webhookResponse when event.state is undefined', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + await t.notThrowsAsync(() => + handleStepComplete({ channel, state } as any, {} as any) + ); + t.is(lightningEvent.webhook_response, undefined); +}); + // Single leaf reached by two paths: start → a → x, start → b → x // x executes twice, both times with no downstream test('accumulate two leaf dataclips when same node reached by two paths', async (t) => { From fbbb75df51848b85e3d187deb5efe0b9a3dffa0a Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Mon, 11 May 2026 10:14:57 +0300 Subject: [PATCH 02/17] format files --- packages/runtime/src/execute/step.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 0e04c0bce..6784dd974 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -280,11 +280,7 @@ const executeStep = async ( // webhookResponse is a platform contract between jobs and the worker. // It's emitted on step:complete but must not propagate into the next // step's input state. - if ( - result && - typeof result === 'object' && - 'webhookResponse' in result - ) { + if (result && typeof result === 'object' && 'webhookResponse' in result) { delete result.webhookResponse; } } From d80b8bb29f94bad3b67759fc8317aac138ac589e Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 12 May 2026 15:44:25 +0000 Subject: [PATCH 03/17] feat: passing of webhook_response with cleanup --- packages/engine-multi/src/events.ts | 1 + packages/runtime/src/execute/step.ts | 19 +++++--------- packages/runtime/src/types.ts | 1 + .../ws-worker/src/events/step-complete.ts | 26 ++----------------- 4 files changed, 11 insertions(+), 36 deletions(-) diff --git a/packages/engine-multi/src/events.ts b/packages/engine-multi/src/events.ts index f9a81c32d..d2b6a8650 100644 --- a/packages/engine-multi/src/events.ts +++ b/packages/engine-multi/src/events.ts @@ -89,6 +89,7 @@ export interface JobCompletePayload extends ExternalEvent { job: number; system: number; }; + webhook_response?: any; } export interface JobErrorPayload extends ExternalEvent { diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 6784dd974..f73b2f398 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -97,11 +97,11 @@ const prepareFinalState = async ( throw e; } - if (!statePropsToRemove) { - // As a strict default, remove the configuration key - // tbh this should happen higher up in the stack but it causes havoc in unit testing - statePropsToRemove = ['configuration']; - } + // configuration & webhookResponse will by default always be removed + const defaultRemoveFields = ['configuration', 'webhookResponse']; + statePropsToRemove = [ + ...new Set((statePropsToRemove || []).concat(defaultRemoveFields)), + ]; const removedProps: string[] = []; statePropsToRemove.forEach((prop) => { @@ -235,6 +235,7 @@ const executeStep = async ( if (!didError) { const humanDuration = logger.timer(timerId); logger.success(`${jobName} completed in ${humanDuration}`); + const webhook_response = result?.webhookResponse || undefined; result = await prepareFinalState( result, logger, @@ -275,14 +276,8 @@ const executeStep = async ( jobId, next, mem, + webhook_response, }); - - // webhookResponse is a platform contract between jobs and the worker. - // It's emitted on step:complete but must not propagate into the next - // step's input state. - if (result && typeof result === 'object' && 'webhookResponse' in result) { - delete result.webhookResponse; - } } } else { // calculate next for trigger nodes diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index de67ce942..f453f01a0 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -173,6 +173,7 @@ export type NotifyJobCompletePayload = { system: number; peak?: number; }; + webhook_response?: any; }; export type NotifyJobErrorPayload = { diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index d3f98e034..f6dae0e2b 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -1,8 +1,5 @@ import crypto from 'node:crypto'; -import type { - StepCompletePayload, - WebhookResponse, -} from '@openfn/lexicon/lightning'; +import type { StepCompletePayload } from '@openfn/lexicon/lightning'; import type { JobCompletePayload } from '@openfn/engine-multi'; import { timestamp } from '@openfn/logger'; @@ -29,22 +26,6 @@ export default async function onStepComplete( state.dataclips = {}; } - // Extract webhookResponse off user state BEFORE the state is captured as a - // dataclip, so it doesn't persist in the dataclip and doesn't flow into - // downstream steps. This is the worker↔Lightning contract: user sets - // state.webhookResponse in job code (camelCase, JS-style), the worker - // forwards it on step:complete as `webhook_response` (snake_case, matching - // the rest of the payload), and Lightning decides when to flush the response. - let webhookResponse: WebhookResponse | undefined; - if ( - event.state && - typeof event.state === 'object' && - 'webhookResponse' in event.state - ) { - webhookResponse = event.state.webhookResponse; - delete event.state.webhookResponse; - } - const outputState = event.state || {}; state.dataclips[dataclipId] = event.state; @@ -70,6 +51,7 @@ export default async function onStepComplete( duration: event.duration, thread_id: event.threadId, timestamp: timeInMicroseconds(event.time), + webhook_response: event.webhook_response, } as StepCompletePayload; if (event.redacted) { @@ -97,10 +79,6 @@ export default async function onStepComplete( } } - if (webhookResponse !== undefined) { - evt.webhook_response = webhookResponse; - } - const reason = calculateJobExitReason(job_id, event.state, error); state.reasons[job_id] = reason; From a6c85baf8cd88c22114aa5237f26041fb07887bb Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 12 May 2026 16:00:05 +0000 Subject: [PATCH 04/17] feat: set iteration --- packages/runtime/src/execute/step.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index f73b2f398..0e5e4976a 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -99,9 +99,9 @@ const prepareFinalState = async ( // configuration & webhookResponse will by default always be removed const defaultRemoveFields = ['configuration', 'webhookResponse']; - statePropsToRemove = [ - ...new Set((statePropsToRemove || []).concat(defaultRemoveFields)), - ]; + statePropsToRemove = Array.from( + new Set((statePropsToRemove || []).concat(defaultRemoveFields)) + ); const removedProps: string[] = []; statePropsToRemove.forEach((prop) => { From 701698a0544dc05a01ff34eb0fbb7f211d72a87e Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 12 May 2026 16:33:03 +0000 Subject: [PATCH 05/17] feat: fallback to default list style --- packages/runtime/src/execute/step.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 0e5e4976a..5e02fea39 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -97,11 +97,11 @@ const prepareFinalState = async ( throw e; } - // configuration & webhookResponse will by default always be removed - const defaultRemoveFields = ['configuration', 'webhookResponse']; - statePropsToRemove = Array.from( - new Set((statePropsToRemove || []).concat(defaultRemoveFields)) - ); + if (!statePropsToRemove) { + // As a strict default, remove the configuration key + // tbh this should happen higher up in the stack but it causes havoc in unit testing + statePropsToRemove = ['configuration', 'webhookResponse']; + } const removedProps: string[] = []; statePropsToRemove.forEach((prop) => { From b21e06efc5fdd0cd839a108f8dcfaeec3e4c3bfa Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 12 May 2026 16:36:11 +0000 Subject: [PATCH 06/17] feat: add 'webhookResponse' to defaults --- packages/engine-multi/src/api.ts | 1 + packages/engine-multi/test/api.test.ts | 6 +++++- packages/ws-worker/src/util/cli.ts | 2 +- packages/ws-worker/test/util/cli.test.ts | 6 +++++- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/engine-multi/src/api.ts b/packages/engine-multi/src/api.ts index 29450b16a..dab1cc5f7 100644 --- a/packages/engine-multi/src/api.ts +++ b/packages/engine-multi/src/api.ts @@ -62,6 +62,7 @@ const createAPI = async function ( statePropsToRemove: options.statePropsToRemove ?? [ 'configuration', + 'webhookResponse', 'response', ], profile: options.profile, diff --git a/packages/engine-multi/test/api.test.ts b/packages/engine-multi/test/api.test.ts index d83fd6e6f..4b0d3237d 100644 --- a/packages/engine-multi/test/api.test.ts +++ b/packages/engine-multi/test/api.test.ts @@ -49,7 +49,11 @@ test.serial('engine api uses default options', async (t) => { t.truthy(api.options); - t.deepEqual(api.options.statePropsToRemove, ['configuration', 'response']); + t.deepEqual(api.options.statePropsToRemove, [ + 'configuration', + 'webhookResponse', + 'response', + ]); t.truthy(api.options.whitelist); }); diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index f8d27a126..84e288c33 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -323,7 +323,7 @@ export default function parseArgs(argv: string[]): Args { statePropsToRemove: setArg( args.statePropsToRemove, WORKER_STATE_PROPS_TO_REMOVE, - ['configuration', 'response'] + ['configuration', 'webhookResponse', 'response'] ), runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500), payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10), diff --git a/packages/ws-worker/test/util/cli.test.ts b/packages/ws-worker/test/util/cli.test.ts index 698008dc8..4514babdf 100644 --- a/packages/ws-worker/test/util/cli.test.ts +++ b/packages/ws-worker/test/util/cli.test.ts @@ -60,7 +60,11 @@ test('cli should set default values for unspecified options', (t) => { t.is(args.capacity, 5); t.is(args.sentryEnv, 'dev'); t.falsy(args.sentryDsn); - t.deepEqual(args.statePropsToRemove, ['configuration', 'response']); + t.deepEqual(args.statePropsToRemove, [ + 'configuration', + 'webhookResponse', + 'response', + ]); t.is(args.runMemory, 500); t.is(args.maxRunDurationSeconds, 300); t.is(args.engineValidationRetries, 3); From 4f3d5143ecf2365b6f1f1d85d07ea16ead4509d0 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 12 May 2026 16:52:12 +0000 Subject: [PATCH 07/17] test: update test --- packages/runtime/test/execute/step.test.ts | 51 ++++++++++------------ 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/packages/runtime/test/execute/step.test.ts b/packages/runtime/test/execute/step.test.ts index e03bc59a7..94c509cf7 100644 --- a/packages/runtime/test/execute/step.test.ts +++ b/packages/runtime/test/execute/step.test.ts @@ -440,36 +440,29 @@ test.serial( } ); -test.serial( - 'webhookResponse is emitted on NOTIFY_JOB_COMPLETE', - async (t) => { - const job = [ - async (s: State) => ({ - ...s, - webhookResponse: { status: 201, body: { ok: true } }, - }), - ]; - const step = { id: 'k', expression: job }; - - // Snapshot the state inside the notify callback because in real use - // notify crosses the worker thread boundary (postMessage structured-clones - // the payload), so consumers receive a copy. Here it's a shared reference, - // so we must capture before the runtime strips webhookResponse. - let snapshot: any; - const notify = (event: string, payload: any) => { - if (event === NOTIFY_JOB_COMPLETE) { - snapshot = JSON.parse(JSON.stringify(payload.state)); - } - }; - const context = createContext({ notify }); +test.serial('webhookResponse is emitted on NOTIFY_JOB_COMPLETE', async (t) => { + const job = [ + async (s: State) => ({ + ...s, + webhookResponse: { status: 201, body: { ok: true } }, + }), + ]; + const step = { id: 'k', expression: job }; + + let webhook_response: any; + const notify = (event: string, payload: any) => { + if (event === NOTIFY_JOB_COMPLETE) { + webhook_response = JSON.parse(JSON.stringify(payload.webhook_response)); + } + }; + const context = createContext({ notify }); - await execute(context, step, createState()); - t.deepEqual(snapshot.webhookResponse, { - status: 201, - body: { ok: true }, - }); - } -); + await execute(context, step, createState()); + t.deepEqual(webhook_response, { + status: 201, + body: { ok: true }, + }); +}); test.serial( 'webhookResponse is removed from result state so it does not propagate to next step', From 451aeaec9c8be2ce0aa4521629d3579089833b2c Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 07:43:25 +0000 Subject: [PATCH 08/17] revert: step.ts --- packages/runtime/src/execute/step.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 5e02fea39..e5ef79a52 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -100,7 +100,7 @@ const prepareFinalState = async ( if (!statePropsToRemove) { // As a strict default, remove the configuration key // tbh this should happen higher up in the stack but it causes havoc in unit testing - statePropsToRemove = ['configuration', 'webhookResponse']; + statePropsToRemove = ['configuration']; } const removedProps: string[] = []; @@ -235,7 +235,6 @@ const executeStep = async ( if (!didError) { const humanDuration = logger.timer(timerId); logger.success(`${jobName} completed in ${humanDuration}`); - const webhook_response = result?.webhookResponse || undefined; result = await prepareFinalState( result, logger, @@ -276,7 +275,6 @@ const executeStep = async ( jobId, next, mem, - webhook_response, }); } } else { From 5da28d9041831f71771857a32191510ee71d7d47 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 07:52:57 +0000 Subject: [PATCH 09/17] feat: pick webhookResponse not in runtime --- packages/engine-multi/src/events.ts | 1 - packages/runtime/src/types.ts | 1 - packages/ws-worker/src/events/step-complete.ts | 3 +-- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/engine-multi/src/events.ts b/packages/engine-multi/src/events.ts index d2b6a8650..f9a81c32d 100644 --- a/packages/engine-multi/src/events.ts +++ b/packages/engine-multi/src/events.ts @@ -89,7 +89,6 @@ export interface JobCompletePayload extends ExternalEvent { job: number; system: number; }; - webhook_response?: any; } export interface JobErrorPayload extends ExternalEvent { diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index f453f01a0..de67ce942 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -173,7 +173,6 @@ export type NotifyJobCompletePayload = { system: number; peak?: number; }; - webhook_response?: any; }; export type NotifyJobErrorPayload = { diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index f6dae0e2b..8dbc42796 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -25,7 +25,6 @@ export default async function onStepComplete( if (!state.dataclips) { state.dataclips = {}; } - const outputState = event.state || {}; state.dataclips[dataclipId] = event.state; @@ -51,7 +50,7 @@ export default async function onStepComplete( duration: event.duration, thread_id: event.threadId, timestamp: timeInMicroseconds(event.time), - webhook_response: event.webhook_response, + webhook_response: event.state?.webhookResponse || undefined, } as StepCompletePayload; if (event.redacted) { From 9a17740c8b0cffe4064130f8d6192558ea10ab1a Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 07:55:17 +0000 Subject: [PATCH 10/17] revert: removal of webhookResponse key --- packages/engine-multi/src/api.ts | 1 - packages/engine-multi/test/api.test.ts | 6 +----- packages/ws-worker/src/util/cli.ts | 2 +- packages/ws-worker/test/util/cli.test.ts | 6 +----- 4 files changed, 3 insertions(+), 12 deletions(-) diff --git a/packages/engine-multi/src/api.ts b/packages/engine-multi/src/api.ts index dab1cc5f7..29450b16a 100644 --- a/packages/engine-multi/src/api.ts +++ b/packages/engine-multi/src/api.ts @@ -62,7 +62,6 @@ const createAPI = async function ( statePropsToRemove: options.statePropsToRemove ?? [ 'configuration', - 'webhookResponse', 'response', ], profile: options.profile, diff --git a/packages/engine-multi/test/api.test.ts b/packages/engine-multi/test/api.test.ts index 4b0d3237d..d83fd6e6f 100644 --- a/packages/engine-multi/test/api.test.ts +++ b/packages/engine-multi/test/api.test.ts @@ -49,11 +49,7 @@ test.serial('engine api uses default options', async (t) => { t.truthy(api.options); - t.deepEqual(api.options.statePropsToRemove, [ - 'configuration', - 'webhookResponse', - 'response', - ]); + t.deepEqual(api.options.statePropsToRemove, ['configuration', 'response']); t.truthy(api.options.whitelist); }); diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index 84e288c33..f8d27a126 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -323,7 +323,7 @@ export default function parseArgs(argv: string[]): Args { statePropsToRemove: setArg( args.statePropsToRemove, WORKER_STATE_PROPS_TO_REMOVE, - ['configuration', 'webhookResponse', 'response'] + ['configuration', 'response'] ), runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500), payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10), diff --git a/packages/ws-worker/test/util/cli.test.ts b/packages/ws-worker/test/util/cli.test.ts index 4514babdf..698008dc8 100644 --- a/packages/ws-worker/test/util/cli.test.ts +++ b/packages/ws-worker/test/util/cli.test.ts @@ -60,11 +60,7 @@ test('cli should set default values for unspecified options', (t) => { t.is(args.capacity, 5); t.is(args.sentryEnv, 'dev'); t.falsy(args.sentryDsn); - t.deepEqual(args.statePropsToRemove, [ - 'configuration', - 'webhookResponse', - 'response', - ]); + t.deepEqual(args.statePropsToRemove, ['configuration', 'response']); t.is(args.runMemory, 500); t.is(args.maxRunDurationSeconds, 300); t.is(args.engineValidationRetries, 3); From 7b93e540462206806f5440371f1cdff873ccd0fb Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 08:04:48 +0000 Subject: [PATCH 11/17] feat: validate structure of webhook response body --- packages/ws-worker/src/events/step-complete.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 8dbc42796..c8200bc86 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -42,6 +42,19 @@ export default async function onStepComplete( state.inputDataclips[nextJobId] = dataclipId; }); + let webhook_response, + response_data = event.state?.webhookResponse; + + // validating structure of response + if ( + response_data && + Number.isInteger(response_data.status) && + response_data.body && + typeof response_data.body === 'object' + ) { + webhook_response = response_data; + } + const evt = { step_id, job_id, @@ -50,7 +63,7 @@ export default async function onStepComplete( duration: event.duration, thread_id: event.threadId, timestamp: timeInMicroseconds(event.time), - webhook_response: event.state?.webhookResponse || undefined, + webhook_response, } as StepCompletePayload; if (event.redacted) { From 4d66301756bd515df229b1008bab09f0eeb41a8a Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 08:07:43 +0000 Subject: [PATCH 12/17] revert: runtime test --- packages/runtime/test/execute/step.test.ts | 41 ---------------------- 1 file changed, 41 deletions(-) diff --git a/packages/runtime/test/execute/step.test.ts b/packages/runtime/test/execute/step.test.ts index 94c509cf7..b8517fb0e 100644 --- a/packages/runtime/test/execute/step.test.ts +++ b/packages/runtime/test/execute/step.test.ts @@ -440,47 +440,6 @@ test.serial( } ); -test.serial('webhookResponse is emitted on NOTIFY_JOB_COMPLETE', async (t) => { - const job = [ - async (s: State) => ({ - ...s, - webhookResponse: { status: 201, body: { ok: true } }, - }), - ]; - const step = { id: 'k', expression: job }; - - let webhook_response: any; - const notify = (event: string, payload: any) => { - if (event === NOTIFY_JOB_COMPLETE) { - webhook_response = JSON.parse(JSON.stringify(payload.webhook_response)); - } - }; - const context = createContext({ notify }); - - await execute(context, step, createState()); - t.deepEqual(webhook_response, { - status: 201, - body: { ok: true }, - }); -}); - -test.serial( - 'webhookResponse is removed from result state so it does not propagate to next step', - async (t) => { - const job = [ - async (s: State) => ({ - ...s, - webhookResponse: { status: 201, body: { ok: true } }, - }), - ]; - const step = { id: 'k', expression: job }; - const context = createContext(); - - const result = await execute(context, step, createState()); - t.false('webhookResponse' in (result.state as any)); - } -); - test.serial( 'no props are removed from state if an empty array is passed to statePropsToRemove', async (t) => { From 06221e00e13522961a1689677424fd85b8e60ca9 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 08:15:31 +0000 Subject: [PATCH 13/17] feat: allow response body with no status --- .../ws-worker/src/events/step-complete.ts | 10 +++++-- .../test/events/step-complete.test.ts | 30 ++++--------------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index c8200bc86..cf4f5036e 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -45,14 +45,18 @@ export default async function onStepComplete( let webhook_response, response_data = event.state?.webhookResponse; - // validating structure of response + // validating structure of response & add status if it's a number if ( response_data && - Number.isInteger(response_data.status) && response_data.body && typeof response_data.body === 'object' ) { - webhook_response = response_data; + webhook_response = { + ...(Number.isInteger(response_data.status) + ? { status: response_data.status } + : {}), + body: response_data.body, + }; } const evt = { diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 8b2b551b0..61b72e51a 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -412,7 +412,7 @@ test('omits webhook_response from payload when webhookResponse is not set on sta t.is(lightningEvent.webhook_response, undefined); }); -test('strips webhookResponse from the stored dataclip', async (t) => { +test('webhookResponse is included in dataclip', async (t) => { const plan = createPlan(); const jobId = 'job-1'; @@ -430,10 +430,10 @@ test('strips webhookResponse from the stored dataclip', async (t) => { await handleStepComplete({ channel, state } as any, event); const [dataclip] = Object.values(state.dataclips); - t.deepEqual(dataclip, { x: 10 }); + t.deepEqual(dataclip, { x: 10, webhookResponse: { status: 201, body: {} } }); }); -test('strips webhookResponse from the serialized output_dataclip sent to Lightning', async (t) => { +test('webhookResponse included in the serialized output_dataclip sent to Lightning', async (t) => { const plan = createPlan(); const jobId = 'job-1'; @@ -453,28 +453,10 @@ test('strips webhookResponse from the serialized output_dataclip sent to Lightni } as any; await handleStepComplete({ channel, state } as any, event); - t.deepEqual(JSON.parse(lightningEvent.output_dataclip), { x: 10 }); -}); - -test('strips webhookResponse from event.state so it does not flow downstream', async (t) => { - const plan = createPlan(); - const jobId = 'job-1'; - - const state = createRunState(plan); - state.activeJob = jobId; - state.activeStep = 'b'; - - const channel = mockChannel({ - [STEP_COMPLETE]: () => true, + t.deepEqual(JSON.parse(lightningEvent.output_dataclip), { + x: 10, + webhookResponse: { status: 201, body: {} }, }); - - const event = { - state: { x: 10, webhookResponse: { status: 201, body: {} } }, - next: ['job-2'], - } as any; - await handleStepComplete({ channel, state } as any, event); - - t.false('webhookResponse' in event.state); }); test('handles webhookResponse with only a body', async (t) => { From e453f6a39247ae98cb1f3395455de490f04efb43 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 09:12:50 +0000 Subject: [PATCH 14/17] chore: update changeset --- .changeset/humble-hats-call.md | 6 ++++++ .changeset/webhook-response-step-complete.md | 15 --------------- 2 files changed, 6 insertions(+), 15 deletions(-) create mode 100644 .changeset/humble-hats-call.md delete mode 100644 .changeset/webhook-response-step-complete.md diff --git a/.changeset/humble-hats-call.md b/.changeset/humble-hats-call.md new file mode 100644 index 000000000..eb0a772a9 --- /dev/null +++ b/.changeset/humble-hats-call.md @@ -0,0 +1,6 @@ +--- +'@openfn/ws-worker': minor +'@openfn/lexicon': minor +--- + +Support webhook_response in step:complete event when data available in results diff --git a/.changeset/webhook-response-step-complete.md b/.changeset/webhook-response-step-complete.md deleted file mode 100644 index a6ae173ba..000000000 --- a/.changeset/webhook-response-step-complete.md +++ /dev/null @@ -1,15 +0,0 @@ ---- -'@openfn/ws-worker': minor -'@openfn/runtime': minor -'@openfn/lexicon': minor ---- - -Forward `state.webhookResponse` to Lightning on `step:complete`. - -If a job sets `state.webhookResponse = { status, body }` (camelCase in job -code), the worker extracts it from user state and attaches it to the -`step:complete` payload as `webhook_response` (snake_case, matching the rest -of the wire payload). The key is stripped from the persisted dataclip -(worker) and from the next step's input state (runtime), so it cannot leak. -Lightning decides when to flush the HTTP response based on the trigger -configuration. From 467e8372d5eb98a390b2670d6c6a7951b62f4363 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 13 May 2026 11:47:49 +0100 Subject: [PATCH 15/17] more tests --- .../ws-worker/src/events/step-complete.ts | 26 ++----- .../test/events/step-complete.test.ts | 73 +++++++++++++++++++ 2 files changed, 81 insertions(+), 18 deletions(-) diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index cf4f5036e..88eaa15da 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -42,23 +42,6 @@ export default async function onStepComplete( state.inputDataclips[nextJobId] = dataclipId; }); - let webhook_response, - response_data = event.state?.webhookResponse; - - // validating structure of response & add status if it's a number - if ( - response_data && - response_data.body && - typeof response_data.body === 'object' - ) { - webhook_response = { - ...(Number.isInteger(response_data.status) - ? { status: response_data.status } - : {}), - body: response_data.body, - }; - } - const evt = { step_id, job_id, @@ -67,9 +50,16 @@ export default async function onStepComplete( duration: event.duration, thread_id: event.threadId, timestamp: timeInMicroseconds(event.time), - webhook_response, } as StepCompletePayload; + // Feed through the webhook response if it's on state + // We do this on the event so that Lightning + // doesn't have the parse the dataclip + // (which may not be sent in zero persistence mode!) + if (outputState.webhookResponse) { + evt.webhook_response = outputState.webhookResponse; + } + if (event.redacted) { state.withheldDataclips[dataclipId] = true; evt.output_dataclip_error = 'DATACLIP_TOO_LARGE'; diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 61b72e51a..a80e7a217 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -391,6 +391,79 @@ test('includes webhook_response in step:complete payload when webhookResponse is }); }); +test('includes webhook_response in step:complete payload when webhookResponse is set on state (status only)', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201 } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + status: 201, + }); +}); + +test('includes webhook_response in step:complete payload when webhookResponse is set on state (body only)', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { body: { ok: true } } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + body: { ok: true }, + }); +}); + +test('includes webhook_response in step:complete payload when webhookResponse is set on state (no keys)', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: {} }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, {}); +}); + test('omits webhook_response from payload when webhookResponse is not set on state', async (t) => { const plan = createPlan(); const jobId = 'job-1'; From e2cb053df48b77ae5ebf8fc97e7656b5648e35b6 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 13 May 2026 10:48:06 +0000 Subject: [PATCH 16/17] refactor: simplify condition --- packages/ws-worker/src/events/step-complete.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index cf4f5036e..1b6927863 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -46,11 +46,7 @@ export default async function onStepComplete( response_data = event.state?.webhookResponse; // validating structure of response & add status if it's a number - if ( - response_data && - response_data.body && - typeof response_data.body === 'object' - ) { + if (typeof response_data?.body === 'object') { webhook_response = { ...(Number.isInteger(response_data.status) ? { status: response_data.status } From 3f1699be77cc82e8ec2ec13a653c5e0fd1a5d12f Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 13 May 2026 11:48:22 +0100 Subject: [PATCH 17/17] version: worker@1.25.0 --- .changeset/humble-hats-call.md | 6 ------ packages/lexicon/CHANGELOG.md | 6 ++++++ packages/lexicon/package.json | 2 +- packages/ws-worker/CHANGELOG.md | 11 +++++++++++ packages/ws-worker/package.json | 2 +- 5 files changed, 19 insertions(+), 8 deletions(-) delete mode 100644 .changeset/humble-hats-call.md diff --git a/.changeset/humble-hats-call.md b/.changeset/humble-hats-call.md deleted file mode 100644 index eb0a772a9..000000000 --- a/.changeset/humble-hats-call.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -'@openfn/ws-worker': minor -'@openfn/lexicon': minor ---- - -Support webhook_response in step:complete event when data available in results diff --git a/packages/lexicon/CHANGELOG.md b/packages/lexicon/CHANGELOG.md index 83eda5f90..b57d29713 100644 --- a/packages/lexicon/CHANGELOG.md +++ b/packages/lexicon/CHANGELOG.md @@ -1,5 +1,11 @@ # lexicon +## 2.1.0 + +### Minor Changes + +- e453f6a: Support webhook_response in step:complete event when data available in results + ## 2.0.0 ### Major Changes diff --git a/packages/lexicon/package.json b/packages/lexicon/package.json index 3b51706dd..584542294 100644 --- a/packages/lexicon/package.json +++ b/packages/lexicon/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lexicon", - "version": "2.0.0", + "version": "2.1.0", "description": "Central repo of names and type definitions", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 6bae7c6ab..74ec4d793 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,16 @@ # ws-worker +## 1.25.0 + +### Minor Changes + +- e453f6a: Support webhook_response in step:complete event when data available in results + +### Patch Changes + +- Updated dependencies [e453f6a] + - @openfn/lexicon@2.1.0 + ## 1.24.2 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 53f331221..8d47201c2 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.24.2", + "version": "1.25.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module",