Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .changeset/webhook-response-step-complete.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
'@openfn/ws-worker': minor
'@openfn/runtime': minor
'@openfn/lexicon': minor
---

Forward `state.webhookResponse` to Lightning on `step:complete`.

If a job sets `state.webhookResponse = { status, body }` (camelCase in job
code), the worker extracts it from user state and attaches it to the
`step:complete` payload as `webhook_response` (snake_case, matching the rest
of the wire payload). The key is stripped from the persisted dataclip
(worker) and from the next step's input state (runtime), so it cannot leak.
Lightning decides when to flush the HTTP response based on the trigger
configuration.
1 change: 1 addition & 0 deletions packages/engine-multi/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const createAPI = async function (

statePropsToRemove: options.statePropsToRemove ?? [
'configuration',
'webhookResponse',
'response',
],
profile: options.profile,
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export interface JobCompletePayload extends ExternalEvent {
job: number;
system: number;
};
webhook_response?: any;
}

export interface JobErrorPayload extends ExternalEvent {
Expand Down
6 changes: 5 additions & 1 deletion packages/engine-multi/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ test.serial('engine api uses default options', async (t) => {

t.truthy(api.options);

t.deepEqual(api.options.statePropsToRemove, ['configuration', 'response']);
t.deepEqual(api.options.statePropsToRemove, [
'configuration',
'webhookResponse',
'response',
]);
t.truthy(api.options.whitelist);
});

Expand Down
6 changes: 6 additions & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ export type StepStartPayload = {
};
export type StepStartReply = void;

export type WebhookResponse = {
status?: number;
body?: Record<string, any> | any[];
};

export type StepCompletePayload = ExitReason & {
run_id?: string;
job_id: string;
Expand All @@ -223,6 +228,7 @@ export type StepCompletePayload = ExitReason & {
};
duration: number;
timestamp: TimeInMicroSeconds;
webhook_response?: WebhookResponse;
};
export type StepCompleteReply = void;

Expand Down
4 changes: 3 additions & 1 deletion packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ const prepareFinalState = async (
if (!statePropsToRemove) {
// As a strict default, remove the configuration key
// tbh this should happen higher up in the stack but it causes havoc in unit testing
statePropsToRemove = ['configuration'];
statePropsToRemove = ['configuration', 'webhookResponse'];
}

const removedProps: string[] = [];
Expand Down Expand Up @@ -235,6 +235,7 @@ const executeStep = async (
if (!didError) {
const humanDuration = logger.timer(timerId);
logger.success(`${jobName} completed in ${humanDuration}`);
const webhook_response = result?.webhookResponse || undefined;
result = await prepareFinalState(
result,
logger,
Expand Down Expand Up @@ -275,6 +276,7 @@ const executeStep = async (
jobId,
next,
mem,
webhook_response,
});
}
} else {
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ export type NotifyJobCompletePayload = {
system: number;
peak?: number;
};
webhook_response?: any;
};

export type NotifyJobErrorPayload = {
Expand Down
41 changes: 41 additions & 0 deletions packages/runtime/test/execute/step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,47 @@ test.serial(
}
);

test.serial('webhookResponse is emitted on NOTIFY_JOB_COMPLETE', async (t) => {
const job = [
async (s: State) => ({
...s,
webhookResponse: { status: 201, body: { ok: true } },
}),
];
const step = { id: 'k', expression: job };

let webhook_response: any;
const notify = (event: string, payload: any) => {
if (event === NOTIFY_JOB_COMPLETE) {
webhook_response = JSON.parse(JSON.stringify(payload.webhook_response));
}
};
const context = createContext({ notify });

await execute(context, step, createState());
t.deepEqual(webhook_response, {
status: 201,
body: { ok: true },
});
});

test.serial(
'webhookResponse is removed from result state so it does not propagate to next step',
async (t) => {
const job = [
async (s: State) => ({
...s,
webhookResponse: { status: 201, body: { ok: true } },
}),
];
const step = { id: 'k', expression: job };
const context = createContext();

const result = await execute(context, step, createState());
t.false('webhookResponse' in (result.state as any));
}
);

test.serial(
'no props are removed from state if an empty array is passed to statePropsToRemove',
async (t) => {
Expand Down
2 changes: 2 additions & 0 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export default async function onStepComplete(
if (!state.dataclips) {
state.dataclips = {};
}

const outputState = event.state || {};

state.dataclips[dataclipId] = event.state;
Expand All @@ -50,6 +51,7 @@ export default async function onStepComplete(
duration: event.duration,
thread_id: event.threadId,
timestamp: timeInMicroseconds(event.time),
webhook_response: event.webhook_response,
} as StepCompletePayload;

if (event.redacted) {
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/src/util/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ export default function parseArgs(argv: string[]): Args {
statePropsToRemove: setArg(
args.statePropsToRemove,
WORKER_STATE_PROPS_TO_REMOVE,
['configuration', 'response']
['configuration', 'webhookResponse', 'response']
),
runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500),
payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10),
Expand Down
201 changes: 201 additions & 0 deletions packages/ws-worker/test/events/step-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,207 @@ test('accumulate multiple leaf dataclips for branching workflow', async (t) => {
t.not(state.leafDataclipIds[0], state.leafDataclipIds[1]);
});

test('includes webhook_response in step:complete payload when webhookResponse is set on state', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { x: 10, webhookResponse: { status: 201, body: { ok: true } } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, {
status: 201,
body: { ok: true },
});
});

test('omits webhook_response from payload when webhookResponse is not set on state', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = { state: { x: 10 } } as any;
await handleStepComplete({ channel, state } as any, event);

t.is(lightningEvent.webhook_response, undefined);
});

test('strips webhookResponse from the stored dataclip', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const channel = mockChannel({
[STEP_COMPLETE]: () => true,
});

const event = {
state: { x: 10, webhookResponse: { status: 201, body: {} } },
} as any;
await handleStepComplete({ channel, state } as any, event);

const [dataclip] = Object.values(state.dataclips);
t.deepEqual(dataclip, { x: 10 });
});

test('strips webhookResponse from the serialized output_dataclip sent to Lightning', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { x: 10, webhookResponse: { status: 201, body: {} } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(JSON.parse(lightningEvent.output_dataclip), { x: 10 });
});

test('strips webhookResponse from event.state so it does not flow downstream', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const channel = mockChannel({
[STEP_COMPLETE]: () => true,
});

const event = {
state: { x: 10, webhookResponse: { status: 201, body: {} } },
next: ['job-2'],
} as any;
await handleStepComplete({ channel, state } as any, event);

t.false('webhookResponse' in event.state);
});

test('handles webhookResponse with only a body', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { webhookResponse: { body: { message: 'hello' } } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, { body: { message: 'hello' } });
});

test('handles webhookResponse body as a JSON array', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { webhookResponse: { status: 200, body: [{ id: 1 }, { id: 2 }] } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, {
status: 200,
body: [{ id: 1 }, { id: 2 }],
});
});

test('does nothing with webhookResponse when event.state is empty', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

await handleStepComplete({ channel, state } as any, { state: {} } as any);
t.is(lightningEvent.webhook_response, undefined);
});

test('does nothing with webhookResponse when event.state is undefined', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

await t.notThrowsAsync(() =>
handleStepComplete({ channel, state } as any, {} as any)
);
t.is(lightningEvent.webhook_response, undefined);
});

// Single leaf reached by two paths: start → a → x, start → b → x
// x executes twice, both times with no downstream
test('accumulate two leaf dataclips when same node reached by two paths', async (t) => {
Expand Down
6 changes: 5 additions & 1 deletion packages/ws-worker/test/util/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ test('cli should set default values for unspecified options', (t) => {
t.is(args.capacity, 5);
t.is(args.sentryEnv, 'dev');
t.falsy(args.sentryDsn);
t.deepEqual(args.statePropsToRemove, ['configuration', 'response']);
t.deepEqual(args.statePropsToRemove, [
'configuration',
'webhookResponse',
'response',
]);
t.is(args.runMemory, 500);
t.is(args.maxRunDurationSeconds, 300);
t.is(args.engineValidationRetries, 3);
Expand Down