diff --git a/CHANGELOG.md b/CHANGELOG.md index d6864f5be56..53bf3b3f8fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,11 +17,16 @@ and this project adheres to ### Added +- Allow users to respond back with custom webhook responses via the + `webhookResponse` field in the job state. + [#3102](https://github.com/OpenFn/lightning/issues/3102) + ### Changed ### Fixed ## [2.16.3] - 2026-05-07 + ## [2.16.3-pre3] - 2026-05-07 ### Fixed diff --git a/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts b/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts index 1ea725db71d..c62ab4fcf86 100644 --- a/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts +++ b/assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts @@ -77,6 +77,10 @@ export class YAMLStateToYDoc { if (trigger.type === 'webhook') { triggerMap.set('webhook_reply', trigger.webhook_reply ?? null); + triggerMap.set( + 'sync_webhook_response_config', + trigger.sync_webhook_response_config ?? null + ); } return triggerMap; diff --git a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx index e95766fe5eb..0a6fed60867 100644 --- a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx +++ b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx @@ -33,6 +33,16 @@ interface TriggerFormProps { const logger = _logger.ns('TriggerForm').seal(); +function hasResponseConfig( + config: { + success_code: number | null; + error_code: number | null; + } | null +): boolean { + if (!config) return false; + return config.success_code != null || config.error_code != null; +} + /** * Pure form component for trigger configuration. * Handles trigger type, enabled toggle, and type-specific fields @@ -45,6 +55,17 @@ export function TriggerForm({ trigger }: TriggerFormProps) { const { copyText, copyToClipboard } = useCopyToClipboard(); const [showAdvancedSettings, setShowAdvancedSettings] = useState(false); const [showAuthModal, setShowAuthModal] = useState(false); + const [hasResponseStatus, setHasResponseStatus] = useState(() => { + const cfg = trigger.sync_webhook_response_config; + return cfg != null && (cfg.success_code != null || cfg.error_code != null); + }); + const [showResponseStatus, setShowResponseStatus] = useState(true); + const [shownSuccessCode, setShownSuccessCode] = useState( + () => trigger.sync_webhook_response_config?.success_code != null + ); + const [shownErrorCode, setShownErrorCode] = useState( + () => trigger.sync_webhook_response_config?.error_code != null + ); const sessionContext = useSessionContext(); const { provider } = useSession(); const channel = provider?.channel; @@ -59,6 +80,19 @@ export function TriggerForm({ trigger }: TriggerFormProps) { state => state.activeTriggerAuthMethods ); + useEffect(() => { + const cfg = trigger.sync_webhook_response_config; + if (cfg == null) { + setHasResponseStatus(false); + setShownSuccessCode(false); + setShownErrorCode(false); + } else { + setHasResponseStatus(true); + setShownSuccessCode(cfg.success_code != null); + setShownErrorCode(cfg.error_code != null); + } + }, [trigger.sync_webhook_response_config]); + // Request auth methods when trigger is selected useEffect(() => { if (trigger.id) { @@ -415,18 +449,29 @@ export function TriggerForm({ trigger }: TriggerFormProps) { )} > + -

{field.state.value === 'after_completion' - ? 'Responds with the final output state after the run completes. (Note that depending on your queue size and the duration of the workflow itself, this could take a long time.)' + ? 'Holds the HTTP connection open and responds when the run completes.' : 'Responds immediately with the enqueued work order ID.'}

+ {field.state.value === 'after_completion' && + hasResponseConfig( + trigger.sync_webhook_response_config + ) && ( +

+ Switching to async will clear your response + configuration. +

+ )} {field.state.meta.errors.map(error => (

)} + + {/* Conditional config fields for after_completion */} + + {replyField => { + if (replyField.state.value !== 'after_completion') { + return null; + } + + return ( +

+ {/* Options header */} +
+ + Options + +
+ + {/* Response Status block */} + {hasResponseStatus && ( + + {field => { + const config = field.state.value as { + success_code: number | null; + error_code: number | null; + } | null; + + const baseConfig = config ?? { + success_code: null, + error_code: null, + }; + + const inputClass = cn( + 'block w-full px-3 py-2', + 'border rounded-md text-sm', + 'border-slate-300', + 'focus:border-indigo-500 focus:ring-indigo-500', + 'focus:outline-none focus:ring-1', + 'disabled:opacity-50 disabled:cursor-not-allowed' + ); + + return ( +
+ {/* Response Status header row */} +
+ + {!isReadOnly && ( + + )} +
+ + {/* Response Status content */} + {showResponseStatus && ( +
+ {/* Success Status Code row */} + {shownSuccessCode && ( +
+
+ + { + const raw = + e.target.value; + field.handleChange({ + ...baseConfig, + success_code: + raw === '' + ? null + : parseInt(raw, 10), + }); + }} + onBlur={field.handleBlur} + disabled={isReadOnly} + className={inputClass} + /> +
+ {!isReadOnly && ( + + )} +
+ )} + + {/* Error Status Code row */} + {shownErrorCode && ( +
+
+ + { + const raw = + e.target.value; + field.handleChange({ + ...baseConfig, + error_code: + raw === '' + ? null + : parseInt(raw, 10), + }); + }} + onBlur={field.handleBlur} + disabled={isReadOnly} + className={inputClass} + /> +
+ {!isReadOnly && ( + + )} +
+ )} + + {/* Add individual status code buttons */} + {!isReadOnly && + (!shownSuccessCode || + !shownErrorCode) && ( +
+ {!shownSuccessCode && ( + + )} + {!shownErrorCode && ( + + )} +
+ )} +
+ )} +
+ ); + }} +
+ )} + + {/* Configure Response Status (when not yet added) */} + {!hasResponseStatus && !isReadOnly && ( + + )} + + {/* Response Body */} +
+

+ Response Body +

+

+ The final run state is returned as the + response body. +

+
+ +

+ To return a custom body or status code from + your job, set{' '} + + webhookResponse + {' '} + in the job code.{' '} + + Learn more + +

+
+
+
+ ); + }} + ); diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index bffa8d11577..0c4e142e697 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -106,6 +106,8 @@ function transformTrigger(trigger: Trigger) { break; case 'webhook': output.webhook_reply = trigger.webhook_reply ?? 'before_start'; + output.sync_webhook_response_config = + trigger.sync_webhook_response_config ?? null; break; } return output; diff --git a/assets/js/collaborative-editor/types/session.ts b/assets/js/collaborative-editor/types/session.ts index 3c418b7bcfb..64f0edbcb7d 100644 --- a/assets/js/collaborative-editor/types/session.ts +++ b/assets/js/collaborative-editor/types/session.ts @@ -71,6 +71,10 @@ export namespace Session { cron_expression: string | null; has_auth_method: boolean; webhook_reply: 'before_start' | 'after_completion' | null; + sync_webhook_response_config: { + success_code: number | null; + error_code: number | null; + } | null; webhook_auth_methods: Array<{ id: string; name: string; diff --git a/assets/js/collaborative-editor/types/trigger.ts b/assets/js/collaborative-editor/types/trigger.ts index 6605f9f212f..d51152d33f3 100644 --- a/assets/js/collaborative-editor/types/trigger.ts +++ b/assets/js/collaborative-editor/types/trigger.ts @@ -24,6 +24,13 @@ const webhookTriggerSchema = baseTriggerSchema.extend({ .enum(['before_start', 'after_completion']) .nullable() .default('before_start'), + sync_webhook_response_config: z + .object({ + success_code: z.number().int().nullable().default(null), + error_code: z.number().int().nullable().default(null), + }) + .nullable() + .default(null), }); // Cron trigger schema with professional validation using cron-validator @@ -49,6 +56,7 @@ const cronTriggerSchema = baseTriggerSchema.extend({ ), cron_cursor_job_id: z.string().uuid().nullable().default(null), kafka_configuration: z.null().default(null), + sync_webhook_response_config: z.null().default(null), webhook_reply: z.null().default(null).catch(null), }); @@ -103,6 +111,7 @@ const kafkaTriggerSchema = baseTriggerSchema.extend({ cron_expression: z.null().default(null), cron_cursor_job_id: z.null().default(null), kafka_configuration: kafkaConfigSchema, + sync_webhook_response_config: z.null().default(null), webhook_reply: z.null().default(null).catch(null), }); @@ -137,6 +146,7 @@ export const createDefaultTrigger = ( cron_cursor_job_id: null, kafka_configuration: null, webhook_reply: 'before_start' as const, + sync_webhook_response_config: null, }; case 'cron': diff --git a/assets/js/yaml/schema/workflow-spec.json b/assets/js/yaml/schema/workflow-spec.json index f6385428593..d677757d15a 100644 --- a/assets/js/yaml/schema/workflow-spec.json +++ b/assets/js/yaml/schema/workflow-spec.json @@ -62,6 +62,14 @@ "type": ["string", "null"], "enum": ["before_start", "after_completion", "custom", null] }, + "webhook_response": { + "type": ["object", "null"], + "properties": { + "success_code": { "type": ["integer", "null"] }, + "error_code": { "type": ["integer", "null"] } + }, + "additionalProperties": false + }, "pos": { "type": "object", "properties": { diff --git a/assets/js/yaml/types.ts b/assets/js/yaml/types.ts index 321d29ffffa..db99b798e25 100644 --- a/assets/js/yaml/types.ts +++ b/assets/js/yaml/types.ts @@ -28,7 +28,11 @@ export type StateWebhookTrigger = { id: string; enabled: boolean; type: 'webhook'; - webhook_reply: 'before_start' | 'after_completion' | 'custom' | null; + webhook_reply: 'before_start' | 'after_completion' | null | undefined; + sync_webhook_response_config?: { + success_code?: number | null; + error_code?: number | null; + } | null; }; export type StateKafkaTrigger = { @@ -87,11 +91,17 @@ export type SpecCronTrigger = { pos: Position | undefined; }; +export type WebhookResponseConfig = { + success_code?: number | null; + error_code?: number | null; +}; + export type SpecWebhookTrigger = { id?: string; type: 'webhook'; enabled: boolean; webhook_reply: string | null; + webhook_response?: WebhookResponseConfig | null; pos: Position | undefined; }; diff --git a/assets/js/yaml/util.ts b/assets/js/yaml/util.ts index 381b3ec4b1e..3a00a8ec5e5 100644 --- a/assets/js/yaml/util.ts +++ b/assets/js/yaml/util.ts @@ -82,6 +82,18 @@ export const convertWorkflowStateToSpec = ( if (trigger.type === 'webhook') { triggerDetails.webhook_reply = trigger.webhook_reply ?? null; + const config = trigger.sync_webhook_response_config; + if ( + config && + (config.success_code != null || config.error_code != null) + ) { + triggerDetails.webhook_response = { + ...(config.success_code != null && { + success_code: config.success_code, + }), + ...(config.error_code != null && { error_code: config.error_code }), + }; + } } // TODO: handle kafka config @@ -186,6 +198,9 @@ export const convertWorkflowSpecToState = ( type: 'webhook', enabled, webhook_reply: specTrigger.webhook_reply, + sync_webhook_response_config: parseWebhookResponseConfig( + specTrigger.webhook_response + ), }; } else { trigger = { @@ -251,6 +266,13 @@ export const convertWorkflowSpecToState = ( return workflowState; }; +function parseWebhookResponseConfig( + config: import('./types').WebhookResponseConfig | null | undefined +): import('./types').WebhookResponseConfig | null { + if (!config) return null; + return config; +} + export const extractJobCredentials = (jobs: Workflow.Job[]): JobCredentials => { const credentials: JobCredentials = {}; for (const job of jobs) { diff --git a/assets/test/collaborative-editor/components/inspector/TriggerForm.test.tsx b/assets/test/collaborative-editor/components/inspector/TriggerForm.test.tsx index bf09a049943..2d1960b9bac 100644 --- a/assets/test/collaborative-editor/components/inspector/TriggerForm.test.tsx +++ b/assets/test/collaborative-editor/components/inspector/TriggerForm.test.tsx @@ -192,9 +192,11 @@ describe('TriggerForm - Response Mode Field', () => { // Check both options exist (use exact text to avoid "Async" matching "sync") expect( - screen.getByRole('option', { name: 'Async (default)' }) + screen.getByRole('option', { name: 'Async (Before Start)' }) + ).toBeInTheDocument(); + expect( + screen.getByRole('option', { name: 'Sync (After Completion)' }) ).toBeInTheDocument(); - expect(screen.getByRole('option', { name: 'Sync' })).toBeInTheDocument(); }); test('shows Async as default selected value', async () => { @@ -261,7 +263,7 @@ describe('TriggerForm - Response Mode Field', () => { await waitFor(() => { expect( screen.getByText( - /responds with the final output state after the run completes/i + /holds the http connection open and responds when the run completes/i ) ).toBeInTheDocument(); }); diff --git a/lib/lightning/collaboration/workflow_serializer.ex b/lib/lightning/collaboration/workflow_serializer.ex index c65a2215e62..297c4f1a4ad 100644 --- a/lib/lightning/collaboration/workflow_serializer.ex +++ b/lib/lightning/collaboration/workflow_serializer.ex @@ -228,7 +228,18 @@ defmodule Lightning.Collaboration.WorkflowSerializer do "id" => trigger.id, "type" => trigger.type |> to_string(), "webhook_reply" => - trigger.webhook_reply && to_string(trigger.webhook_reply) + trigger.webhook_reply && to_string(trigger.webhook_reply), + "sync_webhook_response_config" => + case trigger.sync_webhook_response_config do + nil -> + nil + + config -> + Yex.MapPrelim.from(%{ + "success_code" => config.success_code, + "error_code" => config.error_code + }) + end }) Yex.Array.push(triggers_array, trigger_map) @@ -274,9 +285,11 @@ defmodule Lightning.Collaboration.WorkflowSerializer do |> Enum.map(fn trigger -> trigger |> Map.take( - ~w(id type enabled cron_expression cron_cursor_job_id webhook_reply kafka_configuration) + ~w(id type enabled cron_expression cron_cursor_job_id webhook_reply + kafka_configuration sync_webhook_response_config) ) |> normalize_kafka_configuration() + |> normalize_sync_webhook_response_config() end) end @@ -299,6 +312,27 @@ defmodule Lightning.Collaboration.WorkflowSerializer do defp normalize_kafka_configuration(trigger), do: trigger + # Y.Doc serialises numbers as floats; convert integer codes back. + defp normalize_sync_webhook_response_config( + %{"sync_webhook_response_config" => %{} = config} = trigger + ) do + normalized = + config + |> normalize_integer_field("success_code") + |> normalize_integer_field("error_code") + + Map.put(trigger, "sync_webhook_response_config", normalized) + end + + defp normalize_sync_webhook_response_config(trigger), do: trigger + + defp normalize_integer_field(map, key) do + case Map.fetch(map, key) do + {:ok, value} when is_float(value) -> Map.put(map, key, trunc(value)) + _ -> map + end + end + defp extract_positions(positions_map) do Yex.Map.to_json(positions_map) end diff --git a/lib/lightning/export_utils.ex b/lib/lightning/export_utils.ex index b274485bbf7..06c89214dea 100644 --- a/lib/lightning/export_utils.ex +++ b/lib/lightning/export_utils.ex @@ -31,6 +31,7 @@ defmodule Lightning.ExportUtils do trigger: [ :type, :webhook_reply, + :webhook_response, :cron_expression, :cron_cursor_job, :enabled, @@ -121,14 +122,37 @@ defmodule Lightning.ExportUtils do Map.put(base, :kafka_configuration, kafka_config) :webhook -> - if trigger.webhook_reply do - Map.put(base, :webhook_reply, Atom.to_string(trigger.webhook_reply)) - else - base - end + base + |> maybe_put_webhook_reply(trigger.webhook_reply) + |> maybe_put_webhook_response(trigger.sync_webhook_response_config) + end + end + + defp maybe_put_webhook_reply(map, nil), do: map + + defp maybe_put_webhook_reply(map, reply) when is_atom(reply) do + Map.put(map, :webhook_reply, Atom.to_string(reply)) + end + + defp maybe_put_webhook_response(map, %{} = config) do + webhook_response = + Map.reject( + %{ + success_code: config.success_code, + error_code: config.error_code + }, + fn {_k, v} -> is_nil(v) end + ) + + if map_size(webhook_response) > 0 do + Map.put(map, :webhook_response, webhook_response) + else + map end end + defp maybe_put_webhook_response(map, _), do: map + defp edge_to_treenode(%{source_job_id: nil} = edge, triggers, jobs) do source_trigger = Enum.find(triggers, fn t -> t.id == edge.source_trigger_id end) diff --git a/lib/lightning/projects/provisioner.ex b/lib/lightning/projects/provisioner.ex index 44f51cf1faa..421dee7a9ee 100644 --- a/lib/lightning/projects/provisioner.ex +++ b/lib/lightning/projects/provisioner.ex @@ -32,6 +32,7 @@ defmodule Lightning.Projects.Provisioner do alias Lightning.Workflows.Snapshot alias Lightning.Workflows.Trigger alias Lightning.Workflows.Triggers.KafkaConfiguration + alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig alias Lightning.Workflows.Workflow alias Lightning.Workflows.WorkflowUsageLimiter alias Lightning.WorkflowVersions @@ -458,6 +459,8 @@ defmodule Lightning.Projects.Provisioner do end defp trigger_changeset(trigger, attrs) do + attrs = remap_webhook_response(attrs) + trigger |> Trigger.cast_changeset(attrs) |> cast_embed( @@ -465,6 +468,11 @@ defmodule Lightning.Projects.Provisioner do required: false, with: &kafka_config_changeset/2 ) + |> cast_embed( + :sync_webhook_response_config, + required: false, + with: &SyncWebhookResponseConfig.changeset/2 + ) |> Trigger.validate() |> cast(attrs, [:delete]) |> validate_required([:id]) @@ -473,6 +481,16 @@ defmodule Lightning.Projects.Provisioner do |> maybe_mark_for_deletion() end + defp remap_webhook_response(attrs) do + case Map.pop(attrs, "webhook_response") do + {nil, attrs} -> + attrs + + {config, attrs} -> + Map.put(attrs, "sync_webhook_response_config", config) + end + end + defp kafka_config_changeset(kafka_config, attrs) do kafka_config |> KafkaConfiguration.changeset(attrs) diff --git a/lib/lightning/workflows/snapshot.ex b/lib/lightning/workflows/snapshot.ex index 07108390739..bede4ec5646 100644 --- a/lib/lightning/workflows/snapshot.ex +++ b/lib/lightning/workflows/snapshot.ex @@ -13,6 +13,7 @@ defmodule Lightning.Workflows.Snapshot do alias Lightning.Credentials.KeychainCredential alias Lightning.Projects.ProjectCredential alias Lightning.Repo + alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig alias Lightning.Workflows.WebhookAuthMethod alias Lightning.Workflows.Workflow @@ -64,6 +65,9 @@ defmodule Lightning.Workflows.Snapshot do values: [:before_start, :after_completion, :custom], default: :before_start + embeds_one :sync_webhook_response_config, SyncWebhookResponseConfig, + on_replace: :update + many_to_many :webhook_auth_methods, WebhookAuthMethod, join_through: "trigger_webhook_auth_methods", on_replace: :delete @@ -111,7 +115,7 @@ defmodule Lightning.Workflows.Snapshot do @job_fields Lightning.Workflows.Job.__schema__(:fields) -- [:workflow_id] @trigger_fields Lightning.Workflows.Trigger.__schema__(:fields) -- - [:workflow_id] + [:workflow_id, :sync_webhook_response_config] @edge_fields Lightning.Workflows.Edge.__schema__(:fields) -- [:workflow_id] defp job_changeset(schema, params) do @@ -124,6 +128,10 @@ defmodule Lightning.Workflows.Snapshot do schema |> cast(params, @trigger_fields) |> validate_required([:id, :inserted_at, :updated_at]) + |> cast_embed(:sync_webhook_response_config, + required: false, + with: &SyncWebhookResponseConfig.changeset/2 + ) end defp edge_changeset(schema, params) do @@ -142,7 +150,7 @@ defmodule Lightning.Workflows.Snapshot do |> Enum.into(%{}, fn {field, value} -> case field do field when field in @associations_to_include -> - {field, Enum.map(value, &Map.from_struct/1)} + {field, assoc_to_map(field, value)} field when field in [:name, :lock_version, :positions] -> {field, value} @@ -157,6 +165,40 @@ defmodule Lightning.Workflows.Snapshot do |> new() end + defp assoc_to_map(field, structs) when is_list(structs) do + Enum.map(structs, &assoc_to_map(field, &1)) + end + + defp assoc_to_map(:triggers, %module{} = struct) do + base = Ecto.embedded_dump(struct, :json) + + module.__schema__(:embeds) + |> Enum.reduce(base, fn field, acc -> + embed = Map.get(struct, field) + Map.put(acc, field, embed_to_map(embed)) + end) + end + + defp assoc_to_map(_, %_{} = struct) do + Ecto.embedded_dump(struct, :json) + end + + defp embed_to_map(%module{} = struct) do + base = Ecto.embedded_dump(struct, :json) + + module.__schema__(:embeds) + |> Enum.reduce(base, fn field, acc -> + embed = Map.get(struct, field) + Map.put(acc, field, embed_to_map(embed)) + end) + end + + defp embed_to_map(embeds) when is_list(embeds) do + Enum.map(embeds, &embed_to_map/1) + end + + defp embed_to_map(nil), do: nil + @spec create(Workflow.t()) :: {:ok, t()} | {:error, Ecto.Changeset.t()} def create(%Workflow{} = workflow) do diff --git a/lib/lightning/workflows/trigger.ex b/lib/lightning/workflows/trigger.ex index c6aa449e350..48d309ea8bd 100644 --- a/lib/lightning/workflows/trigger.ex +++ b/lib/lightning/workflows/trigger.ex @@ -16,6 +16,7 @@ defmodule Lightning.Workflows.Trigger do alias Lightning.Workflows.Job alias Lightning.Workflows.Triggers.KafkaConfiguration + alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig alias Lightning.Workflows.Workflow @type t :: %__MODULE__{ @@ -36,7 +37,8 @@ defmodule Lightning.Workflows.Trigger do :cron_cursor_job_id, :type, :enabled, - :webhook_reply + :webhook_reply, + :sync_webhook_response_config ]} schema "triggers" do field :comment, :string @@ -62,6 +64,9 @@ defmodule Lightning.Workflows.Trigger do embeds_one :kafka_configuration, KafkaConfiguration, on_replace: :update + embeds_one :sync_webhook_response_config, SyncWebhookResponseConfig, + on_replace: :update + timestamps() end @@ -85,11 +90,14 @@ defmodule Lightning.Workflows.Trigger do def changeset(trigger, attrs) do trigger |> cast_changeset(attrs) - |> cast_embed( - :kafka_configuration, + |> cast_embed(:kafka_configuration, required: false, with: &KafkaConfiguration.changeset/2 ) + |> cast_embed(:sync_webhook_response_config, + required: false, + with: &SyncWebhookResponseConfig.changeset/2 + ) |> validate() end @@ -141,6 +149,7 @@ defmodule Lightning.Workflows.Trigger do |> put_change(:cron_cursor_job_id, nil) |> put_change(:kafka_configuration, nil) |> put_default(:webhook_reply, :before_start) + |> maybe_clear_sync_config() :cron -> changeset @@ -148,6 +157,7 @@ defmodule Lightning.Workflows.Trigger do |> validate_cron() |> put_change(:kafka_configuration, nil) |> put_change(:webhook_reply, nil) + |> put_change(:sync_webhook_response_config, nil) :kafka -> changeset @@ -155,12 +165,20 @@ defmodule Lightning.Workflows.Trigger do |> put_change(:cron_cursor_job_id, nil) |> validate_required([:kafka_configuration]) |> put_change(:webhook_reply, nil) + |> put_change(:sync_webhook_response_config, nil) nil -> changeset end end + defp maybe_clear_sync_config(changeset) do + case fetch_field!(changeset, :webhook_reply) do + :after_completion -> changeset + _ -> put_change(changeset, :sync_webhook_response_config, nil) + end + end + defp put_default(changeset, field, value) do changeset |> get_field(field) diff --git a/lib/lightning/workflows/triggers/sync_webhook_response_config.ex b/lib/lightning/workflows/triggers/sync_webhook_response_config.ex new file mode 100644 index 00000000000..5e2091678cd --- /dev/null +++ b/lib/lightning/workflows/triggers/sync_webhook_response_config.ex @@ -0,0 +1,27 @@ +defmodule Lightning.Workflows.Triggers.SyncWebhookResponseConfig do + @moduledoc """ + Embedded schema for the default webhook response sent on run completion. + + When a trigger's `webhook_reply` is `:after_completion`, this config controls + the HTTP response returned to the caller: + + - `success_code` — HTTP status code when the run succeeds. Defaults to 201. + - `error_code` — HTTP status code for any non-success terminal state + (failed, crashed, exception, killed, cancelled). Defaults to 201. + """ + + use Ecto.Schema + import Ecto.Changeset + + @derive {Jason.Encoder, only: [:success_code, :error_code]} + + @primary_key false + embedded_schema do + field :success_code, :integer + field :error_code, :integer + end + + def changeset(config, attrs) do + cast(config, attrs, [:success_code, :error_code]) + end +end diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index 4ff9d65ceef..67410893c6f 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -18,6 +18,11 @@ defmodule LightningWeb.RunChannel do require Jason.Helpers require Logger + defmodule WebhookResponse do + @moduledoc false + defstruct status: nil, body: nil, step_id: nil, sent_at: nil + end + @impl true def join( "run:" <> id, @@ -39,7 +44,8 @@ defmodule LightningWeb.RunChannel do id: id, run: run, project_id: project_id, - scrubber: nil + scrubber: nil, + webhook_response: nil })} else {:error, :not_found} -> @@ -115,15 +121,15 @@ defmodule LightningWeb.RunChannel do # instead of blocking the channel. run_with_preloads = run - |> Repo.preload([:log_lines, work_order: [:workflow, :trigger]]) + |> Repo.preload([:log_lines, work_order: [:workflow]]) run_with_preloads |> Lightning.FailureAlerter.alert_on_failure() - # Broadcast webhook response if after_completion is enabled - maybe_broadcast_webhook_response(run_with_preloads, payload) - - socket |> assign(run: run) |> reply_with({:ok, nil}) + socket + |> assign(run: run) + |> maybe_send_after_completion_response(payload["final_state"]) + |> reply_with({:ok, nil}) {:error, changeset} -> reply_with(socket, {:error, changeset}) @@ -210,7 +216,9 @@ defmodule LightningWeb.RunChannel do reply_with(socket, {:error, changeset}) {:ok, step} -> - reply_with(socket, {:ok, %{step_id: step.id}}) + socket + |> put_webhook_response(payload) + |> reply_with({:ok, %{step_id: step.id}}) end end @@ -306,53 +314,138 @@ defmodule LightningWeb.RunChannel do # Ignore other messages def handle_info(_msg, socket), do: {:noreply, socket} - defp maybe_broadcast_webhook_response(run, payload) do - work_order = run.work_order - trigger = work_order.trigger - - if trigger && trigger.type == :webhook && - trigger.webhook_reply == :after_completion do - topic = "work_order:#{work_order.id}:webhook_response" - - # TODO - Later allow workflow authors to customize the status code - # and body of the reply. - status_code = determine_status_code(run.state) - - body = %{ - data: payload["final_state"], - meta: %{ - work_order_id: work_order.id, - run_id: run.id, - state: run.state, - error_type: run.error_type, - inserted_at: run.inserted_at, - started_at: run.started_at, - claimed_at: run.claimed_at, - finished_at: run.finished_at - } - } + defp put_webhook_response(socket, payload) do + if already_sent?(socket.assigns.webhook_response) do + socket + else + case Map.get(payload, "webhook_response") do + %{} = wr -> + assign(socket, :webhook_response, %WebhookResponse{ + status: Map.get(wr, "status"), + body: Map.get(wr, "body"), + step_id: Map.get(payload, "step_id") + }) + + _ -> + socket + end + end + end + + defp already_sent?(%WebhookResponse{sent_at: %DateTime{}}), do: true + defp already_sent?(_), do: false + defp maybe_send_after_completion_response(socket, final_state) do + run = Repo.preload(socket.assigns.run, :starting_trigger) + trigger = run.starting_trigger + + if trigger && trigger.webhook_reply == :after_completion do + webhook_response = + build_webhook_response( + run, + final_state, + trigger.sync_webhook_response_config, + socket.assigns.webhook_response + ) + + socket + |> assign(:webhook_response, webhook_response) + |> maybe_broadcast_webhook_response() + else + socket + end + end + + defp maybe_broadcast_webhook_response(socket) do + %{run: run, webhook_response: %WebhookResponse{} = webhook_response} = + socket.assigns + + if already_sent?(webhook_response) do + socket + else Phoenix.PubSub.broadcast( Lightning.PubSub, - topic, - {:webhook_response, status_code, body} + "work_order:#{run.work_order_id}:webhook_response", + {:webhook_response, webhook_response.status, webhook_response.body} ) + + assign(socket, :webhook_response, %{ + webhook_response + | sent_at: DateTime.utc_now() + }) end end - # TODO - decide how we should respond... do we use HTTP codes for run states? - defp determine_status_code(state) do - case state do - :success -> 201 - :failed -> 201 - :crashed -> 201 - :exception -> 201 - :killed -> 201 - :cancelled -> 201 - _other -> 201 + defp build_webhook_response(run, final_state, config, nil) do + {status, body} = build_default_response(run, final_state, config) + %WebhookResponse{status: status, body: body} + end + + defp build_webhook_response( + run, + final_state, + config, + %WebhookResponse{} = webhook_response + ) do + with {:ok, custom_status} <- parse_webhook_status(webhook_response.status), + {:ok, custom_body} <- parse_webhook_body(webhook_response.body) do + status = custom_status || default_response_status(run.state, config) + body = custom_body || default_response_body(run.state, final_state, config) + %{webhook_response | status: status, body: body} + else + {:error, reason} -> + {status, body} = malformed_response(reason) + %{webhook_response | status: status, body: body} end end + defp parse_webhook_status(nil), do: {:ok, nil} + defp parse_webhook_status(status) when is_integer(status), do: {:ok, status} + + defp parse_webhook_status(status) when is_float(status), + do: {:ok, trunc(status)} + + defp parse_webhook_status(status), + do: {:error, "status needs to be an integer, got: #{inspect(status)}"} + + defp parse_webhook_body(nil), do: {:ok, nil} + defp parse_webhook_body(body) when is_map(body), do: {:ok, body} + + defp parse_webhook_body(body), + do: {:error, "body needs to be a JSON object, got: #{inspect(body)}"} + + defp build_default_response(run, final_state, config) do + {default_response_status(run.state, config), + default_response_body(run.state, final_state, config)} + end + + defp default_response_status(:success, %{success_code: code}) + when is_integer(code), + do: code + + defp default_response_status(:success, _config), do: 201 + + defp default_response_status(_run_status, %{error_code: code}) + when is_integer(code), + do: code + + defp default_response_status(_run_status, _config), do: 500 + + defp default_response_body(:success, final_state, _config), + do: final_state + + defp default_response_body(run_status, _final_state, _config) do + %{ + message: + "Run completed with status: #{run_status}. As a security policy, OpenFn does not send state data when the run errors out to avoid leaking sensitive information" + } + end + + defp malformed_response(reason) do + {201, + %{message: "Run completed, but webhook_response was malformed: #{reason}"}} + end + defp update_scrubber(nil, samples, basic_auth) do Scrubber.start_link(samples: samples, basic_auth: basic_auth) end diff --git a/lib/lightning_web/controllers/api/provisioning_json.ex b/lib/lightning_web/controllers/api/provisioning_json.ex index 3db39601d1c..62e27e7a5de 100644 --- a/lib/lightning_web/controllers/api/provisioning_json.ex +++ b/lib/lightning_web/controllers/api/provisioning_json.ex @@ -102,11 +102,19 @@ defmodule LightningWeb.API.ProvisioningJSON do ~w(hosts topics initial_offset_reset_policy connect_timeout)a ) + webhook_response = + trigger.sync_webhook_response_config && + Map.take(trigger.sync_webhook_response_config, [ + :success_code, + :error_code + ]) + trigger |> Map.take( ~w(id type cron_expression enabled webhook_reply cron_cursor_job_id)a ) |> Map.put(:kafka_configuration, kafka_configuration) + |> Map.put(:webhook_response, webhook_response) |> drop_keys_with_nil_value() end diff --git a/lib/lightning_web/controllers/webhooks_controller.ex b/lib/lightning_web/controllers/webhooks_controller.ex index 1a5fc8e500b..ec0985812a3 100644 --- a/lib/lightning_web/controllers/webhooks_controller.ex +++ b/lib/lightning_web/controllers/webhooks_controller.ex @@ -63,6 +63,19 @@ defmodule LightningWeb.WebhooksController do ) |> case do {:ok, work_order} -> + conn = + conn + |> put_resp_header("x-meta-work-order-id", work_order.id) + |> then(fn conn -> + case work_order do + %{runs: [run]} -> + put_resp_header(conn, "x-meta-run-id", run.id) + + _ -> + conn + end + end) + if Workflows.Trigger.synchronous?(trigger) do handle_delayed_response(conn, work_order) else @@ -131,11 +144,6 @@ defmodule LightningWeb.WebhooksController do conn |> put_status(status_code) |> json(body) - - {:webhook_error, status_code, error} -> - conn - |> put_status(status_code) - |> json(%{error: error}) after Lightning.Config.webhook_response_timeout_ms() -> Logger.warning( diff --git a/priv/repo/migrations/20260422052859_add_webhook_response_codes_to_triggers.exs b/priv/repo/migrations/20260422052859_add_webhook_response_codes_to_triggers.exs new file mode 100644 index 00000000000..8508270c69f --- /dev/null +++ b/priv/repo/migrations/20260422052859_add_webhook_response_codes_to_triggers.exs @@ -0,0 +1,9 @@ +defmodule Lightning.Repo.Migrations.AddWebhookResponseCodesToTriggers do + use Ecto.Migration + + def change do + alter table(:triggers) do + add :sync_webhook_response_config, :jsonb, null: true + end + end +end diff --git a/test/integration/web_and_worker_test.exs b/test/integration/web_and_worker_test.exs index be2544e0ca8..0a3197e42ff 100644 --- a/test/integration/web_and_worker_test.exs +++ b/test/integration/web_and_worker_test.exs @@ -12,6 +12,7 @@ defmodule Lightning.WebAndWorkerTest do alias Lightning.Runtime.RuntimeManager alias Lightning.WorkOrders alias Lightning.Workflows.Snapshot + alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig setup :set_mox_from_context setup :verify_on_exit! @@ -469,33 +470,8 @@ defmodule Lightning.WebAndWorkerTest do # Wait for the workflow to complete (up to 10 seconds) response = Task.await(task, 10_000) - # Should return 201 with the final state assert response.status == 201 - - # The response body should be the final state from the job inside a "data" - # key and the metadata inside a "meta" key. - assert %{ - "data" => %{"data" => %{"value" => 10}, "result" => "success"}, - "meta" => meta - } = response.body - - # Verify meta fields exist with correct types and values - assert meta["error_type"] == nil - assert meta["state"] == "success" - assert is_binary(meta["run_id"]) - assert is_binary(meta["work_order_id"]) - - # Verify datetime fields are present and valid ISO8601 strings - assert is_binary(meta["claimed_at"]) - assert is_binary(meta["finished_at"]) - assert is_binary(meta["inserted_at"]) - assert is_binary(meta["started_at"]) - - # Verify datetime fields match ISO8601 format - assert meta["claimed_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/ - assert meta["finished_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/ - assert meta["inserted_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/ - assert meta["started_at"] =~ ~r/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/ + assert %{"data" => %{"value" => 10}, "result" => "success"} = response.body # Verify the work order completed successfully work_order = @@ -717,9 +693,7 @@ defmodule Lightning.WebAndWorkerTest do assert response.status == 201 - assert %{"data" => final_state, "meta" => meta} = response.body - - assert meta["state"] == "success" + final_state = response.body # The final_state is keyed by job ID. When the same job is reached # twice (step 5), the second entry gets a "-1" suffix. @@ -744,6 +718,134 @@ defmodule Lightning.WebAndWorkerTest do assert Enum.count(steps) == 6 assert Enum.all?(steps, fn step -> step.exit_reason == "success" end) end + + @tag :integration + @tag timeout: 120_000 + test "uses configured success_code when workflow succeeds", %{uri: uri} do + project = insert(:project) + trigger = build(:trigger, type: :webhook, enabled: true) + + job = + build(:job, + adaptor: "@openfn/language-common@latest", + body: "fn(state => state);", + name: "job" + ) + + workflow = + build(:workflow, project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + [trigger] = workflow.triggers + + trigger + |> Ecto.Changeset.change(webhook_reply: :after_completion) + |> Ecto.Changeset.put_embed( + :sync_webhook_response_config, + %SyncWebhookResponseConfig{success_code: 200} + ) + |> Repo.update!() + + Snapshot.create(workflow |> Repo.reload!()) + + response = + build_tesla_client(uri) + |> Tesla.post!("/i/#{trigger.id}", %{"value" => 1}) + + assert response.status == 200 + end + + @tag :integration + @tag timeout: 120_000 + test "uses configured error_code when workflow fails", %{uri: uri} do + project = insert(:project) + trigger = build(:trigger, type: :webhook, enabled: true) + + job = + build(:job, + adaptor: "@openfn/language-common@latest", + body: "fn(state => { throw new Error('forced failure'); });", + name: "job" + ) + + workflow = + build(:workflow, project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + [trigger] = workflow.triggers + + trigger + |> Ecto.Changeset.change(webhook_reply: :after_completion) + |> Ecto.Changeset.put_embed( + :sync_webhook_response_config, + %SyncWebhookResponseConfig{error_code: 500} + ) + |> Repo.update!() + + Snapshot.create(workflow |> Repo.reload!()) + + response = + build_tesla_client(uri) + |> Tesla.post!("/i/#{trigger.id}", %{}) + + assert response.status == 500 + assert %{"message" => msg} = response.body + assert msg =~ "security policy" + end + + @tag :integration + @tag timeout: 120_000 + test "honours webhook_response sent on step:complete", %{uri: uri} do + project = insert(:project) + trigger = build(:trigger, type: :webhook, enabled: true) + + job = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => ({ + ...state, + webhookResponse: { status: 200, body: { ack: true, received: state.data.value } } + })); + """, + name: "responding-job" + ) + + workflow = + build(:workflow, project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + [trigger] = workflow.triggers + + trigger + |> Ecto.Changeset.change(webhook_reply: :after_completion) + |> Repo.update!() + + Snapshot.create(workflow |> Repo.reload!()) + + response = + build_tesla_client(uri) + |> Tesla.post!("/i/#{trigger.id}", %{"value" => 42}) + + assert response.status == 200 + assert response.body == %{"ack" => true, "received" => 42} + end + end + + defp build_tesla_client(uri) do + Tesla.client( + [{Tesla.Middleware.BaseUrl, uri}, Tesla.Middleware.JSON], + {Tesla.Adapter.Finch, name: Lightning.Finch} + ) end defp webhook_expression do diff --git a/test/lightning/collaboration/workflow_serializer_test.exs b/test/lightning/collaboration/workflow_serializer_test.exs index 3df6484e57d..e6325b1b5d2 100644 --- a/test/lightning/collaboration/workflow_serializer_test.exs +++ b/test/lightning/collaboration/workflow_serializer_test.exs @@ -761,7 +761,8 @@ defmodule Lightning.Collaboration.WorkflowSerializerTest do "cron_expression" => nil, "cron_cursor_job_id" => nil, "kafka_configuration" => nil, - "webhook_reply" => "before_start" + "webhook_reply" => "before_start", + "sync_webhook_response_config" => nil } == extracted_trigger assert is_nil(extracted["positions"]) @@ -983,7 +984,8 @@ defmodule Lightning.Collaboration.WorkflowSerializerTest do "cron_expression" => original_trigger.cron_expression, "cron_cursor_job_id" => original_trigger.cron_cursor_job_id, "kafka_configuration" => nil, - "webhook_reply" => nil + "webhook_reply" => nil, + "sync_webhook_response_config" => nil } == extracted_trigger # Positions @@ -1294,7 +1296,8 @@ defmodule Lightning.Collaboration.WorkflowSerializerTest do "cron_expression" => "0 */6 * * *", "cron_cursor_job_id" => original_trigger.cron_cursor_job_id, "kafka_configuration" => nil, - "webhook_reply" => nil + "webhook_reply" => nil, + "sync_webhook_response_config" => nil } == trigger end diff --git a/test/lightning/projects/provisioner_test.exs b/test/lightning/projects/provisioner_test.exs index bf1f31d9cee..c785cff074c 100644 --- a/test/lightning/projects/provisioner_test.exs +++ b/test/lightning/projects/provisioner_test.exs @@ -361,6 +361,49 @@ defmodule Lightning.Projects.ProvisionerTest do assert trigger.webhook_reply == :after_completion end + test "imports trigger with webhook_response field" do + Mox.verify_on_exit!() + user = insert(:user) + + %{body: %{"workflows" => [workflow]} = body, project_id: project_id} = + valid_document() + + updated_triggers = + Enum.map(workflow["triggers"], fn trigger -> + Map.merge(trigger, %{ + "type" => "webhook", + "webhook_reply" => "after_completion", + "webhook_response" => %{ + "success_code" => 200, + "error_code" => 500 + } + }) + end) + + body = + Map.put(body, "workflows", [ + Map.put(workflow, "triggers", updated_triggers) + ]) + + Mox.stub( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, _context -> :ok end + ) + + {:ok, project} = + Provisioner.import_document( + %Lightning.Projects.Project{}, + user, + body + ) + + assert %{id: ^project_id, workflows: [%{triggers: [trigger]}]} = project + assert trigger.webhook_reply == :after_completion + assert trigger.sync_webhook_response_config.success_code == 200 + assert trigger.sync_webhook_response_config.error_code == 500 + end + test "imports cron trigger with cron_cursor_job_id field" do Mox.verify_on_exit!() user = insert(:user) diff --git a/test/lightning/projects_test.exs b/test/lightning/projects_test.exs index 18a4dca0786..dfcbd13817f 100644 --- a/test/lightning/projects_test.exs +++ b/test/lightning/projects_test.exs @@ -780,6 +780,59 @@ defmodule Lightning.ProjectsTest do assert generated_yaml =~ expected_yaml_trigger end + test "webhook_response config is included in the export" do + project = insert(:project, name: "project 1") + + trigger = + build(:trigger, + type: :webhook, + enabled: true, + webhook_reply: :after_completion, + sync_webhook_response_config: + build(:sync_webhook_response_config, + success_code: 200, + error_code: 500 + ) + ) + + job = + build(:job, + body: ~s[fn(state => state)] + ) + + build(:workflow, name: "workflow 1", project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + assert {:ok, generated_yaml} = Projects.export_project(:yaml, project.id) + + assert generated_yaml =~ "webhook_reply: after_completion" + + assert generated_yaml =~ "webhook_response:" + assert generated_yaml =~ "success_code: 200" + assert generated_yaml =~ "error_code: 500" + end + + test "webhook_response is omitted when sync_webhook_response_config is nil" do + project = insert(:project, name: "project 2") + + trigger = build(:trigger, type: :webhook, enabled: true) + + job = build(:job, body: ~s[fn(state => state)]) + + build(:workflow, name: "workflow 1", project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + assert {:ok, generated_yaml} = Projects.export_project(:yaml, project.id) + + refute generated_yaml =~ "webhook_response" + end + test "exports canonical project" do project = canonical_project_fixture( diff --git a/test/lightning_web/channels/run_channel_test.exs b/test/lightning_web/channels/run_channel_test.exs index 067004bd7e6..77cee65fe9d 100644 --- a/test/lightning_web/channels/run_channel_test.exs +++ b/test/lightning_web/channels/run_channel_test.exs @@ -1768,6 +1768,425 @@ defmodule LightningWeb.RunChannelTest do end end + describe "webhook response broadcasting" do + setup [:create_user, :create_project] + + setup %{project: project} do + dataclip = insert(:http_request_dataclip, project: project) + + trigger = + build(:trigger, + type: :webhook, + enabled: true, + webhook_reply: :after_completion + ) + + job = build(:job) + + %{triggers: [trigger], jobs: [job]} = + workflow = + build(:workflow, project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + work_order = + insert(:workorder, + workflow: workflow, + trigger: trigger, + dataclip: dataclip + ) + + run = + insert(:run, + work_order: work_order, + starting_trigger: trigger, + dataclip: dataclip, + state: :started, + options: + Lightning.Extensions.MockUsageLimiter.get_run_options(%Context{ + project_id: project.id + }) + |> Map.new() + ) + + %{run: run, work_order: work_order, trigger: trigger, job: job} + end + + setup [:create_socket, :join_run_channel] + + setup %{work_order: work_order} do + Phoenix.PubSub.subscribe( + Lightning.PubSub, + "work_order:#{work_order.id}:webhook_response" + ) + + :ok + end + + test "does not broadcast when webhook_reply is not :after_completion", %{ + socket: socket, + run: run, + job: job, + trigger: trigger + } do + trigger + |> Ecto.Changeset.change(webhook_reply: :before_start) + |> Repo.update!() + + complete_step(socket, run, job, + webhook_response: %{"status" => 200, "body" => %{"data" => "ok"}} + ) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + refute_receive {:webhook_response, _, _} + end + + test "broadcasts final state as body on success with no captured response", + %{ + socket: socket + } do + final_state = %{"data" => %{"result" => "ok"}} + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => final_state + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 201, ^final_state} + end + + test "broadcasts security message on error with no captured response", %{ + socket: socket + } do + ref = + push(socket, "run:complete", %{ + "reason" => "fail", + "error_type" => "UserError", + "error_message" => nil + }) + + assert_reply ref, :ok, nil + + assert_receive {:webhook_response, 500, %{message: message}} + assert message =~ "failed" + assert message =~ "security policy" + end + + test "uses configured success_code on success", %{ + socket: socket, + trigger: trigger + } do + put_webhook_config(trigger, success_code: 200) + + final_state = %{"data" => "ok"} + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => final_state + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 200, ^final_state} + end + + test "uses configured error_code on error", %{ + socket: socket, + trigger: trigger + } do + put_webhook_config(trigger, error_code: 422) + + ref = + push(socket, "run:complete", %{ + "reason" => "fail", + "error_type" => "UserError", + "error_message" => nil + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 422, %{message: _}} + end + + test "uses 201 on success when only error_code is configured", %{ + socket: socket, + trigger: trigger + } do + put_webhook_config(trigger, error_code: 422) + + final_state = %{"data" => "ok"} + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => final_state + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 201, ^final_state} + end + + test "does not broadcast for runs with no starting trigger", %{ + socket: socket, + run: run, + job: job + } do + run + |> Ecto.Changeset.change( + starting_trigger_id: nil, + starting_job_id: job.id + ) + |> Repo.update!() + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + refute_receive {:webhook_response, _, _} + end + + test "captured webhook_response from step:complete overrides config", %{ + socket: socket, + run: run, + job: job, + trigger: trigger + } do + put_webhook_config(trigger, success_code: 999) + + override_body = %{"custom" => "response"} + + complete_step(socket, run, job, + webhook_response: %{"status" => 200, "body" => override_body} + ) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 200, ^override_body} + end + + test "last step:complete with a webhook_response wins (last-write-wins)", + %{ + socket: socket, + run: run, + job: job + } do + complete_step(socket, run, job, + webhook_response: %{"status" => 201, "body" => %{"first" => true}} + ) + + last_body = %{"last" => true} + + complete_step(socket, run, job, + webhook_response: %{"status" => 202, "body" => last_body} + ) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 202, ^last_body} + end + + test "step without webhook_response does not clear a previously captured one", + %{ + socket: socket, + run: run, + job: job + } do + captured_body = %{"captured" => true} + + complete_step(socket, run, job, + webhook_response: %{"status" => 200, "body" => captured_body} + ) + + complete_step(socket, run, job, []) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 200, ^captured_body} + end + + test "float status in webhook_response is normalised to integer", %{ + socket: socket, + run: run, + job: job + } do + override_body = %{"ok" => true} + + complete_step(socket, run, job, + webhook_response: %{"status" => 200.0, "body" => override_body} + ) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 200, ^override_body} + end + + test "webhook_response with only status uses default body", %{ + socket: socket, + run: run, + job: job + } do + final_state = %{"data" => "ok"} + + complete_step(socket, run, job, webhook_response: %{"status" => 200}) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => final_state + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 200, ^final_state} + end + + test "webhook_response with only body uses config status code", %{ + socket: socket, + run: run, + job: job, + trigger: trigger + } do + put_webhook_config(trigger, success_code: 202) + + custom_body = %{"data" => "ok"} + + complete_step(socket, run, job, webhook_response: %{"body" => custom_body}) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 202, ^custom_body} + end + + test "webhook_response with only body falls back to 201 when no config", %{ + socket: socket, + run: run, + job: job + } do + custom_body = %{"data" => "ok"} + + complete_step(socket, run, job, webhook_response: %{"body" => custom_body}) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + assert_receive {:webhook_response, 201, ^custom_body} + end + + test "malformed status in webhook_response yields 201 with explanation", %{ + socket: socket, + run: run, + job: job + } do + complete_step(socket, run, job, + webhook_response: %{"status" => "two hundred"} + ) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + + assert_receive {:webhook_response, 201, %{message: message}} + assert message =~ "webhook_response was malformed" + assert message =~ "status" + end + + test "malformed body in webhook_response yields 201 with explanation", %{ + socket: socket, + run: run, + job: job + } do + complete_step(socket, run, job, + webhook_response: %{"body" => "not a json object"} + ) + + ref = + push(socket, "run:complete", %{ + "reason" => "success", + "final_state" => %{"data" => "ok"} + }) + + assert_reply ref, :ok, nil + + assert_receive {:webhook_response, 201, %{message: message}} + assert message =~ "webhook_response was malformed" + assert message =~ "body" + end + end + + defp put_webhook_config(trigger, attrs) do + alias Lightning.Workflows.Triggers.SyncWebhookResponseConfig + config = struct(SyncWebhookResponseConfig, attrs) + + trigger + |> Ecto.Changeset.change() + |> Ecto.Changeset.put_embed(:sync_webhook_response_config, config) + |> Repo.update!() + end + + defp complete_step(socket, run, job, opts) do + step = insert(:step, runs: [run], job: job) + + payload = + %{ + "step_id" => step.id, + "output_dataclip_id" => Ecto.UUID.generate(), + "output_dataclip" => ~s({"foo": "bar"}), + "reason" => "normal" + } + |> maybe_put("webhook_response", Keyword.get(opts, :webhook_response)) + + ref = push(socket, "step:complete", payload) + assert_reply ref, :ok, %{step_id: _} + step + end + + defp maybe_put(map, _key, nil), do: map + defp maybe_put(map, key, value), do: Map.put(map, key, value) + defp create_socket_and_run(context) do merge_setups(context, [ :create_project, diff --git a/test/lightning_web/controllers/api/provisioning_controller_test.exs b/test/lightning_web/controllers/api/provisioning_controller_test.exs index 47c8bbee57b..ad77ed189cc 100644 --- a/test/lightning_web/controllers/api/provisioning_controller_test.exs +++ b/test/lightning_web/controllers/api/provisioning_controller_test.exs @@ -594,6 +594,79 @@ defmodule LightningWeb.API.ProvisioningControllerTest do } = trigger_json end + test "returns webhook_response when sync_webhook_response_config is set", %{ + conn: conn, + user: user + } do + project = insert(:project, project_users: [%{user_id: user.id}]) + + trigger = + build(:trigger, + type: :webhook, + webhook_reply: :after_completion, + sync_webhook_response_config: + build(:sync_webhook_response_config, + success_code: 200, + error_code: 500 + ) + ) + + job = build(:job) + + %{triggers: [%{id: trigger_id}]} = + build(:workflow, project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + conn = get(conn, ~p"/api/provision/#{project.id}") + response = json_response(conn, 200) + + assert %{ + "workflows" => [ + %{ + "triggers" => [trigger_json] + } + ] + } = response["data"] + + assert %{ + "id" => ^trigger_id, + "type" => "webhook", + "webhook_reply" => "after_completion", + "webhook_response" => %{ + "success_code" => 200, + "error_code" => 500 + } + } = trigger_json + end + + test "omits webhook_response when sync_webhook_response_config is nil", %{ + conn: conn, + user: user + } do + project = insert(:project, project_users: [%{user_id: user.id}]) + + trigger = build(:trigger, type: :webhook) + job = build(:job) + + build(:workflow, project: project) + |> with_trigger(trigger) + |> with_job(job) + |> with_edge({trigger, job}, condition_type: :always) + |> insert() + + conn = get(conn, ~p"/api/provision/#{project.id}") + response = json_response(conn, 200) + + assert %{ + "workflows" => [%{"triggers" => [trigger_json]}] + } = response["data"] + + refute Map.has_key?(trigger_json, "webhook_response") + end + test "returns a cron trigger with cron_cursor_job_id in the response", %{ conn: conn, user: user diff --git a/test/lightning_web/controllers/webhooks_controller_test.exs b/test/lightning_web/controllers/webhooks_controller_test.exs index f9731191909..4d3aa91adae 100644 --- a/test/lightning_web/controllers/webhooks_controller_test.exs +++ b/test/lightning_web/controllers/webhooks_controller_test.exs @@ -402,89 +402,60 @@ defmodule LightningWeb.WebhooksControllerTest do describe "delayed webhook response (webhook_reply: :after_completion)" do setup [:stub_rate_limiter_ok, :stub_usage_limiter_ok] - test "waits for and returns the final state on success", %{conn: conn} do - %{triggers: [trigger]} = + test "waits for and returns the broadcast response body and status code", %{ + conn: conn + } do + %{triggers: [trigger], project_id: project_id} = insert(:simple_workflow) |> Lightning.Repo.preload(:triggers) |> with_snapshot() - # Update trigger to use after_completion trigger = trigger |> Ecto.Changeset.change(webhook_reply: :after_completion) |> Repo.update!() - message = %{"foo" => "bar"} + Lightning.WorkOrders.Events.subscribe(project_id) - # Spawn a task that will post to the webhook - # This task will be blocked waiting for the webhook response test_pid = self() task = Task.async(fn -> - conn = post(conn, "/i/#{trigger.id}", message) + conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"}) send(test_pid, {:response, conn}) end) - # Wait for the work order to be created - Process.sleep(100) - - # Find the work order - work_order = - Lightning.Repo.one( - from wo in Lightning.WorkOrder, - where: wo.trigger_id == ^trigger.id, - order_by: [desc: wo.inserted_at], - limit: 1 - ) - - assert work_order - - # Get the run for metadata - run = - Lightning.Repo.one( - from r in Lightning.Run, - where: r.work_order_id == ^work_order.id, - limit: 1 - ) - - # Simulate the worker completing the run with final state - final_state_data = %{"result" => "success", "data" => %{"x" => 42}} - - response_body = %{ - data: final_state_data, - meta: %{ - work_order_id: work_order.id, - run_id: run.id, - state: :success, - error_type: nil, - inserted_at: run.inserted_at, - started_at: run.started_at, - claimed_at: run.claimed_at, - finished_at: run.finished_at - } + assert_receive %Lightning.WorkOrders.Events.WorkOrderCreated{ + work_order: work_order } + assert_receive %Lightning.WorkOrders.Events.RunCreated{run: run} + + body = %{"result" => "success", "value" => 42} + Phoenix.PubSub.broadcast( Lightning.PubSub, "work_order:#{work_order.id}:webhook_response", - {:webhook_response, 200, response_body} + {:webhook_response, 200, body} ) - # Now the task should complete with the response assert_receive {:response, response_conn}, 5_000 - response = json_response(response_conn, 200) - assert response["data"] == final_state_data - assert response["meta"]["work_order_id"] == work_order.id - assert response["meta"]["run_id"] == run.id - assert response["meta"]["state"] == "success" + assert json_response(response_conn, 200) == body + + assert get_resp_header(response_conn, "x-meta-work-order-id") == [ + work_order.id + ] + + assert get_resp_header(response_conn, "x-meta-run-id") == [run.id] Task.await(task) end - test "waits for and returns error state on failure", %{conn: conn} do - %{triggers: [trigger]} = + test "passes through any status code from the broadcast message", %{ + conn: conn + } do + %{triggers: [trigger], project_id: project_id} = insert(:simple_workflow) |> Lightning.Repo.preload(:triggers) |> with_snapshot() @@ -494,65 +465,39 @@ defmodule LightningWeb.WebhooksControllerTest do |> Ecto.Changeset.change(webhook_reply: :after_completion) |> Repo.update!() - message = %{"foo" => "bar"} + Lightning.WorkOrders.Events.subscribe(project_id) + test_pid = self() task = Task.async(fn -> - conn = post(conn, "/i/#{trigger.id}", message) + conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"}) send(test_pid, {:response, conn}) end) - Process.sleep(100) - - work_order = - Lightning.Repo.one( - from wo in Lightning.WorkOrder, - where: wo.trigger_id == ^trigger.id, - order_by: [desc: wo.inserted_at], - limit: 1 - ) - - assert work_order - - # Get the run for metadata - run = - Lightning.Repo.one( - from r in Lightning.Run, - where: r.work_order_id == ^work_order.id, - limit: 1 - ) - - error_data = %{"error" => "Something went wrong"} - - response_body = %{ - data: error_data, - meta: %{ - work_order_id: work_order.id, - run_id: run.id, - state: :failed, - error_type: "RuntimeError", - inserted_at: run.inserted_at, - started_at: run.started_at, - claimed_at: run.claimed_at, - finished_at: run.finished_at - } + assert_receive %Lightning.WorkOrders.Events.WorkOrderCreated{ + work_order: work_order } + assert_receive %Lightning.WorkOrders.Events.RunCreated{run: run} + + body = %{"message" => "run failed"} + Phoenix.PubSub.broadcast( Lightning.PubSub, "work_order:#{work_order.id}:webhook_response", - {:webhook_response, 422, response_body} + {:webhook_response, 422, body} ) assert_receive {:response, response_conn}, 5_000 - response = json_response(response_conn, 422) - assert response["data"] == error_data - assert response["meta"]["work_order_id"] == work_order.id - assert response["meta"]["run_id"] == run.id - assert response["meta"]["state"] == "failed" - assert response["meta"]["error_type"] == "RuntimeError" + assert json_response(response_conn, 422) == body + + assert get_resp_header(response_conn, "x-meta-work-order-id") == [ + work_order.id + ] + + assert get_resp_header(response_conn, "x-meta-run-id") == [run.id] Task.await(task) end @@ -560,7 +505,6 @@ defmodule LightningWeb.WebhooksControllerTest do test "returns timeout if workflow doesn't complete within timeout period", %{ conn: conn } do - # Set a shorter timeout for this test (2 seconds instead of default) expect(Lightning.MockConfig, :webhook_response_timeout_ms, fn -> 2_000 end) %{triggers: [trigger]} = @@ -573,19 +517,16 @@ defmodule LightningWeb.WebhooksControllerTest do |> Ecto.Changeset.change(webhook_reply: :after_completion) |> Repo.update!() - message = %{"foo" => "bar"} + conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"}) - # This will timeout since we never broadcast a response - conn = post(conn, "/i/#{trigger.id}", message) + [work_order_id] = get_resp_header(conn, "x-meta-work-order-id") assert json_response(conn, 504) == %{ "error" => "timeout", "message" => "Workflow did not complete within timeout period", - "work_order_id" => json_response(conn, 504)["work_order_id"] + "work_order_id" => work_order_id } - # Verify work order was still created - work_order_id = json_response(conn, 504)["work_order_id"] assert WorkOrders.get(work_order_id) end @@ -597,15 +538,12 @@ defmodule LightningWeb.WebhooksControllerTest do |> Lightning.Repo.preload(:triggers) |> with_snapshot() - # Ensure trigger is using default before_start assert trigger.webhook_reply == :before_start - message = %{"foo" => "bar"} - - # Should return immediately - conn = post(conn, "/i/#{trigger.id}", message) + conn = post(conn, "/i/#{trigger.id}", %{"foo" => "bar"}) - assert %{"work_order_id" => work_order_id} = json_response(conn, 200) + [work_order_id] = get_resp_header(conn, "x-meta-work-order-id") + assert %{"work_order_id" => ^work_order_id} = json_response(conn, 200) assert WorkOrders.get(work_order_id) end end diff --git a/test/support/factories.ex b/test/support/factories.ex index ffa99da33d0..68e7134deab 100644 --- a/test/support/factories.ex +++ b/test/support/factories.ex @@ -345,6 +345,10 @@ defmodule Lightning.Factories do } end + def sync_webhook_response_config_factory do + %Lightning.Workflows.Triggers.SyncWebhookResponseConfig{} + end + def triggers_kafka_configuration_factory do %Lightning.Workflows.Triggers.KafkaConfiguration{ group_id: "arb_group_id",