Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 87 additions & 10 deletions lib/lightning/runs/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,36 +61,60 @@ defmodule Lightning.Runs.Handlers do
defmodule CompleteRun do
@moduledoc """
Schema to validate the input attributes of a completed run.

The worker may send either:
- `final_dataclip_id` — reuse an existing step output dataclip (single leaf)
- `final_state` — a new map to persist as a dataclip (multiple leaves)

These are mutually exclusive. If both are sent, `final_dataclip_id` wins.
"""
use Lightning.Schema

import Lightning.ChangesetUtils

alias Lightning.Invocation.Dataclip

@primary_key false
embedded_schema do
field :state, :string
field :reason, :string
field :error_type, :string
field :final_dataclip_id, Ecto.UUID
field :final_state, :map
field :project_id, Ecto.UUID
field :timestamp, Lightning.UnixDateTime
end

# Validates the completion params, resolves the run's final dataclip
# (reusing an existing dataclip id or persisting `final_state` as a new
# dataclip), then marks the run complete.
#
# Everything runs inside one transaction so the dataclip insert and the
# run update succeed or fail together.
#
# Returns `{:ok, run}` or `{:error, changeset}`.
#
# NOTE: the pasted diff had fused the pre-change body (a direct
# `Run.complete(to_run_params(...))` pipeline) with the new transactional
# body, so `Run.complete` ran twice and the first result was discarded —
# only the transactional version is kept here.
def call(run, params) do
  with {:ok, complete_run} <- params |> new() |> apply_action(:validate) do
    Repo.transact(fn ->
      with {:ok, run_params} <-
             resolve_final_dataclip(complete_run, run.options) do
        run
        |> Run.complete(run_params)
        |> case do
          # An invalid changeset rolls back the transaction (including
          # any dataclip inserted by resolve_final_dataclip/2).
          %{valid?: false} = changeset ->
            {:error, changeset}

          changeset ->
            Runs.update_run(changeset)
        end
      end
    end)
  end
end

def new(params) do
%__MODULE__{}
|> cast(params, [:state, :reason, :error_type, :timestamp])
|> cast(params, [
:state,
:reason,
:error_type,
:final_dataclip_id,
:final_state,
:project_id,
:timestamp
])
|> put_new_change(:timestamp, Lightning.current_time())
|> then(fn changeset ->
if reason = get_change(changeset, :reason) do
Expand Down Expand Up @@ -119,6 +143,59 @@ defmodule Lightning.Runs.Handlers do
|> Map.take([:state, :error_type])
|> Map.put(:finished_at, complete_run.timestamp)
end

# Decides how the run's final dataclip is determined and returns the
# params map for `Run.complete/2`. Clause order implements the
# "`final_dataclip_id` wins" contract from the moduledoc.

# The worker referenced an existing dataclip — attach its id directly.
defp resolve_final_dataclip(
       %__MODULE__{final_dataclip_id: dataclip_id} = complete_run,
       _options
     )
     when is_binary(dataclip_id) do
  params =
    complete_run
    |> to_run_params()
    |> Map.put(:final_dataclip_id, dataclip_id)

  {:ok, params}
end

# The worker sent a fresh `final_state` map — persist it as a new
# dataclip and attach the resulting id. A failed insert falls through
# the `with` unchanged as the error result.
defp resolve_final_dataclip(
       %__MODULE__{final_state: state, project_id: project_id} = complete_run,
       options
     )
     when is_map(state) and is_binary(project_id) do
  with {:ok, %Dataclip{id: dataclip_id}} <-
         save_final_dataclip(state, project_id, options) do
    params =
      complete_run
      |> to_run_params()
      |> Map.put(:final_dataclip_id, dataclip_id)

    {:ok, params}
  end
end

# Nothing to resolve (e.g. mark_run_lost, or the worker omitted both
# fields) — complete the run without a final dataclip.
defp resolve_final_dataclip(complete_run, _options) do
  {:ok, to_run_params(complete_run)}
end

# Persists the worker-supplied final state as a `:step_result` dataclip.
# When the run's options disable dataclip storage, an already-wiped
# placeholder (nil body, `wiped_at` set) is inserted instead so the run
# still gets a `final_dataclip_id` without retaining the payload.
defp save_final_dataclip(
       _final_state,
       project_id,
       %Runs.RunOptions{save_dataclips: false}
     ) do
  # `wiped_at` is truncated to seconds to match the column precision.
  wiped_at = Lightning.current_time() |> DateTime.truncate(:second)

  %{
    project_id: project_id,
    body: nil,
    wiped_at: wiped_at,
    type: :step_result
  }
  |> Dataclip.new()
  |> Repo.insert()
end

defp save_final_dataclip(final_state, project_id, _options) do
  %{project_id: project_id, body: final_state, type: :step_result}
  |> Dataclip.new()
  |> Repo.insert()
end
end

defmodule StartStep do
Expand Down
3 changes: 2 additions & 1 deletion lib/lightning/runs/run.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ defmodule Lightning.Run do
belongs_to :starting_trigger, Trigger
belongs_to :created_by, User
belongs_to :dataclip, Lightning.Invocation.Dataclip
belongs_to :final_dataclip, Lightning.Invocation.Dataclip

has_one :workflow, through: [:work_order, :workflow]
belongs_to :snapshot, Snapshot
Expand Down Expand Up @@ -169,7 +170,7 @@ defmodule Lightning.Run do
run
|> change()
|> put_change(:state, nil)
|> cast(params, [:state, :error_type, :finished_at])
|> cast(params, [:state, :error_type, :finished_at, :final_dataclip_id])
|> validate_required([:state, :finished_at])
|> validate_inclusion(:state, @final_states)
|> validate_state_change()
Expand Down
2 changes: 2 additions & 0 deletions lib/lightning_web/channels/run_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ defmodule LightningWeb.RunChannel do
end

def handle_in("run:complete", payload, socket) do
payload = Map.put(payload, "project_id", socket.assigns.project_id)

case Runs.complete_run(socket.assigns.run, payload) do
{:ok, run} ->
# TODO: Turn FailureAlerter into an Oban worker and process async
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Lightning.Repo.Migrations.AddFinalDataclipToRuns do
  use Ecto.Migration

  # Links each run to the dataclip holding its final state. The column
  # is nullable because pre-existing runs (and runs that never complete
  # cleanly, e.g. lost runs) have no final dataclip.
  def change do
    alter table(:runs) do
      add :final_dataclip_id, references(:dataclips, type: :binary_id),
        null: true
    end
  end
end
176 changes: 174 additions & 2 deletions test/integration/web_and_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ defmodule Lightning.WebAndWorkerTest do

assert %{state: :success} = WorkOrders.get(workorder_id)

# There was an initial http_request dataclip and 7 run_result dataclips
assert Repo.all(Lightning.Invocation.Dataclip) |> Enum.count() == 8
# There was an initial http_request dataclip, 7 step_result dataclips,
# and 1 final_dataclip saved on the run
assert Repo.all(Lightning.Invocation.Dataclip) |> Enum.count() == 9
end

@tag :integration
Expand Down Expand Up @@ -572,6 +573,177 @@ defmodule Lightning.WebAndWorkerTest do

assert work_order.state == :failed
end

@tag :integration
@tag timeout: 120_000
test "returns final state with multiple leaf nodes from a branching workflow",
%{uri: uri} do
#
# +---+
# | T | (webhook trigger)
# +---+
# |
# +-+-+
# | 1 |
# +-+-+
# / \
# / \
# +-+-+ +-+-+
# | 2 | | 3 |
# +-+-+ +-+-+
# / \ |
# / \ |
# +-+-+ +-+-+ |
# | 4 | | 5 |<--+
# +---+ +---+
#
# Step 5 is reached twice (from step 2 and step 3).
# Step 4 is reached once (from step 2 only).
# The final_state payload should have 3 keys.
#

project = insert(:project)

webhook_trigger = build(:trigger, type: :webhook, enabled: true)

# Each job logs and transforms `state.x` so every leaf ends with a
# distinct, easily-asserted value (see the arithmetic trace below).
job_1 =
build(:job,
adaptor: "@openfn/language-common@latest",
body: """
fn(state => {
state.x = state.data.x * 2;
console.log("job1: x=" + state.x);
return state;
});
""",
name: "step-1"
)

job_2 =
build(:job,
adaptor: "@openfn/language-common@latest",
body: """
fn(state => {
state.x = state.x * 3;
console.log("job2: x=" + state.x);
return state;
});
""",
name: "step-2"
)

job_3 =
build(:job,
adaptor: "@openfn/language-common@latest",
body: """
fn(state => {
state.x = state.x * 5;
console.log("job3: x=" + state.x);
return state;
});
""",
name: "step-3"
)

job_4 =
build(:job,
adaptor: "@openfn/language-common@latest",
body: """
fn(state => {
state.x = state.x + 1;
console.log("job4: x=" + state.x);
return state;
});
""",
name: "step-4"
)

job_5 =
build(:job,
adaptor: "@openfn/language-common@latest",
body: """
fn(state => {
state.x = state.x + 100;
console.log("job5: x=" + state.x);
return state;
});
""",
name: "step-5"
)

# Wire the diamond: 1 -> {2, 3}, 2 -> {4, 5}, 3 -> 5.
workflow =
build(:workflow, project: project)
|> with_trigger(webhook_trigger)
|> with_job(job_1)
|> with_edge({webhook_trigger, job_1}, condition_type: :always)
|> with_job(job_2)
|> with_edge({job_1, job_2}, condition_type: :on_job_success)
|> with_job(job_3)
|> with_edge({job_1, job_3}, condition_type: :on_job_success)
|> with_job(job_4)
|> with_edge({job_2, job_4}, condition_type: :on_job_success)
|> with_job(job_5)
|> with_edge({job_2, job_5}, condition_type: :on_job_success)
|> with_edge({job_3, job_5}, condition_type: :on_job_success)
|> insert()

[trigger] = workflow.triggers

# NOTE(review): :after_completion appears to make the webhook respond
# only after the whole run finishes, so the HTTP reply can carry the
# final state — confirm against the trigger schema.
trigger =
trigger
|> Ecto.Changeset.change(webhook_reply: :after_completion)
|> Repo.update!()

# NOTE(review): presumably the run executes against this snapshot of
# the freshly inserted workflow — verify snapshot semantics.
Snapshot.create(workflow |> Repo.reload!())

# input x=1
# step 1: x = 1 * 2 = 2
# step 2: x = 2 * 3 = 6 (from step 1)
# step 3: x = 2 * 5 = 10 (from step 1)
# step 4: x = 6 + 1 = 7 (from step 2) — leaf
# step 5 via step 2: x = 6 + 100 = 106 — leaf
# step 5 via step 3: x = 10 + 100 = 110 — leaf
webhook_body = %{"x" => 1}

response =
Tesla.client(
[
{Tesla.Middleware.BaseUrl, uri},
Tesla.Middleware.JSON
],
{Tesla.Adapter.Finch, name: Lightning.Finch}
)
|> Tesla.post!("/i/#{trigger.id}", webhook_body)

assert response.status == 201

assert %{"data" => final_state, "meta" => meta} = response.body

assert meta["state"] == "success"

# The final_state is keyed by job ID. When the same job is reached
# twice (step 5), the second entry gets a "-1" suffix.
assert map_size(final_state) == 3

job_4_id = job_4.id
job_5_id = job_5.id

# job 4 appears once, keyed by its job ID
assert %{"x" => 7} = final_state[job_4_id]

# job 5 appears twice: once as job_5_id, once as job_5_id <> "-1"
assert %{"x" => x5a} = final_state[job_5_id]
assert %{"x" => x5b} = final_state[job_5_id <> "-1"]

# Sorted because the order the two step-5 branches finish in is not
# deterministic.
assert Enum.sort([x5a, x5b]) == [106, 110]

# Verify all steps completed successfully
%{entries: steps} = Invocation.list_steps_for_project(project)

# 1 root + 2 branches from root + 2 from step 2 + 1 from step 3 = 6 steps
assert Enum.count(steps) == 6
assert Enum.all?(steps, fn step -> step.exit_reason == "success" end)
end
end

defp webhook_expression do
Expand Down