From da3e50770307103855e8b5a73d296c937cb151ba Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Fri, 13 Mar 2026 12:57:16 +0200 Subject: [PATCH 1/5] establish current behaviour for multi-leaf and multi-output workflows --- test/integration/web_and_worker_test.exs | 171 +++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/test/integration/web_and_worker_test.exs b/test/integration/web_and_worker_test.exs index f6af34b6df9..99c62ddbbe2 100644 --- a/test/integration/web_and_worker_test.exs +++ b/test/integration/web_and_worker_test.exs @@ -572,6 +572,177 @@ defmodule Lightning.WebAndWorkerTest do assert work_order.state == :failed end + + @tag :integration + @tag timeout: 120_000 + test "returns final state with multiple leaf nodes from a branching workflow", + %{uri: uri} do + # + # +---+ + # | T | (webhook trigger) + # +---+ + # | + # +-+-+ + # | 1 | + # +-+-+ + # / \ + # / \ + # +-+-+ +-+-+ + # | 2 | | 3 | + # +-+-+ +-+-+ + # / \ | + # / \ | + # +-+-+ +-+-+ | + # | 4 | | 5 |<--+ + # +---+ +---+ + # + # Step 5 is reached twice (from step 2 and step 3). + # Step 4 is reached once (from step 2 only). + # The final_state payload should have 3 keys. + # + + project = insert(:project) + + webhook_trigger = build(:trigger, type: :webhook, enabled: true) + + job_1 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.data.x * 2; + console.log("job1: x=" + state.x); + return state; + }); + """, + name: "step-1" + ) + + job_2 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x * 3; + console.log("job2: x=" + state.x); + return state; + }); + """, + name: "step-2" + ) + + job_3 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x * 5; + console.log("job3: x=" + state.x); + return state; + }); + """, + name: "step-3" + ) + + job_4 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x + 1; + console.log("job4: x=" + state.x); + return state; + }); + """, + name: "step-4" + ) + + job_5 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x + 100; + console.log("job5: x=" + state.x); + return state; + }); + """, + name: "step-5" + ) + + workflow = + build(:workflow, project: project) + |> with_trigger(webhook_trigger) + |> with_job(job_1) + |> with_edge({webhook_trigger, job_1}, condition_type: :always) + |> with_job(job_2) + |> with_edge({job_1, job_2}, condition_type: :on_job_success) + |> with_job(job_3) + |> with_edge({job_1, job_3}, condition_type: :on_job_success) + |> with_job(job_4) + |> with_edge({job_2, job_4}, condition_type: :on_job_success) + |> with_job(job_5) + |> with_edge({job_2, job_5}, condition_type: :on_job_success) + |> with_edge({job_3, job_5}, condition_type: :on_job_success) + |> insert() + + [trigger] = workflow.triggers + + trigger = + trigger + |> Ecto.Changeset.change(webhook_reply: :after_completion) + |> Repo.update!() + + Snapshot.create(workflow |> Repo.reload!()) + + # input x=1 + # step 1: x = 1 * 2 = 2 + # step 2: x = 2 * 3 = 6 (from step 1) + # step 3: x = 2 * 5 = 10 (from step 1) + # step 4: x = 6 + 1 = 7 (from step 2) — leaf + # step 5 via step 2: x = 6 + 100 = 106 — leaf + # step 5 via step 3: x = 10 + 100 = 110 — leaf + webhook_body = %{"x" => 1} + + response = + Tesla.client( + [ + {Tesla.Middleware.BaseUrl, uri}, + Tesla.Middleware.JSON + ], + {Tesla.Adapter.Finch, name: Lightning.Finch} + ) + |> Tesla.post!("/i/#{trigger.id}", webhook_body) + + assert response.status == 201 + + assert %{"data" => final_state, "meta" => meta} = response.body + + assert meta["state"] == "success" + + # The final_state is keyed by job ID. When the same job is reached + # twice (step 5), the second entry gets a "-1" suffix. + assert map_size(final_state) == 3 + + job_4_id = job_4.id + job_5_id = job_5.id + + # job 4 appears once, keyed by its job ID + assert %{"x" => 7} = final_state[job_4_id] + + # job 5 appears twice: once as job_5_id, once as job_5_id <> "-1" + assert %{"x" => x5a} = final_state[job_5_id] + assert %{"x" => x5b} = final_state[job_5_id <> "-1"] + + assert Enum.sort([x5a, x5b]) == [106, 110] + + # Verify all steps completed successfully + %{entries: steps} = Invocation.list_steps_for_project(project) + + # 1 root + 2 branches from root + 2 from step 2 + 1 from step 3 = 6 steps + assert Enum.count(steps) == 6 + assert Enum.all?(steps, fn step -> step.exit_reason == "success" end) + end end defp webhook_expression do From 6aa09a35eb54dc26b203ecdc90bfe0e4f082c3da Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Fri, 13 Mar 2026 13:42:22 +0200 Subject: [PATCH 2/5] wip final state preservation --- lib/lightning/runs/handlers.ex | 85 ++++++++++++++++++++--- lib/lightning/runs/run.ex | 3 +- lib/lightning_web/channels/run_channel.ex | 2 + 3 files changed, 79 insertions(+), 11 deletions(-) diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index e30827de9e0..d6bb4c7a459 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -66,31 +66,52 @@ defmodule Lightning.Runs.Handlers do import Lightning.ChangesetUtils + alias Lightning.Invocation.Dataclip + @primary_key false embedded_schema do field :state, :string field :reason, :string field :error_type, :string + field :final_state, :string + field :project_id, Ecto.UUID field :timestamp, Lightning.UnixDateTime end def call(run, params) do with {:ok, complete_run} <- params |> new() |> apply_action(:validate) do - run - |> Run.complete(to_run_params(complete_run)) - |> case do - %{valid?: false} = changeset -> - {:error, changeset} - - changeset -> - Runs.update_run(changeset) - end + Repo.transact(fn -> + with {:ok, final_dataclip} <- + maybe_save_final_dataclip(complete_run, run.options) do + run_params = + complete_run + |> to_run_params() + |> maybe_put_final_dataclip_id(final_dataclip) + + run + |> Run.complete(run_params) + |> case do + %{valid?: false} = changeset -> + {:error, changeset} + + changeset -> + Runs.update_run(changeset) + end + end + end) end end def new(params) do %__MODULE__{} - |> cast(params, [:state, :reason, :error_type, :timestamp]) + |> cast(params, [ + :state, + :reason, + :error_type, + :final_state, + :project_id, + :timestamp + ]) |> put_new_change(:timestamp, Lightning.current_time()) |> then(fn changeset -> if reason = get_change(changeset, :reason) do @@ -119,6 +140,50 @@ defmodule Lightning.Runs.Handlers do |> Map.take([:state, :error_type]) |> Map.put(:finished_at, complete_run.timestamp) end + + defp maybe_put_final_dataclip_id(params, nil), do: params + + defp maybe_put_final_dataclip_id(params, %Dataclip{id: id}), + do: Map.put(params, :final_dataclip_id, id) + + defp maybe_save_final_dataclip( + %__MODULE__{final_state: nil}, + _options + ) do + {:ok, nil} + end + + defp maybe_save_final_dataclip( + %__MODULE__{project_id: nil}, + _options + ) do + {:ok, nil} + end + + defp maybe_save_final_dataclip( + %__MODULE__{final_state: _final_state, project_id: project_id}, + %Runs.RunOptions{save_dataclips: false} + ) do + Dataclip.new(%{ + project_id: project_id, + body: nil, + wiped_at: Lightning.current_time() |> DateTime.truncate(:second), + type: :step_result + }) + |> Repo.insert() + end + + defp maybe_save_final_dataclip( + %__MODULE__{final_state: final_state, project_id: project_id}, + _options + ) do + Dataclip.new(%{ + project_id: project_id, + body: Jason.decode!(final_state), + type: :step_result + }) + |> Repo.insert() + end end defmodule StartStep do diff --git a/lib/lightning/runs/run.ex b/lib/lightning/runs/run.ex index 2a7450a75d1..8a2156ab9e1 100644 --- a/lib/lightning/runs/run.ex +++ b/lib/lightning/runs/run.ex @@ -71,6 +71,7 @@ defmodule Lightning.Run do belongs_to :starting_trigger, Trigger belongs_to :created_by, User belongs_to :dataclip, Lightning.Invocation.Dataclip + belongs_to :final_dataclip, Lightning.Invocation.Dataclip has_one :workflow, through: [:work_order, :workflow] belongs_to :snapshot, Snapshot @@ -169,7 +170,7 @@ defmodule Lightning.Run do run |> change() |> put_change(:state, nil) - |> cast(params, [:state, :error_type, :finished_at]) + |> cast(params, [:state, :error_type, :finished_at, :final_dataclip_id]) |> validate_required([:state, :finished_at]) |> validate_inclusion(:state, @final_states) |> validate_state_change() diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index a080b38db7f..4ff9d65ceef 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -107,6 +107,8 @@ defmodule LightningWeb.RunChannel do end def handle_in("run:complete", payload, socket) do + payload = Map.put(payload, "project_id", socket.assigns.project_id) + case Runs.complete_run(socket.assigns.run, payload) do {:ok, run} -> # TODO: Turn FailureAlerter into an Oban worker and process async From 4d70b912168331e8621e380f420cfd7217ad9902 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Sat, 14 Mar 2026 12:30:43 +0200 Subject: [PATCH 3/5] migration --- .../20260313110903_add_final_dataclip_to_runs.exs | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 priv/repo/migrations/20260313110903_add_final_dataclip_to_runs.exs diff --git a/priv/repo/migrations/20260313110903_add_final_dataclip_to_runs.exs b/priv/repo/migrations/20260313110903_add_final_dataclip_to_runs.exs new file mode 100644 index 00000000000..de50ce75798 --- /dev/null +++ b/priv/repo/migrations/20260313110903_add_final_dataclip_to_runs.exs @@ -0,0 +1,9 @@ +defmodule Lightning.Repo.Migrations.AddFinalDataclipToRuns do + use Ecto.Migration + + def change do + alter table(:runs) do + add :final_dataclip_id, references(:dataclips, type: :binary_id), null: true + end + end +end From fae328f3d989fffabc1bcc1b47c7b420d3b7ca00 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Sat, 14 Mar 2026 12:38:32 +0200 Subject: [PATCH 4/5] fix tests --- lib/lightning/runs/handlers.ex | 4 ++-- test/integration/web_and_worker_test.exs | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index d6bb4c7a459..58a8022b9bd 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -73,7 +73,7 @@ defmodule Lightning.Runs.Handlers do field :state, :string field :reason, :string field :error_type, :string - field :final_state, :string + field :final_state, :map field :project_id, Ecto.UUID field :timestamp, Lightning.UnixDateTime end @@ -179,7 +179,7 @@ defmodule Lightning.Runs.Handlers do ) do Dataclip.new(%{ project_id: project_id, - body: Jason.decode!(final_state), + body: final_state, type: :step_result }) |> Repo.insert() diff --git a/test/integration/web_and_worker_test.exs b/test/integration/web_and_worker_test.exs index 99c62ddbbe2..d77592d38d4 100644 --- a/test/integration/web_and_worker_test.exs +++ b/test/integration/web_and_worker_test.exs @@ -108,8 +108,9 @@ defmodule Lightning.WebAndWorkerTest do assert %{state: :success} = WorkOrders.get(workorder_id) - # There was an initial http_request dataclip and 7 run_result dataclips - assert Repo.all(Lightning.Invocation.Dataclip) |> Enum.count() == 8 + # There was an initial http_request dataclip, 7 step_result dataclips, + # and 1 final_dataclip saved on the run + assert Repo.all(Lightning.Invocation.Dataclip) |> Enum.count() == 9 end @tag :integration From 53fab9be0c1d33184ccc053842d740f81df37f48 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Sat, 14 Mar 2026 15:53:40 +0200 Subject: [PATCH 5/5] dont duplicate --- lib/lightning/runs/handlers.ex | 66 ++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index 58a8022b9bd..81aa72fb644 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -61,6 +61,12 @@ defmodule Lightning.Runs.Handlers do defmodule CompleteRun do @moduledoc """ Schema to validate the input attributes of a completed run. + + The worker may send either: + - `final_dataclip_id` — reuse an existing step output dataclip (single leaf) + - `final_state` — a new map to persist as a dataclip (multiple leaves) + + These are mutually exclusive. If both are sent, `final_dataclip_id` wins. """ use Lightning.Schema @@ -73,6 +79,7 @@ defmodule Lightning.Runs.Handlers do field :state, :string field :reason, :string field :error_type, :string + field :final_dataclip_id, Ecto.UUID field :final_state, :map field :project_id, Ecto.UUID field :timestamp, Lightning.UnixDateTime @@ -81,13 +88,8 @@ defmodule Lightning.Runs.Handlers do def call(run, params) do with {:ok, complete_run} <- params |> new() |> apply_action(:validate) do Repo.transact(fn -> - with {:ok, final_dataclip} <- - maybe_save_final_dataclip(complete_run, run.options) do - run_params = - complete_run - |> to_run_params() - |> maybe_put_final_dataclip_id(final_dataclip) - + with {:ok, run_params} <- + resolve_final_dataclip(complete_run, run.options) do run |> Run.complete(run_params) |> case do @@ -108,6 +110,7 @@ defmodule Lightning.Runs.Handlers do :state, :reason, :error_type, + :final_dataclip_id, :final_state, :project_id, :timestamp @@ -141,27 +144,39 @@ defmodule Lightning.Runs.Handlers do |> Map.put(:finished_at, complete_run.timestamp) end - defp maybe_put_final_dataclip_id(params, nil), do: params - - defp maybe_put_final_dataclip_id(params, %Dataclip{id: id}), - do: Map.put(params, :final_dataclip_id, id) - - defp maybe_save_final_dataclip( - %__MODULE__{final_state: nil}, + # When the worker sends an existing dataclip ID, use it directly. + defp resolve_final_dataclip( + %__MODULE__{final_dataclip_id: id} = complete_run, _options - ) do - {:ok, nil} + ) + when is_binary(id) do + {:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id)} end - defp maybe_save_final_dataclip( - %__MODULE__{project_id: nil}, - _options - ) do - {:ok, nil} + # When the worker sends a new final_state map, insert a new dataclip. + defp resolve_final_dataclip( + %__MODULE__{final_state: final_state, project_id: project_id} = + complete_run, + options + ) + when is_map(final_state) and is_binary(project_id) do + case save_final_dataclip(final_state, project_id, options) do + {:ok, %Dataclip{id: id}} -> + {:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id)} + + error -> + error + end end - defp maybe_save_final_dataclip( - %__MODULE__{final_state: _final_state, project_id: project_id}, + # Neither provided (e.g., mark_run_lost, or worker didn't send final state). + defp resolve_final_dataclip(complete_run, _options) do + {:ok, to_run_params(complete_run)} + end + + defp save_final_dataclip( + _final_state, + project_id, %Runs.RunOptions{save_dataclips: false} ) do Dataclip.new(%{ @@ -173,10 +188,7 @@ defmodule Lightning.Runs.Handlers do |> Repo.insert() end - defp maybe_save_final_dataclip( - %__MODULE__{final_state: final_state, project_id: project_id}, - _options - ) do + defp save_final_dataclip(final_state, project_id, _options) do Dataclip.new(%{ project_id: project_id, body: final_state,