From 92b09932207c496a380aa69efa5f85a8a101531e Mon Sep 17 00:00:00 2001
From: Frank Midigo
Date: Wed, 22 Apr 2026 08:25:36 +0300
Subject: [PATCH 01/21] PoC for webhook responses
---
lib/lightning/workflows/trigger.ex | 2 +
lib/lightning_web/channels/run_channel.ex | 49 +++++++++++++++--------
2 files changed, 35 insertions(+), 16 deletions(-)
diff --git a/lib/lightning/workflows/trigger.ex b/lib/lightning/workflows/trigger.ex
index c6aa449e350..60b7b79362e 100644
--- a/lib/lightning/workflows/trigger.ex
+++ b/lib/lightning/workflows/trigger.ex
@@ -55,6 +55,8 @@ defmodule Lightning.Workflows.Trigger do
field :delete, :boolean, virtual: true
field :has_auth_method, :boolean, virtual: true
+ field :webhook_response_success_code, :integer, virtual: true, default: 200
+ field :webhook_response_error_code, :integer, virtual: true, default: 400
many_to_many :webhook_auth_methods, Lightning.Workflows.WebhookAuthMethod,
join_through: "trigger_webhook_auth_methods",
diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex
index 4ff9d65ceef..c7df66b0d04 100644
--- a/lib/lightning_web/channels/run_channel.ex
+++ b/lib/lightning_web/channels/run_channel.ex
@@ -210,6 +210,7 @@ defmodule LightningWeb.RunChannel do
reply_with(socket, {:error, changeset})
{:ok, step} ->
+ maybe_broadcast_custom_webhook_response(socket.assigns.run, payload)
reply_with(socket, {:ok, %{step_id: step.id}})
end
end
@@ -306,6 +307,32 @@ defmodule LightningWeb.RunChannel do
# Ignore other messages
def handle_info(_msg, socket), do: {:noreply, socket}
+ defp maybe_broadcast_custom_webhook_response(run, payload) do
+ case payload["webhook_response"] do
+ nil ->
+ :ok
+
+ webhook_response ->
+ run_with_preloads = Repo.preload(run, work_order: [:trigger])
+ trigger = run_with_preloads.work_order.trigger
+
+ if trigger && trigger.type == :webhook &&
+ trigger.webhook_reply == :custom do
+ topic =
+ "work_order:#{run_with_preloads.work_order.id}:webhook_response"
+
+ status_code = webhook_response["code"] || 200
+ body = webhook_response["body"] || %{}
+
+ Phoenix.PubSub.broadcast(
+ Lightning.PubSub,
+ topic,
+ {:webhook_response, status_code, body}
+ )
+ end
+ end
+ end
+
defp maybe_broadcast_webhook_response(run, payload) do
work_order = run.work_order
trigger = work_order.trigger
@@ -313,10 +340,7 @@ defmodule LightningWeb.RunChannel do
if trigger && trigger.type == :webhook &&
trigger.webhook_reply == :after_completion do
topic = "work_order:#{work_order.id}:webhook_response"
-
- # TODO - Later allow workflow authors to customize the status code
- # and body of the reply.
- status_code = determine_status_code(run.state)
+ status_code = determine_status_code(run.state, trigger)
body = %{
data: payload["final_state"],
@@ -340,18 +364,11 @@ defmodule LightningWeb.RunChannel do
end
end
- # TODO - decide how we should respond... do we use HTTP codes for run states?
- defp determine_status_code(state) do
- case state do
- :success -> 201
- :failed -> 201
- :crashed -> 201
- :exception -> 201
- :killed -> 201
- :cancelled -> 201
- _other -> 201
- end
- end
+ defp determine_status_code(:success, trigger),
+ do: trigger.webhook_response_success_code
+
+ defp determine_status_code(_state, trigger),
+ do: trigger.webhook_response_error_code
defp update_scrubber(nil, samples, basic_auth) do
Scrubber.start_link(samples: samples, basic_auth: basic_auth)
From 53b09e6755e98c50523afd5882e0f903aeae0917 Mon Sep 17 00:00:00 2001
From: Frank Midigo
Date: Wed, 22 Apr 2026 08:59:59 +0300
Subject: [PATCH 02/21] Allow users to change status codes from UI
---
.../adapters/YAMLStateToYDoc.ts | 8 +
.../components/inspector/TriggerForm.tsx | 139 +++++++++++++++++-
.../js/collaborative-editor/types/session.ts | 4 +-
.../js/collaborative-editor/types/trigger.ts | 18 ++-
assets/js/yaml/types.ts | 2 +
.../collaboration/workflow_serializer.ex | 22 ++-
lib/lightning/workflows/trigger.ex | 27 +++-
lib/lightning_web/channels/run_channel.ex | 4 +-
...add_webhook_response_codes_to_triggers.exs | 10 ++
9 files changed, 223 insertions(+), 11 deletions(-)
create mode 100644 priv/repo/migrations/20260422052858_add_webhook_response_codes_to_triggers.exs
diff --git a/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts b/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts
index 1ea725db71d..f3f946f62f4 100644
--- a/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts
+++ b/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts
@@ -77,6 +77,14 @@ export class YAMLStateToYDoc {
if (trigger.type === 'webhook') {
triggerMap.set('webhook_reply', trigger.webhook_reply ?? null);
+ triggerMap.set(
+ 'webhook_response_success_code',
+ trigger.webhook_response_success_code ?? null
+ );
+ triggerMap.set(
+ 'webhook_response_error_code',
+ trigger.webhook_response_error_code ?? null
+ );
}
return triggerMap;
diff --git a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx
index a1be264f046..01e336f8527 100644
--- a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx
+++ b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx
@@ -398,6 +398,7 @@ export function TriggerForm({ trigger }: TriggerFormProps) {
e.target.value as
| 'before_start'
| 'after_completion'
+ | 'custom'
)
}
onBlur={field.handleBlur}
@@ -418,14 +419,17 @@ export function TriggerForm({ trigger }: TriggerFormProps) {
Async (default)
+
{field.state.value === 'after_completion'
- ? 'Responds with the final output state after the run completes. (Note that depending on your queue size and the duration of the workflow itself, this could take a long time.)'
- : 'Responds immediately with the enqueued work order ID.'}
+ ? 'Responds with the final output state after the run completes. You can customise the HTTP status codes returned below.'
+ : field.state.value === 'custom'
+ ? 'The job code itself sends the HTTP response using the send() function.'
+ : 'Responds immediately with the enqueued work order ID.'}
{field.state.value === 'after_completion'
- ? 'Responds with the final output state after the run completes. You can customise the HTTP status codes returned below.'
- : field.state.value === 'custom'
- ? 'The job code itself sends the HTTP response using the send() function.'
- : 'Responds immediately with the enqueued work order ID.'}
+ ? 'Holds the HTTP connection open and responds when the run completes. Jobs can send an early response using setWebhookResponse().'
+ : 'Responds immediately with the enqueued work order ID.'}
{field.state.meta.errors.map(error => (
- {/* Conditional response code fields for after_completion */}
+ {/* Conditional config fields for after_completion */}
- {/* Response Body (JSON) */}
+ {/* Response Body */}
-
- Response body (JSON)
+
+ Response body
-
+
+ {/* Runtime override note */}
+
+ To override both the status code and body
+ at runtime, write to{' '}
+
+ _webhookResponse
+ {' '}
+ in the job state. When present, it takes
+ full priority over these defaults.
+
);
}}
diff --git a/assets/js/collaborative-editor/types/session.ts b/assets/js/collaborative-editor/types/session.ts
index a4772440576..f43ce4b8a15 100644
--- a/assets/js/collaborative-editor/types/session.ts
+++ b/assets/js/collaborative-editor/types/session.ts
@@ -72,7 +72,8 @@ export namespace Session {
has_auth_method: boolean;
webhook_reply: 'before_start' | 'after_completion' | null;
sync_webhook_response_config: {
- code: number | null;
+ success_code: number | null;
+ error_code: number | null;
body: Record | null;
} | null;
webhook_auth_methods: Array<{
diff --git a/assets/js/collaborative-editor/types/trigger.ts b/assets/js/collaborative-editor/types/trigger.ts
index 69db4035961..253d9f705c0 100644
--- a/assets/js/collaborative-editor/types/trigger.ts
+++ b/assets/js/collaborative-editor/types/trigger.ts
@@ -26,7 +26,8 @@ const webhookTriggerSchema = baseTriggerSchema.extend({
.default('before_start'),
sync_webhook_response_config: z
.object({
- code: z.number().int().nullable().default(null),
+ success_code: z.number().int().nullable().default(null),
+ error_code: z.number().int().nullable().default(null),
body: z.record(z.string(), z.unknown()).nullable().default(null),
})
.nullable()
diff --git a/assets/js/yaml/types.ts b/assets/js/yaml/types.ts
index 90b4eef9e13..4804aaced89 100644
--- a/assets/js/yaml/types.ts
+++ b/assets/js/yaml/types.ts
@@ -30,7 +30,8 @@ export type StateWebhookTrigger = {
type: 'webhook';
webhook_reply: 'before_start' | 'after_completion' | null | undefined;
sync_webhook_response_config?: {
- code?: number | null;
+ success_code?: number | null;
+ error_code?: number | null;
body?: Record | null;
} | null;
};
diff --git a/lib/lightning/collaboration/workflow_serializer.ex b/lib/lightning/collaboration/workflow_serializer.ex
index c7ed9790971..963bfb0b580 100644
--- a/lib/lightning/collaboration/workflow_serializer.ex
+++ b/lib/lightning/collaboration/workflow_serializer.ex
@@ -236,7 +236,8 @@ defmodule Lightning.Collaboration.WorkflowSerializer do
config ->
Yex.MapPrelim.from(%{
- "code" => config.code,
+ "success_code" => config.success_code,
+ "error_code" => config.error_code,
"body" => config.body
})
end
@@ -312,24 +313,27 @@ defmodule Lightning.Collaboration.WorkflowSerializer do
defp normalize_kafka_configuration(trigger), do: trigger
- # Y.Doc serialises numbers as floats; convert code back to an integer.
+ # Y.Doc serialises numbers as floats; convert integer codes back.
defp normalize_sync_webhook_response_config(
%{"sync_webhook_response_config" => %{} = config} = trigger
) do
normalized =
- case Map.fetch(config, "code") do
- {:ok, value} when is_float(value) ->
- Map.put(config, "code", trunc(value))
-
- _ ->
- config
- end
+ config
+ |> normalize_integer_field("success_code")
+ |> normalize_integer_field("error_code")
Map.put(trigger, "sync_webhook_response_config", normalized)
end
defp normalize_sync_webhook_response_config(trigger), do: trigger
+ defp normalize_integer_field(map, key) do
+ case Map.fetch(map, key) do
+ {:ok, value} when is_float(value) -> Map.put(map, key, trunc(value))
+ _ -> map
+ end
+ end
+
defp extract_positions(positions_map) do
Yex.Map.to_json(positions_map)
end
diff --git a/lib/lightning/workflows/snapshot.ex b/lib/lightning/workflows/snapshot.ex
index 468b9b2196d..bb2982c62ac 100644
--- a/lib/lightning/workflows/snapshot.ex
+++ b/lib/lightning/workflows/snapshot.ex
@@ -13,6 +13,7 @@ defmodule Lightning.Workflows.Snapshot do
alias Lightning.Credentials.KeychainCredential
alias Lightning.Projects.ProjectCredential
alias Lightning.Repo
+ alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig
alias Lightning.Workflows.WebhookAuthMethod
alias Lightning.Workflows.Workflow
@@ -61,11 +62,12 @@ defmodule Lightning.Workflows.Snapshot do
field :has_auth_method, :boolean, virtual: true
field :webhook_reply, Ecto.Enum,
- values: [:before_start, :after_completion, :custom],
+ values: [:before_start, :after_completion],
default: :before_start
- field :webhook_response_success_code, :integer
- field :webhook_response_error_code, :integer
+ embeds_one :sync_webhook_response_config, SyncWebhookResponseConfig,
+ on_replace: :update,
+ primary_key: false
many_to_many :webhook_auth_methods, WebhookAuthMethod,
join_through: "trigger_webhook_auth_methods",
@@ -127,6 +129,10 @@ defmodule Lightning.Workflows.Snapshot do
schema
|> cast(params, @trigger_fields)
|> validate_required([:id, :inserted_at, :updated_at])
+ |> cast_embed(:sync_webhook_response_config,
+ required: false,
+ with: &SyncWebhookResponseConfig.changeset/2
+ )
end
defp edge_changeset(schema, params) do
@@ -135,7 +141,12 @@ defmodule Lightning.Workflows.Snapshot do
|> validate_required([:id, :inserted_at, :updated_at])
end
- @associations_to_include [:jobs, :triggers, :edges]
+ @associations_to_include [
+ :jobs,
+ :triggers,
+ :edges,
+ :sync_webhook_response_config
+ ]
@spec build(Workflow.t()) :: Ecto.Changeset.t()
def build(%Workflow{} = workflow) do
diff --git a/lib/lightning/workflows/trigger.ex b/lib/lightning/workflows/trigger.ex
index 1f13cdb8f2b..43a6713216c 100644
--- a/lib/lightning/workflows/trigger.ex
+++ b/lib/lightning/workflows/trigger.ex
@@ -64,7 +64,8 @@ defmodule Lightning.Workflows.Trigger do
embeds_one :kafka_configuration, KafkaConfiguration, on_replace: :update
embeds_one :sync_webhook_response_config, SyncWebhookResponseConfig,
- on_replace: :update
+ on_replace: :update,
+ primary_key: false
timestamps()
end
@@ -168,8 +169,6 @@ defmodule Lightning.Workflows.Trigger do
end
end
- # Keep the config when reply mode is after_completion; clear it otherwise
- # so stale config doesn't persist across mode switches.
defp maybe_clear_sync_config(changeset) do
case fetch_field!(changeset, :webhook_reply) do
:after_completion -> changeset
diff --git a/lib/lightning/workflows/triggers/sync_webhook_response_config.ex b/lib/lightning/workflows/triggers/sync_webhook_response_config.ex
index a7c3506b5c2..1baacb8659a 100644
--- a/lib/lightning/workflows/triggers/sync_webhook_response_config.ex
+++ b/lib/lightning/workflows/triggers/sync_webhook_response_config.ex
@@ -5,9 +5,11 @@ defmodule Lightning.Workflows.Triggers.SyncWebhookResponseConfig do
When a trigger's `webhook_reply` is `:after_completion`, this config controls
the HTTP response returned to the caller:
- - `code` — HTTP status code. Falls back to 200 (success) or 400 (any other
- terminal state) when nil.
- - `body` — Custom response body. Falls back to the run's final state when nil.
+ - `success_code` — HTTP status code when the run succeeds. Defaults to 201.
+ - `error_code` — HTTP status code for any non-success terminal state
+ (failed, crashed, exception, killed, cancelled). Defaults to 201.
+ - `body` — Custom response body (JSON). Falls back to the run's final state
+ when nil.
"""
use Ecto.Schema
@@ -15,11 +17,12 @@ defmodule Lightning.Workflows.Triggers.SyncWebhookResponseConfig do
@primary_key false
embedded_schema do
- field :code, :integer
+ field :success_code, :integer
+ field :error_code, :integer
field :body, :map
end
def changeset(config, attrs) do
- cast(config, attrs, [:code, :body])
+ cast(config, attrs, [:success_code, :error_code, :body])
end
end
diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex
index d76dddee5e4..a759fff7c18 100644
--- a/lib/lightning_web/channels/run_channel.ex
+++ b/lib/lightning_web/channels/run_channel.ex
@@ -121,7 +121,10 @@ defmodule LightningWeb.RunChannel do
|> Lightning.FailureAlerter.alert_on_failure()
# Broadcast webhook response for after_completion mode
- maybe_broadcast_webhook_response(run_with_preloads, payload)
+ maybe_broadcast_webhook_response(
+ run_with_preloads,
+ payload["final_state"]
+ )
socket |> assign(run: run) |> reply_with({:ok, nil})
@@ -306,63 +309,95 @@ defmodule LightningWeb.RunChannel do
# Ignore other messages
def handle_info(_msg, socket), do: {:noreply, socket}
- defp maybe_broadcast_webhook_response(run, payload) do
- work_order = run.work_order
- trigger = work_order.trigger
-
- if trigger && trigger.type == :webhook &&
- trigger.webhook_reply == :after_completion do
- topic = "work_order:#{work_order.id}:webhook_response"
- final_state = payload["final_state"]
- config = trigger.sync_webhook_response_config
-
- # _webhookResponse in the final state takes priority over the trigger config.
- state_override =
- case final_state do
- %{"_webhookResponse" => override} when is_map(override) -> override
- _ -> nil
- end
-
- status_code =
- state_override_code(state_override) ||
- (config && config.code) ||
- default_status_code(run.state)
-
- body =
- Map.get(state_override || %{}, "body") ||
- (config && config.body) ||
- %{
- data: final_state,
- meta: %{
- work_order_id: work_order.id,
- run_id: run.id,
- state: run.state,
- error_type: run.error_type,
- inserted_at: run.inserted_at,
- started_at: run.started_at,
- claimed_at: run.claimed_at,
- finished_at: run.finished_at
- }
- }
+ defp maybe_broadcast_webhook_response(
+ run,
+ final_state
+ ) do
+ if run.work_order.trigger.webhook_reply == :after_completion do
+ {status_code, body} =
+ determine_webhook_response(
+ run,
+ final_state,
+ run.work_order.trigger.sync_webhook_response_config
+ )
Phoenix.PubSub.broadcast(
Lightning.PubSub,
- topic,
+ "work_order:#{run.work_order_id}:webhook_response",
{:webhook_response, status_code, body}
)
end
end
- # Normalise the code from _webhookResponse — JSON numbers arrive as floats.
- defp state_override_code(%{"code" => code}) when is_integer(code), do: code
+ defp determine_webhook_response(run, final_state, config) do
+ (final_state || %{})
+ |> Map.get("_webhookResponse")
+ |> case do
+ resp when is_nil(resp) or resp == %{} ->
+ build_default_response(run, final_state, config)
- defp state_override_code(%{"code" => code}) when is_float(code),
- do: trunc(code)
+ resp ->
+ build_response_from_state(resp)
+ end
+ end
- defp state_override_code(_), do: nil
+ defp build_default_response(run, final_state, config) do
+ {default_response_status(run.state, config),
+ default_response_body(run.state, final_state, config)}
+ end
+
+ defp default_response_status(:success, %{success_code: code})
+ when is_integer(code),
+ do: code
+
+ defp default_response_status(_run_status, %{error_code: code})
+ when is_integer(code),
+ do: code
+
+ defp default_response_status(_run_status, _config), do: 201
- defp default_status_code(:success), do: 200
- defp default_status_code(_), do: 400
+ defp default_response_body(_run_status, _final_state, %{body: body})
+ when is_map(body),
+ do: body
+
+ defp default_response_body(:success, final_state, %{body: nil}),
+ do: final_state
+
+ defp default_response_body(run_status, _final_state, _config) do
+ %{
+ message:
+ "Run completed with status: #{run_status}. As a security policy, OpenFn does not send state data when the run errors out to avoid leaking sensitive information"
+ }
+ end
+
+ defp build_response_from_state(%{"status" => status, "body" => body})
+ when is_map(body) and is_integer(status) do
+ {status, body}
+ end
+
+ defp build_response_from_state(%{"status" => status, "body" => _} = response)
+ when is_float(status) do
+ response
+ |> Map.put("status", trunc(status))
+ |> build_response_from_state()
+ end
+
+ defp build_response_from_state(%{"status" => status}) do
+ malformed_response("status needs to be an integer, got: #{inspect(status)}")
+ end
+
+ defp build_response_from_state(%{"body" => body}) do
+ malformed_response("body needs to be json object, got: #{inspect(body)}")
+ end
+
+ defp build_response_from_state(_webhook_response) do
+ malformed_response("no status or body was defined")
+ end
+
+ defp malformed_response(reason) do
+ {201,
+ %{message: "Run completed, but _webhookResponse was malformed: #{reason}"}}
+ end
defp update_scrubber(nil, samples, basic_auth) do
Scrubber.start_link(samples: samples, basic_auth: basic_auth)
diff --git a/lib/lightning_web/controllers/webhooks_controller.ex b/lib/lightning_web/controllers/webhooks_controller.ex
index 1a5fc8e500b..bb624b87244 100644
--- a/lib/lightning_web/controllers/webhooks_controller.ex
+++ b/lib/lightning_web/controllers/webhooks_controller.ex
@@ -63,6 +63,19 @@ defmodule LightningWeb.WebhooksController do
)
|> case do
{:ok, work_order} ->
+ conn =
+ conn
+ |> put_resp_header("x-meta-work_order_id", work_order.id)
+ |> then(fn conn ->
+ case work_order do
+ %{runs: [run]} ->
+ put_resp_header(conn, "x-meta-run_id", run.id)
+
+ _ ->
+ conn
+ end
+ end)
+
if Workflows.Trigger.synchronous?(trigger) do
handle_delayed_response(conn, work_order)
else
@@ -131,11 +144,6 @@ defmodule LightningWeb.WebhooksController do
conn
|> put_status(status_code)
|> json(body)
-
- {:webhook_error, status_code, error} ->
- conn
- |> put_status(status_code)
- |> json(%{error: error})
after
Lightning.Config.webhook_response_timeout_ms() ->
Logger.warning(
From e356ad1014e652b210759c6c9cd8d60d85facab8 Mon Sep 17 00:00:00 2001
From: Frank Midigo
Date: Mon, 27 Apr 2026 07:18:12 +0300
Subject: [PATCH 06/21] add tests and fix bugs for configurable webhook
responses
- Fix snapshot serialization to recursively handle embedded schemas
using Ecto.embedded_dump + __schema__(:embeds), excluding
:sync_webhook_response_config from the flat cast field list
- Drop incorrect primary_key: false from sync_webhook_response_config embed
- Fix default_response_body/3 nil-config clause so success runs
without a configured body correctly return the final state
- Rename webhook response headers to hyphenated form:
x-meta-work-order-id, x-meta-run-id
- Add run_channel tests covering all webhook response dispatch paths:
_webhookResponse override, configured status codes/body, malformed
responses, before_start triggers
- Rewrite webhooks_controller delayed-response tests to use
WorkOrders.Events for synchronisation instead of Process.sleep,
and assert on x-meta-* response headers
- Fix integration tests that incorrectly asserted data/meta wrapping
on the response body; add integration tests for configured
success_code, error_code, custom body, and _webhookResponse override
---
lib/lightning/workflows/snapshot.ex | 48 ++-
lib/lightning/workflows/trigger.ex | 3 +-
lib/lightning_web/channels/run_channel.ex | 2 +-
.../controllers/webhooks_controller.ex | 4 +-
test/integration/web_and_worker_test.exs | 207 ++++++++--
.../channels/run_channel_test.exs | 356 ++++++++++++++++++
.../controllers/webhooks_controller_test.exs | 156 +++-----
7 files changed, 623 insertions(+), 153 deletions(-)
diff --git a/lib/lightning/workflows/snapshot.ex b/lib/lightning/workflows/snapshot.ex
index bb2982c62ac..09fab828648 100644
--- a/lib/lightning/workflows/snapshot.ex
+++ b/lib/lightning/workflows/snapshot.ex
@@ -66,8 +66,7 @@ defmodule Lightning.Workflows.Snapshot do
default: :before_start
embeds_one :sync_webhook_response_config, SyncWebhookResponseConfig,
- on_replace: :update,
- primary_key: false
+ on_replace: :update
many_to_many :webhook_auth_methods, WebhookAuthMethod,
join_through: "trigger_webhook_auth_methods",
@@ -116,7 +115,7 @@ defmodule Lightning.Workflows.Snapshot do
@job_fields Lightning.Workflows.Job.__schema__(:fields) -- [:workflow_id]
@trigger_fields Lightning.Workflows.Trigger.__schema__(:fields) --
- [:workflow_id]
+ [:workflow_id, :sync_webhook_response_config]
@edge_fields Lightning.Workflows.Edge.__schema__(:fields) -- [:workflow_id]
defp job_changeset(schema, params) do
@@ -141,12 +140,7 @@ defmodule Lightning.Workflows.Snapshot do
|> validate_required([:id, :inserted_at, :updated_at])
end
- @associations_to_include [
- :jobs,
- :triggers,
- :edges,
- :sync_webhook_response_config
- ]
+ @associations_to_include [:jobs, :triggers, :edges]
@spec build(Workflow.t()) :: Ecto.Changeset.t()
def build(%Workflow{} = workflow) do
@@ -156,7 +150,7 @@ defmodule Lightning.Workflows.Snapshot do
|> Enum.into(%{}, fn {field, value} ->
case field do
field when field in @associations_to_include ->
- {field, Enum.map(value, &Map.from_struct/1)}
+ {field, assoc_to_map(field, value)}
field when field in [:name, :lock_version, :positions] ->
{field, value}
@@ -171,6 +165,40 @@ defmodule Lightning.Workflows.Snapshot do
|> new()
end
+ defp assoc_to_map(field, structs) when is_list(structs) do
+ Enum.map(structs, &assoc_to_map(field, &1))
+ end
+
+ defp assoc_to_map(:triggers, %module{} = struct) do
+ base = Ecto.embedded_dump(struct, :json)
+
+ module.__schema__(:embeds)
+ |> Enum.reduce(base, fn field, acc ->
+ embed = Map.get(struct, field)
+ Map.put(acc, field, embed_to_map(embed))
+ end)
+ end
+
+ defp assoc_to_map(_, %_{} = struct) do
+ Ecto.embedded_dump(struct, :json)
+ end
+
+ defp embed_to_map(%module{} = struct) do
+ base = Ecto.embedded_dump(struct, :json)
+
+ module.__schema__(:embeds)
+ |> Enum.reduce(base, fn field, acc ->
+ embed = Map.get(struct, field)
+ Map.put(acc, field, embed_to_map(embed))
+ end)
+ end
+
+ defp embed_to_map(embeds) when is_list(embeds) do
+ Enum.map(embeds, &embed_to_map/1)
+ end
+
+ defp embed_to_map(nil), do: nil
+
@spec create(Workflow.t()) ::
{:ok, t()} | {:error, Ecto.Changeset.t()}
def create(%Workflow{} = workflow) do
diff --git a/lib/lightning/workflows/trigger.ex b/lib/lightning/workflows/trigger.ex
index 43a6713216c..35662429cf4 100644
--- a/lib/lightning/workflows/trigger.ex
+++ b/lib/lightning/workflows/trigger.ex
@@ -64,8 +64,7 @@ defmodule Lightning.Workflows.Trigger do
embeds_one :kafka_configuration, KafkaConfiguration, on_replace: :update
embeds_one :sync_webhook_response_config, SyncWebhookResponseConfig,
- on_replace: :update,
- primary_key: false
+ on_replace: :update
timestamps()
end
diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex
index a759fff7c18..63f28352f1b 100644
--- a/lib/lightning_web/channels/run_channel.ex
+++ b/lib/lightning_web/channels/run_channel.ex
@@ -360,7 +360,7 @@ defmodule LightningWeb.RunChannel do
when is_map(body),
do: body
- defp default_response_body(:success, final_state, %{body: nil}),
+ defp default_response_body(:success, final_state, _config),
do: final_state
defp default_response_body(run_status, _final_state, _config) do
diff --git a/lib/lightning_web/controllers/webhooks_controller.ex b/lib/lightning_web/controllers/webhooks_controller.ex
index bb624b87244..ec0985812a3 100644
--- a/lib/lightning_web/controllers/webhooks_controller.ex
+++ b/lib/lightning_web/controllers/webhooks_controller.ex
@@ -65,11 +65,11 @@ defmodule LightningWeb.WebhooksController do
{:ok, work_order} ->
conn =
conn
- |> put_resp_header("x-meta-work_order_id", work_order.id)
+ |> put_resp_header("x-meta-work-order-id", work_order.id)
|> then(fn conn ->
case work_order do
%{runs: [run]} ->
- put_resp_header(conn, "x-meta-run_id", run.id)
+ put_resp_header(conn, "x-meta-run-id", run.id)
_ ->
conn
diff --git a/test/integration/web_and_worker_test.exs b/test/integration/web_and_worker_test.exs
index be2544e0ca8..5c71dddfe16 100644
--- a/test/integration/web_and_worker_test.exs
+++ b/test/integration/web_and_worker_test.exs
@@ -12,6 +12,7 @@ defmodule Lightning.WebAndWorkerTest do
alias Lightning.Runtime.RuntimeManager
alias Lightning.WorkOrders
alias Lightning.Workflows.Snapshot
+ alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig
setup :set_mox_from_context
setup :verify_on_exit!
@@ -469,33 +470,8 @@ defmodule Lightning.WebAndWorkerTest do
# Wait for the workflow to complete (up to 10 seconds)
response = Task.await(task, 10_000)
- # Should return 201 with the final state
assert response.status == 201
-
- # The response body should be the final state from the job inside a "data"
- # key and the metadata inside a "meta" key.
- assert %{
- "data" => %{"data" => %{"value" => 10}, "result" => "success"},
- "meta" => meta
- } = response.body
-
- # Verify meta fields exist with correct types and values
- assert meta["error_type"] == nil
- assert meta["state"] == "success"
- assert is_binary(meta["run_id"])
- assert is_binary(meta["work_order_id"])
-
- # Verify datetime fields are present and valid ISO8601 strings
- assert is_binary(meta["claimed_at"])
- assert is_binary(meta["finished_at"])
- assert is_binary(meta["inserted_at"])
- assert is_binary(meta["started_at"])
-
- # Verify datetime fields match ISO8601 format
- assert meta["claimed_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/
- assert meta["finished_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/
- assert meta["inserted_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/
- assert meta["started_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/
+ assert %{"data" => %{"value" => 10}, "result" => "success"} = response.body
# Verify the work order completed successfully
work_order =
@@ -717,9 +693,7 @@ defmodule Lightning.WebAndWorkerTest do
assert response.status == 201
- assert %{"data" => final_state, "meta" => meta} = response.body
-
- assert meta["state"] == "success"
+ final_state = response.body
# The final_state is keyed by job ID. When the same job is reached
# twice (step 5), the second entry gets a "-1" suffix.
@@ -744,6 +718,181 @@ defmodule Lightning.WebAndWorkerTest do
assert Enum.count(steps) == 6
assert Enum.all?(steps, fn step -> step.exit_reason == "success" end)
end
+
+ @tag :integration
+ @tag timeout: 120_000
+ test "uses configured success_code when workflow succeeds", %{uri: uri} do
+ project = insert(:project)
+ trigger = build(:trigger, type: :webhook, enabled: true)
+
+ job =
+ build(:job,
+ adaptor: "@openfn/language-common@latest",
+ body: "fn(state => state);",
+ name: "job"
+ )
+
+ workflow =
+ build(:workflow, project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ [trigger] = workflow.triggers
+
+ trigger
+ |> Ecto.Changeset.change(webhook_reply: :after_completion)
+ |> Ecto.Changeset.put_embed(
+ :sync_webhook_response_config,
+ %SyncWebhookResponseConfig{success_code: 200}
+ )
+ |> Repo.update!()
+
+ Snapshot.create(workflow |> Repo.reload!())
+
+ response =
+ build_tesla_client(uri)
+ |> Tesla.post!("/i/#{trigger.id}", %{"value" => 1})
+
+ assert response.status == 200
+ end
+
+ @tag :integration
+ @tag timeout: 120_000
+ test "uses configured error_code when workflow fails", %{uri: uri} do
+ project = insert(:project)
+ trigger = build(:trigger, type: :webhook, enabled: true)
+
+ job =
+ build(:job,
+ adaptor: "@openfn/language-common@latest",
+ body: "fn(state => { throw new Error('forced failure'); });",
+ name: "job"
+ )
+
+ workflow =
+ build(:workflow, project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ [trigger] = workflow.triggers
+
+ trigger
+ |> Ecto.Changeset.change(webhook_reply: :after_completion)
+ |> Ecto.Changeset.put_embed(
+ :sync_webhook_response_config,
+ %SyncWebhookResponseConfig{error_code: 500}
+ )
+ |> Repo.update!()
+
+ Snapshot.create(workflow |> Repo.reload!())
+
+ response =
+ build_tesla_client(uri)
+ |> Tesla.post!("/i/#{trigger.id}", %{})
+
+ assert response.status == 500
+ assert %{"message" => msg} = response.body
+ assert msg =~ "security policy"
+ end
+
+ @tag :integration
+ @tag timeout: 120_000
+ test "uses configured body for both success and error runs", %{uri: uri} do
+ project = insert(:project)
+ custom_body = %{"ack" => true, "ref" => "custom-response"}
+
+ for {job_body, label} <- [
+ {"fn(state => state);", "success"},
+ {"fn(state => { throw new Error('fail'); });", "error"}
+ ] do
+ trigger = build(:trigger, type: :webhook, enabled: true)
+
+ job =
+ build(:job,
+ adaptor: "@openfn/language-common@latest",
+ body: job_body,
+ name: label
+ )
+
+ workflow =
+ build(:workflow, project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ [trigger] = workflow.triggers
+
+ trigger
+ |> Ecto.Changeset.change(webhook_reply: :after_completion)
+ |> Ecto.Changeset.put_embed(
+ :sync_webhook_response_config,
+ %SyncWebhookResponseConfig{body: custom_body}
+ )
+ |> Repo.update!()
+
+ Snapshot.create(workflow |> Repo.reload!())
+
+ response =
+ build_tesla_client(uri)
+ |> Tesla.post!("/i/#{trigger.id}", %{})
+
+ assert response.body == custom_body,
+ "expected custom body for #{label} run, got: #{inspect(response.body)}"
+ end
+ end
+
+ @tag :integration
+ @tag timeout: 120_000
+ test "honours _webhookResponse set in final state", %{uri: uri} do
+ project = insert(:project)
+ trigger = build(:trigger, type: :webhook, enabled: true)
+
+ job =
+ build(:job,
+ adaptor: "@openfn/language-common@latest",
+ body: """
+ fn(state => ({
+ ...state,
+ _webhookResponse: { status: 200, body: { ack: true, received: state.data.value } }
+ }));
+ """,
+ name: "responding-job"
+ )
+
+ workflow =
+ build(:workflow, project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ [trigger] = workflow.triggers
+
+ trigger
+ |> Ecto.Changeset.change(webhook_reply: :after_completion)
+ |> Repo.update!()
+
+ Snapshot.create(workflow |> Repo.reload!())
+
+ response =
+ build_tesla_client(uri)
+ |> Tesla.post!("/i/#{trigger.id}", %{"value" => 42})
+
+ assert response.status == 200
+ assert response.body == %{"ack" => true, "received" => 42}
+ end
+ end
+
+ defp build_tesla_client(uri) do
+ Tesla.client(
+ [{Tesla.Middleware.BaseUrl, uri}, Tesla.Middleware.JSON],
+ {Tesla.Adapter.Finch, name: Lightning.Finch}
+ )
end
defp webhook_expression do
diff --git a/test/lightning_web/channels/run_channel_test.exs b/test/lightning_web/channels/run_channel_test.exs
index d4e75643904..266f088fc2a 100644
--- a/test/lightning_web/channels/run_channel_test.exs
+++ b/test/lightning_web/channels/run_channel_test.exs
@@ -1766,6 +1766,362 @@ defmodule LightningWeb.RunChannelTest do
end
end
+ describe "webhook response broadcasting" do
+ setup [:create_user, :create_project]
+
+ setup %{project: project} do
+ dataclip = insert(:http_request_dataclip, project: project)
+
+ trigger =
+ build(:trigger,
+ type: :webhook,
+ enabled: true,
+ webhook_reply: :after_completion
+ )
+
+ job = build(:job)
+
+ %{triggers: [trigger]} =
+ workflow =
+ build(:workflow, project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ work_order =
+ insert(:workorder,
+ workflow: workflow,
+ trigger: trigger,
+ dataclip: dataclip
+ )
+
+ run =
+ insert(:run,
+ work_order: work_order,
+ starting_trigger: trigger,
+ dataclip: dataclip,
+ state: :started,
+ options:
+ Lightning.Extensions.MockUsageLimiter.get_run_options(%Context{
+ project_id: project.id
+ })
+ |> Map.new()
+ )
+
+ %{run: run, work_order: work_order, trigger: trigger}
+ end
+
+ setup [:create_socket, :join_run_channel]
+
+ test "does not broadcast when webhook_reply is not :after_completion", %{
+ socket: socket,
+ work_order: work_order,
+ trigger: trigger
+ } do
+ trigger
+ |> Ecto.Changeset.change(webhook_reply: :before_start)
+ |> Repo.update!()
+
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => %{"data" => "ok"}
+ })
+
+ assert_reply ref, :ok, nil
+ refute_receive {:webhook_response, _, _}
+ end
+
+ test "broadcasts final state as body on success with no config", %{
+ socket: socket,
+ work_order: work_order
+ } do
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ final_state = %{"data" => %{"result" => "ok"}}
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => final_state
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 201, ^final_state}
+ end
+
+ test "broadcasts security message on error with no config", %{
+ socket: socket,
+ work_order: work_order
+ } do
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "fail",
+ "error_type" => "UserError",
+ "error_message" => nil
+ })
+
+ assert_reply ref, :ok, nil
+
+ assert_receive {:webhook_response, 201, %{message: message}}
+ assert message =~ "failed"
+ assert message =~ "security policy"
+ end
+
+ test "uses configured success_code on success", %{
+ socket: socket,
+ work_order: work_order,
+ trigger: trigger
+ } do
+ put_webhook_config(trigger, success_code: 200)
+
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ final_state = %{"data" => "ok"}
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => final_state
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 200, ^final_state}
+ end
+
+ test "uses configured error_code on error", %{
+ socket: socket,
+ work_order: work_order,
+ trigger: trigger
+ } do
+ put_webhook_config(trigger, error_code: 422)
+
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "fail",
+ "error_type" => "UserError",
+ "error_message" => nil
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 422, %{message: _}}
+ end
+
+ test "uses configured body on success, overriding final state", %{
+ socket: socket,
+ work_order: work_order,
+ trigger: trigger
+ } do
+ custom_body = %{"custom" => true}
+ put_webhook_config(trigger, body: custom_body)
+
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => %{"data" => "should be ignored"}
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 201, ^custom_body}
+ end
+
+ test "uses configured body on error, overriding security policy", %{
+ socket: socket,
+ work_order: work_order,
+ trigger: trigger
+ } do
+ custom_body = %{"error_detail" => "safe to expose"}
+ put_webhook_config(trigger, body: custom_body)
+
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "fail",
+ "error_type" => "UserError",
+ "error_message" => nil
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 201, ^custom_body}
+ end
+
+ test "valid _webhookResponse overrides config entirely", %{
+ socket: socket,
+ work_order: work_order,
+ trigger: trigger
+ } do
+ put_webhook_config(trigger, success_code: 999, body: %{"ignored" => true})
+
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ override_body = %{"custom" => "response"}
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => %{
+ "_webhookResponse" => %{"status" => 200, "body" => override_body}
+ }
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 200, ^override_body}
+ end
+
+ test "float status in _webhookResponse is normalised to integer", %{
+ socket: socket,
+ work_order: work_order
+ } do
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ override_body = %{"ok" => true}
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => %{
+ "_webhookResponse" => %{"status" => 200.0, "body" => override_body}
+ }
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 200, ^override_body}
+ end
+
+ test "empty _webhookResponse map falls back to default behaviour", %{
+ socket: socket,
+ work_order: work_order
+ } do
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ final_state = %{"_webhookResponse" => %{}, "data" => "ok"}
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => final_state
+ })
+
+ assert_reply ref, :ok, nil
+ assert_receive {:webhook_response, 201, ^final_state}
+ end
+
+ test "_webhookResponse without body key is malformed", %{
+ socket: socket,
+ work_order: work_order
+ } do
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => %{"_webhookResponse" => %{"status" => 200}}
+ })
+
+ assert_reply ref, :ok, nil
+
+ assert_receive {:webhook_response, 201, %{message: message}}
+ assert message =~ "malformed"
+ end
+
+ test "_webhookResponse without status key is malformed", %{
+ socket: socket,
+ work_order: work_order
+ } do
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => %{
+ "_webhookResponse" => %{"body" => %{"data" => "ok"}}
+ }
+ })
+
+ assert_reply ref, :ok, nil
+
+ assert_receive {:webhook_response, 201, %{message: message}}
+ assert message =~ "malformed"
+ end
+
+ test "non-map _webhookResponse value is malformed", %{
+ socket: socket,
+ work_order: work_order
+ } do
+ Phoenix.PubSub.subscribe(
+ Lightning.PubSub,
+ "work_order:#{work_order.id}:webhook_response"
+ )
+
+ ref =
+ push(socket, "run:complete", %{
+ "reason" => "success",
+ "final_state" => %{"_webhookResponse" => "not a map"}
+ })
+
+ assert_reply ref, :ok, nil
+
+ assert_receive {:webhook_response, 201, %{message: message}}
+ assert message =~ "malformed"
+ end
+ end
+
+ defp put_webhook_config(trigger, attrs) do
+ alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig
+ config = struct(SyncWebhookResponseConfig, attrs)
+
+ trigger
+ |> Ecto.Changeset.change()
+ |> Ecto.Changeset.put_embed(:sync_webhook_response_config, config)
+ |> Repo.update!()
+ end
+
defp create_socket_and_run(context) do
merge_setups(context, [
:create_project,
diff --git a/test/lightning_web/controllers/webhooks_controller_test.exs b/test/lightning_web/controllers/webhooks_controller_test.exs
index f9731191909..4d3aa91adae 100644
--- a/test/lightning_web/controllers/webhooks_controller_test.exs
+++ b/test/lightning_web/controllers/webhooks_controller_test.exs
@@ -402,89 +402,60 @@ defmodule LightningWeb.WebhooksControllerTest do
describe "delayed webhook response (webhook_reply: :after_completion)" do
setup [:stub_rate_limiter_ok, :stub_usage_limiter_ok]
- test "waits for and returns the final state on success", %{conn: conn} do
- %{triggers: [trigger]} =
+ test "waits for and returns the broadcast response body and status code", %{
+ conn: conn
+ } do
+ %{triggers: [trigger], project_id: project_id} =
insert(:simple_workflow)
|> Lightning.Repo.preload(:triggers)
|> with_snapshot()
- # Update trigger to use after_completion
trigger =
trigger
|> Ecto.Changeset.change(webhook_reply: :after_completion)
|> Repo.update!()
- message = %{"foo" => "bar"}
+ Lightning.WorkOrders.Events.subscribe(project_id)
- # Spawn a task that will post to the webhook
- # This task will be blocked waiting for the webhook response
test_pid = self()
task =
Task.async(fn ->
- conn = post(conn, "/i/#{trigger.id}", message)
+ conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"})
send(test_pid, {:response, conn})
end)
- # Wait for the work order to be created
- Process.sleep(100)
-
- # Find the work order
- work_order =
- Lightning.Repo.one(
- from wo in Lightning.WorkOrder,
- where: wo.trigger_id == ^trigger.id,
- order_by: [desc: wo.inserted_at],
- limit: 1
- )
-
- assert work_order
-
- # Get the run for metadata
- run =
- Lightning.Repo.one(
- from r in Lightning.Run,
- where: r.work_order_id == ^work_order.id,
- limit: 1
- )
-
- # Simulate the worker completing the run with final state
- final_state_data = %{"result" => "success", "data" => %{"x" => 42}}
-
- response_body = %{
- data: final_state_data,
- meta: %{
- work_order_id: work_order.id,
- run_id: run.id,
- state: :success,
- error_type: nil,
- inserted_at: run.inserted_at,
- started_at: run.started_at,
- claimed_at: run.claimed_at,
- finished_at: run.finished_at
- }
+ assert_receive %Lightning.WorkOrders.Events.WorkOrderCreated{
+ work_order: work_order
}
+ assert_receive %Lightning.WorkOrders.Events.RunCreated{run: run}
+
+ body = %{"result" => "success", "value" => 42}
+
Phoenix.PubSub.broadcast(
Lightning.PubSub,
"work_order:#{work_order.id}:webhook_response",
- {:webhook_response, 200, response_body}
+ {:webhook_response, 200, body}
)
- # Now the task should complete with the response
assert_receive {:response, response_conn}, 5_000
- response = json_response(response_conn, 200)
- assert response["data"] == final_state_data
- assert response["meta"]["work_order_id"] == work_order.id
- assert response["meta"]["run_id"] == run.id
- assert response["meta"]["state"] == "success"
+ assert json_response(response_conn, 200) == body
+
+ assert get_resp_header(response_conn, "x-meta-work-order-id") == [
+ work_order.id
+ ]
+
+ assert get_resp_header(response_conn, "x-meta-run-id") == [run.id]
Task.await(task)
end
- test "waits for and returns error state on failure", %{conn: conn} do
- %{triggers: [trigger]} =
+ test "passes through any status code from the broadcast message", %{
+ conn: conn
+ } do
+ %{triggers: [trigger], project_id: project_id} =
insert(:simple_workflow)
|> Lightning.Repo.preload(:triggers)
|> with_snapshot()
@@ -494,65 +465,39 @@ defmodule LightningWeb.WebhooksControllerTest do
|> Ecto.Changeset.change(webhook_reply: :after_completion)
|> Repo.update!()
- message = %{"foo" => "bar"}
+ Lightning.WorkOrders.Events.subscribe(project_id)
+
test_pid = self()
task =
Task.async(fn ->
- conn = post(conn, "/i/#{trigger.id}", message)
+ conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"})
send(test_pid, {:response, conn})
end)
- Process.sleep(100)
-
- work_order =
- Lightning.Repo.one(
- from wo in Lightning.WorkOrder,
- where: wo.trigger_id == ^trigger.id,
- order_by: [desc: wo.inserted_at],
- limit: 1
- )
-
- assert work_order
-
- # Get the run for metadata
- run =
- Lightning.Repo.one(
- from r in Lightning.Run,
- where: r.work_order_id == ^work_order.id,
- limit: 1
- )
-
- error_data = %{"error" => "Something went wrong"}
-
- response_body = %{
- data: error_data,
- meta: %{
- work_order_id: work_order.id,
- run_id: run.id,
- state: :failed,
- error_type: "RuntimeError",
- inserted_at: run.inserted_at,
- started_at: run.started_at,
- claimed_at: run.claimed_at,
- finished_at: run.finished_at
- }
+ assert_receive %Lightning.WorkOrders.Events.WorkOrderCreated{
+ work_order: work_order
}
+ assert_receive %Lightning.WorkOrders.Events.RunCreated{run: run}
+
+ body = %{"message" => "run failed"}
+
Phoenix.PubSub.broadcast(
Lightning.PubSub,
"work_order:#{work_order.id}:webhook_response",
- {:webhook_response, 422, response_body}
+ {:webhook_response, 422, body}
)
assert_receive {:response, response_conn}, 5_000
- response = json_response(response_conn, 422)
- assert response["data"] == error_data
- assert response["meta"]["work_order_id"] == work_order.id
- assert response["meta"]["run_id"] == run.id
- assert response["meta"]["state"] == "failed"
- assert response["meta"]["error_type"] == "RuntimeError"
+ assert json_response(response_conn, 422) == body
+
+ assert get_resp_header(response_conn, "x-meta-work-order-id") == [
+ work_order.id
+ ]
+
+ assert get_resp_header(response_conn, "x-meta-run-id") == [run.id]
Task.await(task)
end
@@ -560,7 +505,6 @@ defmodule LightningWeb.WebhooksControllerTest do
test "returns timeout if workflow doesn't complete within timeout period", %{
conn: conn
} do
- # Set a shorter timeout for this test (2 seconds instead of default)
expect(Lightning.MockConfig, :webhook_response_timeout_ms, fn -> 2_000 end)
%{triggers: [trigger]} =
@@ -573,19 +517,16 @@ defmodule LightningWeb.WebhooksControllerTest do
|> Ecto.Changeset.change(webhook_reply: :after_completion)
|> Repo.update!()
- message = %{"foo" => "bar"}
+ conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"})
- # This will timeout since we never broadcast a response
- conn = post(conn, "/i/#{trigger.id}", message)
+ [work_order_id] = get_resp_header(conn, "x-meta-work-order-id")
assert json_response(conn, 504) == %{
"error" => "timeout",
"message" => "Workflow did not complete within timeout period",
- "work_order_id" => json_response(conn, 504)["work_order_id"]
+ "work_order_id" => work_order_id
}
- # Verify work order was still created
- work_order_id = json_response(conn, 504)["work_order_id"]
assert WorkOrders.get(work_order_id)
end
@@ -597,15 +538,12 @@ defmodule LightningWeb.WebhooksControllerTest do
|> Lightning.Repo.preload(:triggers)
|> with_snapshot()
- # Ensure trigger is using default before_start
assert trigger.webhook_reply == :before_start
- message = %{"foo" => "bar"}
-
- # Should return immediately
- conn = post(conn, "/i/#{trigger.id}", message)
+ conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"})
- assert %{"work_order_id" => work_order_id} = json_response(conn, 200)
+ [work_order_id] = get_resp_header(conn, "x-meta-work-order-id")
+ assert %{"work_order_id" => ^work_order_id} = json_response(conn, 200)
assert WorkOrders.get(work_order_id)
end
end
From e1702504c359c937a46cb2f49d29010ddcf3f293 Mon Sep 17 00:00:00 2001
From: Frank Midigo
Date: Mon, 27 Apr 2026 11:57:18 +0300
Subject: [PATCH 07/21] handle webhook_response in YAML export, provisioning
API, and import
- Serialize sync_webhook_response_config as webhook_response in YAML
export; body is JSON-encoded as a literal block scalar matching how
job.body is handled
- Expose webhook_response in the provisioning API GET response
- Accept webhook_response on provisioning import and remap it to
sync_webhook_response_config; decode body if it arrives as a JSON
string (from YAML literal block)
- Add Jason.Encoder derives so sync_webhook_response_config is included
when the trigger is serialized for the session context
- Add tests covering export, API output, and import including the
JSON-string body decoding path
---
lib/lightning/export_utils.ex | 38 ++++++--
lib/lightning/projects/provisioner.ex | 29 +++++++
lib/lightning/workflows/trigger.ex | 3 +-
.../triggers/sync_webhook_response_config.ex | 2 +
.../controllers/api/provisioning_json.ex | 9 ++
test/lightning/projects/provisioner_test.exs | 87 +++++++++++++++++++
test/lightning/projects_test.exs | 55 ++++++++++++
.../api/provisioning_controller_test.exs | 75 ++++++++++++++++
test/support/factories.ex | 4 +
9 files changed, 296 insertions(+), 6 deletions(-)
diff --git a/lib/lightning/export_utils.ex b/lib/lightning/export_utils.ex
index b274485bbf7..7f83c28b1f8 100644
--- a/lib/lightning/export_utils.ex
+++ b/lib/lightning/export_utils.ex
@@ -31,6 +31,7 @@ defmodule Lightning.ExportUtils do
trigger: [
:type,
:webhook_reply,
+ :webhook_response,
:cron_expression,
:cron_cursor_job,
:enabled,
@@ -121,14 +122,38 @@ defmodule Lightning.ExportUtils do
Map.put(base, :kafka_configuration, kafka_config)
:webhook ->
- if trigger.webhook_reply do
- Map.put(base, :webhook_reply, Atom.to_string(trigger.webhook_reply))
- else
- base
- end
+ base
+ |> maybe_put_webhook_reply(trigger.webhook_reply)
+ |> maybe_put_webhook_response(trigger.sync_webhook_response_config)
+ end
+ end
+
+ defp maybe_put_webhook_reply(map, reply) when is_atom(reply) do
+ Map.put(map, :webhook_reply, Atom.to_string(reply))
+ end
+
+ defp maybe_put_webhook_reply(map, _), do: map
+
+ defp maybe_put_webhook_response(map, %{} = config) do
+ webhook_response =
+ Map.reject(
+ %{
+ success_code: config.success_code,
+ error_code: config.error_code,
+ body: config.body && Jason.encode!(config.body, pretty: true)
+ },
+ fn {_k, v} -> is_nil(v) end
+ )
+
+ if map_size(webhook_response) > 0 do
+ Map.put(map, :webhook_response, webhook_response)
+ else
+ map
end
end
+ defp maybe_put_webhook_response(map, _), do: map
+
defp edge_to_treenode(%{source_job_id: nil} = edge, triggers, jobs) do
source_trigger =
Enum.find(triggers, fn t -> t.id == edge.source_trigger_id end)
@@ -205,6 +230,9 @@ defmodule Lightning.ExportUtils do
:body ->
"body: |\n#{indent_multiline_value(v, i)}"
+ :webhook_response ->
+ "webhook_response: |\n#{indent_multiline_value(v, i)}"
+
:description ->
"description: |\n#{indent_multiline_value(v, i)}"
diff --git a/lib/lightning/projects/provisioner.ex b/lib/lightning/projects/provisioner.ex
index 9d10373154b..5016fafe54f 100644
--- a/lib/lightning/projects/provisioner.ex
+++ b/lib/lightning/projects/provisioner.ex
@@ -32,6 +32,7 @@ defmodule Lightning.Projects.Provisioner do
alias Lightning.Workflows.Snapshot
alias Lightning.Workflows.Trigger
alias Lightning.Workflows.Triggers.KafkaConfiguration
+ alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig
alias Lightning.Workflows.Workflow
alias Lightning.Workflows.WorkflowUsageLimiter
alias Lightning.WorkflowVersions
@@ -455,6 +456,8 @@ defmodule Lightning.Projects.Provisioner do
end
defp trigger_changeset(trigger, attrs) do
+ attrs = remap_webhook_response(attrs)
+
trigger
|> Trigger.cast_changeset(attrs)
|> cast_embed(
@@ -462,6 +465,11 @@ defmodule Lightning.Projects.Provisioner do
required: false,
with: &kafka_config_changeset/2
)
+ |> cast_embed(
+ :sync_webhook_response_config,
+ required: false,
+ with: &SyncWebhookResponseConfig.changeset/2
+ )
|> Trigger.validate()
|> cast(attrs, [:delete])
|> validate_required([:id])
@@ -470,6 +478,27 @@ defmodule Lightning.Projects.Provisioner do
|> maybe_mark_for_deletion()
end
+ defp remap_webhook_response(attrs) do
+ case Map.pop(attrs, "webhook_response") do
+ {nil, attrs} ->
+ attrs
+
+ {config, attrs} ->
+ config = decode_webhook_response_body(config)
+ Map.put(attrs, "sync_webhook_response_config", config)
+ end
+ end
+
+ defp decode_webhook_response_body(%{"body" => body} = config)
+ when is_binary(body) do
+ case Jason.decode(body) do
+ {:ok, decoded} -> Map.put(config, "body", decoded)
+ _ -> config
+ end
+ end
+
+ defp decode_webhook_response_body(config), do: config
+
defp kafka_config_changeset(kafka_config, attrs) do
kafka_config
|> KafkaConfiguration.changeset(attrs)
diff --git a/lib/lightning/workflows/trigger.ex b/lib/lightning/workflows/trigger.ex
index 35662429cf4..ca050bb2f96 100644
--- a/lib/lightning/workflows/trigger.ex
+++ b/lib/lightning/workflows/trigger.ex
@@ -37,7 +37,8 @@ defmodule Lightning.Workflows.Trigger do
:cron_cursor_job_id,
:type,
:enabled,
- :webhook_reply
+ :webhook_reply,
+ :sync_webhook_response_config
]}
schema "triggers" do
field :comment, :string
diff --git a/lib/lightning/workflows/triggers/sync_webhook_response_config.ex b/lib/lightning/workflows/triggers/sync_webhook_response_config.ex
index 1baacb8659a..d77633b0db7 100644
--- a/lib/lightning/workflows/triggers/sync_webhook_response_config.ex
+++ b/lib/lightning/workflows/triggers/sync_webhook_response_config.ex
@@ -15,6 +15,8 @@ defmodule Lightning.Workflows.Triggers.SyncWebhookResponseConfig do
use Ecto.Schema
import Ecto.Changeset
+ @derive {Jason.Encoder, only: [:success_code, :error_code, :body]}
+
@primary_key false
embedded_schema do
field :success_code, :integer
diff --git a/lib/lightning_web/controllers/api/provisioning_json.ex b/lib/lightning_web/controllers/api/provisioning_json.ex
index 3db39601d1c..4c01cb62865 100644
--- a/lib/lightning_web/controllers/api/provisioning_json.ex
+++ b/lib/lightning_web/controllers/api/provisioning_json.ex
@@ -102,11 +102,20 @@ defmodule LightningWeb.API.ProvisioningJSON do
~w(hosts topics initial_offset_reset_policy connect_timeout)a
)
+ webhook_response =
+ trigger.sync_webhook_response_config &&
+ Map.take(trigger.sync_webhook_response_config, [
+ :success_code,
+ :error_code,
+ :body
+ ])
+
trigger
|> Map.take(
~w(id type cron_expression enabled webhook_reply cron_cursor_job_id)a
)
|> Map.put(:kafka_configuration, kafka_configuration)
+ |> Map.put(:webhook_response, webhook_response)
|> drop_keys_with_nil_value()
end
diff --git a/test/lightning/projects/provisioner_test.exs b/test/lightning/projects/provisioner_test.exs
index 34fa8202824..9f63f4e5336 100644
--- a/test/lightning/projects/provisioner_test.exs
+++ b/test/lightning/projects/provisioner_test.exs
@@ -361,6 +361,93 @@ defmodule Lightning.Projects.ProvisionerTest do
assert trigger.webhook_reply == :after_completion
end
+ test "imports trigger with webhook_response field" do
+ Mox.verify_on_exit!()
+ user = insert(:user)
+
+ %{body: %{"workflows" => [workflow]} = body, project_id: project_id} =
+ valid_document()
+
+ updated_triggers =
+ Enum.map(workflow["triggers"], fn trigger ->
+ Map.merge(trigger, %{
+ "type" => "webhook",
+ "webhook_reply" => "after_completion",
+ "webhook_response" => %{
+ "success_code" => 200,
+ "error_code" => 500,
+ "body" => %{"status" => "ok"}
+ }
+ })
+ end)
+
+ body =
+ Map.put(body, "workflows", [
+ Map.put(workflow, "triggers", updated_triggers)
+ ])
+
+ Mox.stub(
+ Lightning.Extensions.MockUsageLimiter,
+ :limit_action,
+ fn _action, _context -> :ok end
+ )
+
+ {:ok, project} =
+ Provisioner.import_document(
+ %Lightning.Projects.Project{},
+ user,
+ body
+ )
+
+ assert %{id: ^project_id, workflows: [%{triggers: [trigger]}]} = project
+ assert trigger.webhook_reply == :after_completion
+ assert trigger.sync_webhook_response_config.success_code == 200
+ assert trigger.sync_webhook_response_config.error_code == 500
+ assert trigger.sync_webhook_response_config.body == %{"status" => "ok"}
+ end
+
+ test "imports trigger with webhook_response body as a JSON string (from YAML)" do
+ Mox.verify_on_exit!()
+ user = insert(:user)
+
+ %{body: %{"workflows" => [workflow]} = body, project_id: project_id} =
+ valid_document()
+
+ updated_triggers =
+ Enum.map(workflow["triggers"], fn trigger ->
+ Map.merge(trigger, %{
+ "type" => "webhook",
+ "webhook_reply" => "after_completion",
+ "webhook_response" => %{
+ "success_code" => 200,
+ "body" => ~s({"status": "ok"})
+ }
+ })
+ end)
+
+ body =
+ Map.put(body, "workflows", [
+ Map.put(workflow, "triggers", updated_triggers)
+ ])
+
+ Mox.stub(
+ Lightning.Extensions.MockUsageLimiter,
+ :limit_action,
+ fn _action, _context -> :ok end
+ )
+
+ {:ok, project} =
+ Provisioner.import_document(
+ %Lightning.Projects.Project{},
+ user,
+ body
+ )
+
+ assert %{id: ^project_id, workflows: [%{triggers: [trigger]}]} = project
+ assert trigger.sync_webhook_response_config.success_code == 200
+ assert trigger.sync_webhook_response_config.body == %{"status" => "ok"}
+ end
+
test "imports cron trigger with cron_cursor_job_id field" do
Mox.verify_on_exit!()
user = insert(:user)
diff --git a/test/lightning/projects_test.exs b/test/lightning/projects_test.exs
index 19577d1e40e..36bbcff2f2b 100644
--- a/test/lightning/projects_test.exs
+++ b/test/lightning/projects_test.exs
@@ -780,6 +780,61 @@ defmodule Lightning.ProjectsTest do
assert generated_yaml =~ expected_yaml_trigger
end
+ test "webhook_response config is included in the export" do
+ project = insert(:project, name: "project 1")
+
+ trigger =
+ build(:trigger,
+ type: :webhook,
+ enabled: true,
+ webhook_reply: :after_completion,
+ sync_webhook_response_config:
+ build(:sync_webhook_response_config,
+ success_code: 200,
+ error_code: 500,
+ body: %{"status" => "ok"}
+ )
+ )
+
+ job =
+ build(:job,
+ body: ~s[fn(state => state)]
+ )
+
+ build(:workflow, name: "workflow 1", project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ assert {:ok, generated_yaml} = Projects.export_project(:yaml, project.id)
+
+ assert generated_yaml =~ "webhook_reply: after_completion"
+
+ assert generated_yaml =~ "webhook_response:"
+ assert generated_yaml =~ "success_code: 200"
+ assert generated_yaml =~ "error_code: 500"
+ assert generated_yaml =~ "body:"
+ end
+
+ test "webhook_response is omitted when sync_webhook_response_config is nil" do
+ project = insert(:project, name: "project 2")
+
+ trigger = build(:trigger, type: :webhook, enabled: true)
+
+ job = build(:job, body: ~s[fn(state => state)])
+
+ build(:workflow, name: "workflow 1", project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ assert {:ok, generated_yaml} = Projects.export_project(:yaml, project.id)
+
+ refute generated_yaml =~ "webhook_response"
+ end
+
test "exports canonical project" do
project =
canonical_project_fixture(
diff --git a/test/lightning_web/controllers/api/provisioning_controller_test.exs b/test/lightning_web/controllers/api/provisioning_controller_test.exs
index 47c8bbee57b..ef26c506d75 100644
--- a/test/lightning_web/controllers/api/provisioning_controller_test.exs
+++ b/test/lightning_web/controllers/api/provisioning_controller_test.exs
@@ -594,6 +594,81 @@ defmodule LightningWeb.API.ProvisioningControllerTest do
} = trigger_json
end
+ test "returns webhook_response when sync_webhook_response_config is set", %{
+ conn: conn,
+ user: user
+ } do
+ project = insert(:project, project_users: [%{user_id: user.id}])
+
+ trigger =
+ build(:trigger,
+ type: :webhook,
+ webhook_reply: :after_completion,
+ sync_webhook_response_config:
+ build(:sync_webhook_response_config,
+ success_code: 200,
+ error_code: 500,
+ body: %{"status" => "ok"}
+ )
+ )
+
+ job = build(:job)
+
+ %{triggers: [%{id: trigger_id}]} =
+ build(:workflow, project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ conn = get(conn, ~p"/api/provision/#{project.id}")
+ response = json_response(conn, 200)
+
+ assert %{
+ "workflows" => [
+ %{
+ "triggers" => [trigger_json]
+ }
+ ]
+ } = response["data"]
+
+ assert %{
+ "id" => ^trigger_id,
+ "type" => "webhook",
+ "webhook_reply" => "after_completion",
+ "webhook_response" => %{
+ "success_code" => 200,
+ "error_code" => 500,
+ "body" => %{"status" => "ok"}
+ }
+ } = trigger_json
+ end
+
+ test "omits webhook_response when sync_webhook_response_config is nil", %{
+ conn: conn,
+ user: user
+ } do
+ project = insert(:project, project_users: [%{user_id: user.id}])
+
+ trigger = build(:trigger, type: :webhook)
+ job = build(:job)
+
+ build(:workflow, project: project)
+ |> with_trigger(trigger)
+ |> with_job(job)
+ |> with_edge({trigger, job}, condition_type: :always)
+ |> insert()
+
+ conn = get(conn, ~p"/api/provision/#{project.id}")
+ response = json_response(conn, 200)
+
+ assert %{
+ "workflows" => [%{"triggers" => [trigger_json]}]
+ } = response["data"]
+
+ refute Map.has_key?(trigger_json, "webhook_response")
+ end
+
test "returns a cron trigger with cron_cursor_job_id in the response", %{
conn: conn,
user: user
diff --git a/test/support/factories.ex b/test/support/factories.ex
index ffa99da33d0..68e7134deab 100644
--- a/test/support/factories.ex
+++ b/test/support/factories.ex
@@ -345,6 +345,10 @@ defmodule Lightning.Factories do
}
end
+ def sync_webhook_response_config_factory do
+ %Lightning.Workflows.Triggers.SyncWebhookResponseConfig{}
+ end
+
def triggers_kafka_configuration_factory do
%Lightning.Workflows.Triggers.KafkaConfiguration{
group_id: "arb_group_id",
From 9d82ae922e80bc6224ff394e9bafa20d168ce5f8 Mon Sep 17 00:00:00 2001
From: Frank Midigo
Date: Mon, 27 Apr 2026 11:57:29 +0300
Subject: [PATCH 08/21] handle webhook_response in YAML code view and unsaved
changes detection
- Export sync_webhook_response_config as webhook_response in YAML code
view, omitting null fields
- Import webhook_response back to sync_webhook_response_config when
applying a YAML spec; JSON-parse body if it arrives as a string from
the literal block
- Include sync_webhook_response_config in transformTrigger so status
code edits trigger the unsaved changes indicator
- Add WebhookResponseConfig type and webhook_response field to
SpecWebhookTrigger; add webhook_response schema to workflow-spec.json
---
.../hooks/useUnsavedChanges.ts | 2 ++
.../js/collaborative-editor/types/trigger.ts | 2 ++
assets/js/yaml/schema/workflow-spec.json | 9 ++++++
assets/js/yaml/types.ts | 7 +++++
assets/js/yaml/util.ts | 29 +++++++++++++++++++
5 files changed, 49 insertions(+)
diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts
index bffa8d11577..0c4e142e697 100644
--- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts
+++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts
@@ -106,6 +106,8 @@ function transformTrigger(trigger: Trigger) {
break;
case 'webhook':
output.webhook_reply = trigger.webhook_reply ?? 'before_start';
+ output.sync_webhook_response_config =
+ trigger.sync_webhook_response_config ?? null;
break;
}
return output;
diff --git a/assets/js/collaborative-editor/types/trigger.ts b/assets/js/collaborative-editor/types/trigger.ts
index 253d9f705c0..53170d96c22 100644
--- a/assets/js/collaborative-editor/types/trigger.ts
+++ b/assets/js/collaborative-editor/types/trigger.ts
@@ -57,6 +57,7 @@ const cronTriggerSchema = baseTriggerSchema.extend({
),
cron_cursor_job_id: z.string().uuid().nullable().default(null),
kafka_configuration: z.null().default(null),
+ sync_webhook_response_config: z.null().default(null),
webhook_reply: z.null().default(null).catch(null),
});
@@ -111,6 +112,7 @@ const kafkaTriggerSchema = baseTriggerSchema.extend({
cron_expression: z.null().default(null),
cron_cursor_job_id: z.null().default(null),
kafka_configuration: kafkaConfigSchema,
+ sync_webhook_response_config: z.null().default(null),
webhook_reply: z.null().default(null).catch(null),
});
diff --git a/assets/js/yaml/schema/workflow-spec.json b/assets/js/yaml/schema/workflow-spec.json
index f6385428593..19a2d82f35a 100644
--- a/assets/js/yaml/schema/workflow-spec.json
+++ b/assets/js/yaml/schema/workflow-spec.json
@@ -62,6 +62,15 @@
"type": ["string", "null"],
"enum": ["before_start", "after_completion", "custom", null]
},
+ "webhook_response": {
+ "type": ["object", "null"],
+ "properties": {
+ "success_code": { "type": ["integer", "null"] },
+ "error_code": { "type": ["integer", "null"] },
+ "body": { "type": ["object", "null"] }
+ },
+ "additionalProperties": false
+ },
"pos": {
"type": "object",
"properties": {
diff --git a/assets/js/yaml/types.ts b/assets/js/yaml/types.ts
index 4804aaced89..1cbb163d6c1 100644
--- a/assets/js/yaml/types.ts
+++ b/assets/js/yaml/types.ts
@@ -92,11 +92,18 @@ export type SpecCronTrigger = {
pos: Position | undefined;
};
+export type WebhookResponseConfig = {
+ success_code?: number | null;
+ error_code?: number | null;
+ body?: Record | null;
+};
+
export type SpecWebhookTrigger = {
id?: string;
type: 'webhook';
enabled: boolean;
webhook_reply: string | null;
+ webhook_response?: WebhookResponseConfig | null;
pos: Position | undefined;
};
diff --git a/assets/js/yaml/util.ts b/assets/js/yaml/util.ts
index 381b3ec4b1e..3fabc090ab3 100644
--- a/assets/js/yaml/util.ts
+++ b/assets/js/yaml/util.ts
@@ -82,6 +82,21 @@ export const convertWorkflowStateToSpec = (
if (trigger.type === 'webhook') {
triggerDetails.webhook_reply = trigger.webhook_reply ?? null;
+ const config = trigger.sync_webhook_response_config;
+ if (
+ config &&
+ (config.success_code != null ||
+ config.error_code != null ||
+ config.body != null)
+ ) {
+ triggerDetails.webhook_response = {
+ ...(config.success_code != null && {
+ success_code: config.success_code,
+ }),
+ ...(config.error_code != null && { error_code: config.error_code }),
+ ...(config.body != null && { body: config.body }),
+ };
+ }
}
// TODO: handle kafka config
@@ -186,6 +201,9 @@ export const convertWorkflowSpecToState = (
type: 'webhook',
enabled,
webhook_reply: specTrigger.webhook_reply,
+ sync_webhook_response_config: parseWebhookResponseConfig(
+ specTrigger.webhook_response
+ ),
};
} else {
trigger = {
@@ -251,6 +269,17 @@ export const convertWorkflowSpecToState = (
return workflowState;
};
+function parseWebhookResponseConfig(
+ config: import('./types').WebhookResponseConfig | null | undefined
+): import('./types').WebhookResponseConfig | null {
+ if (!config) return null;
+ const body =
+ typeof config.body === 'string'
+ ? (JSON.parse(config.body) as Record)
+ : config.body;
+ return { ...config, body };
+}
+
export const extractJobCredentials = (jobs: Workflow.Job[]): JobCredentials => {
const credentials: JobCredentials = {};
for (const job of jobs) {
From dd55b653baa499b768e8f61b11edc546296d701e Mon Sep 17 00:00:00 2001
From: Frank Midigo
Date: Mon, 27 Apr 2026 12:17:05 +0300
Subject: [PATCH 09/21] Give an example on how _webhookResponse key will look
like
---
.../components/inspector/TriggerForm.tsx | 55 ++++++++++++++++---
1 file changed, 46 insertions(+), 9 deletions(-)
diff --git a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx
index 585b8ffde7e..6c8a12e4f4d 100644
--- a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx
+++ b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx
@@ -35,6 +35,21 @@ interface TriggerFormProps {
const logger = _logger.ns('TriggerForm').seal();
+function hasResponseConfig(
+ config: {
+ success_code: number | null;
+ error_code: number | null;
+ body: Record | null;
+ } | null
+): boolean {
+ if (!config) return false;
+ return (
+ config.success_code != null ||
+ config.error_code != null ||
+ config.body != null
+ );
+}
+
/**
* Pure form component for trigger configuration.
* Handles trigger type, enabled toggle, and type-specific fields
@@ -429,6 +444,15 @@ export function TriggerForm({ trigger }: TriggerFormProps) {
? 'Holds the HTTP connection open and responds when the run completes.'
: 'Responds immediately with the enqueued work order ID.'}
+ {field.state.value === 'after_completion' &&
+ hasResponseConfig(
+ trigger.sync_webhook_response_config
+ ) && (
+
+ Switching to async will clear your response
+ configuration.
+
+ )}
{field.state.meta.errors.map(error => (
{/* Runtime override note */}
-
- To override both the status code and body
- at runtime, write to{' '}
-
- _webhookResponse
- {' '}
- in the job state. When present, it takes
- full priority over these defaults.
-
+
+
+ To override the response at runtime, set{' '}
+
+ _webhookResponse
+ {' '}
+ in the final job state:
+
- {bodyType === 'final_state'
- ? 'The final job state is returned on success. No body is sent on error to avoid exposing error details.'
- : 'This JSON is returned for both success and error runs.'}
-