diff --git a/packages/lexicon/CHANGELOG.md b/packages/lexicon/CHANGELOG.md index 83eda5f90..b57d29713 100644 --- a/packages/lexicon/CHANGELOG.md +++ b/packages/lexicon/CHANGELOG.md @@ -1,5 +1,11 @@ # lexicon +## 2.1.0 + +### Minor Changes + +- e453f6a: Support webhook_response in step:complete event when data available in results + ## 2.0.0 ### Major Changes diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index 81d9ac5d3..bab1bdc9a 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -209,6 +209,11 @@ export type StepStartPayload = { }; export type StepStartReply = void; +export type WebhookResponse = { + status?: number; + body?: Record | any[]; +}; + export type StepCompletePayload = ExitReason & { run_id?: string; job_id: string; @@ -223,6 +228,7 @@ export type StepCompletePayload = ExitReason & { }; duration: number; timestamp: TimeInMicroSeconds; + webhook_response?: WebhookResponse; }; export type StepCompleteReply = void; diff --git a/packages/lexicon/package.json b/packages/lexicon/package.json index 3b51706dd..584542294 100644 --- a/packages/lexicon/package.json +++ b/packages/lexicon/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lexicon", - "version": "2.0.0", + "version": "2.1.0", "description": "Central repo of names and type definitions", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 6bae7c6ab..74ec4d793 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,16 @@ # ws-worker +## 1.25.0 + +### Minor Changes + +- e453f6a: Support webhook_response in step:complete event when data available in results + +### Patch Changes + +- Updated dependencies [e453f6a] + - @openfn/lexicon@2.1.0 + ## 1.24.2 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 53f331221..8d47201c2 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.24.2", + "version": "1.25.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 4c0640f87..88eaa15da 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -52,6 +52,14 @@ export default async function onStepComplete( timestamp: timeInMicroseconds(event.time), } as StepCompletePayload; + // Feed through the webhook response if it's on state + // We do this on the event so that Lightning + // doesn't have the parse the dataclip + // (which may not be sent in zero persistence mode!) + if (outputState.webhookResponse) { + evt.webhook_response = outputState.webhookResponse; + } + if (event.redacted) { state.withheldDataclips[dataclipId] = true; evt.output_dataclip_error = 'DATACLIP_TOO_LARGE'; diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 7a74f872d..a80e7a217 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -365,6 +365,262 @@ test('accumulate multiple leaf dataclips for branching workflow', async (t) => { t.not(state.leafDataclipIds[0], state.leafDataclipIds[1]); }); +test('includes webhook_response in step:complete payload when webhookResponse is set on state', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201, body: { ok: true } } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + status: 201, + body: { ok: true }, + }); +}); + +test('includes webhook_response in step:complete payload when webhookResponse is set on state (status only)', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201 } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + status: 201, + }); +}); + +test('includes webhook_response in step:complete payload when webhookResponse is set on state (body only)', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { body: { ok: true } } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + body: { ok: true }, + }); +}); + +test('includes webhook_response in step:complete payload when webhookResponse is set on state (no keys)', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: {} }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, {}); +}); + +test('omits webhook_response from payload when webhookResponse is not set on state', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { state: { x: 10 } } as any; + await handleStepComplete({ channel, state } as any, event); + + t.is(lightningEvent.webhook_response, undefined); +}); + +test('webhookResponse is included in dataclip', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201, body: {} } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + const [dataclip] = Object.values(state.dataclips); + t.deepEqual(dataclip, { x: 10, webhookResponse: { status: 201, body: {} } }); +}); + +test('webhookResponse included in the serialized output_dataclip sent to Lightning', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { x: 10, webhookResponse: { status: 201, body: {} } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(JSON.parse(lightningEvent.output_dataclip), { + x: 10, + webhookResponse: { status: 201, body: {} }, + }); +}); + +test('handles webhookResponse with only a body', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { webhookResponse: { body: { message: 'hello' } } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { body: { message: 'hello' } }); +}); + +test('handles webhookResponse body as a JSON array', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const event = { + state: { webhookResponse: { status: 200, body: [{ id: 1 }, { id: 2 }] } }, + } as any; + await handleStepComplete({ channel, state } as any, event); + + t.deepEqual(lightningEvent.webhook_response, { + status: 200, + body: [{ id: 1 }, { id: 2 }], + }); +}); + +test('does nothing with webhookResponse when event.state is empty', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + await handleStepComplete({ channel, state } as any, { state: {} } as any); + t.is(lightningEvent.webhook_response, undefined); +}); + +test('does nothing with webhookResponse when event.state is undefined', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + let lightningEvent: any; + const channel = mockChannel({ + [STEP_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + await t.notThrowsAsync(() => + handleStepComplete({ channel, state } as any, {} as any) + ); + t.is(lightningEvent.webhook_response, undefined); +}); + // Single leaf reached by two paths: start → a → x, start → b → x // x executes twice, both times with no downstream test('accumulate two leaf dataclips when same node reached by two paths', async (t) => {