Skip to content
Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ and this project adheres to

### Changed

- Allow users to select which workflow to merge for sandbox merging
[#4002](https://github.com/OpenFn/lightning/issues/4002)

### Fixed

## [2.16.0] - 2026-03-24
Expand Down
141 changes: 123 additions & 18 deletions lib/lightning/projects/merge_projects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,30 @@ defmodule Lightning.Projects.MergeProjects do

Maps workflows from source to target project based on exact name matching.
Uses the existing merge_workflow/2 logic for individual workflow merging.
Workflows that don't match are marked for deletion (target) or creation (source).
Workflows that don't match are marked for deletion (target) or creation
(source).

## Parameters
* `source_project` - The project with modifications to merge
* `target_project` - The target project to merge changes onto
* `opts` - Keyword options:
* `:new_uuid_map` - Map of source UUID to target UUID (default: `%{}`)
* `:selected_workflow_ids` - List of source workflow IDs to merge.
When provided, only those source workflows are processed and no target
workflows are marked for deletion. When `nil` (default), all source
workflows are processed and unmatched target workflows are deleted.

## Returns
A map with the merged project structure ready for import, containing
workflow mappings and project data.
"""
@spec merge_project(Project.t(), Project.t(), map()) :: map()
def merge_project(source, target, new_uuid_map \\ %{})
def merge_project(source, target, opts \\ %{})

def merge_project(
%Project{} = source_project,
%Project{} = target_project,
new_uuid_map
opts
) do
source_project =
Repo.preload(source_project, workflows: [:jobs, :triggers, :edges])
Expand All @@ -44,11 +51,17 @@ defmodule Lightning.Projects.MergeProjects do
merge_project(
Map.from_struct(source_project),
Map.from_struct(target_project),
new_uuid_map
opts
)
end

def merge_project(source_project, target_project, new_uuid_map) do
def merge_project(source_project, target_project, opts) do
new_uuid_map = Map.get(opts, :new_uuid_map, %{})
selected_workflow_ids = Map.get(opts, :selected_workflow_ids, nil)

deleted_target_workflow_ids =
Map.get(opts, :deleted_target_workflow_ids, nil)

workflow_mappings =
map_project_workflow_names(source_project, target_project)

Expand All @@ -57,7 +70,9 @@ defmodule Lightning.Projects.MergeProjects do
source_project,
target_project,
workflow_mappings,
new_uuid_map
new_uuid_map,
selected_workflow_ids,
deleted_target_workflow_ids
)
end

Expand Down Expand Up @@ -682,14 +697,18 @@ defmodule Lightning.Projects.MergeProjects do
source_project,
target_project,
workflow_mappings,
new_uuid_map
new_uuid_map,
selected_workflow_ids,
deleted_target_workflow_ids
) do
merged_workflows =
build_merged_workflows(
source_project.workflows,
target_project.workflows,
workflow_mappings,
new_uuid_map
new_uuid_map,
selected_workflow_ids,
deleted_target_workflow_ids
)

target_project
Expand All @@ -705,11 +724,21 @@ defmodule Lightning.Projects.MergeProjects do
source_workflows,
target_workflows,
workflow_mappings,
new_uuid_map
new_uuid_map,
selected_workflow_ids,
deleted_target_workflow_ids
) do
# Filter source workflows to selected ones if a selection is specified
effective_source_workflows =
if selected_workflow_ids do
Enum.filter(source_workflows, &(&1.id in selected_workflow_ids))
else
source_workflows
end

# Process source workflows (matched and new)
merged_from_source =
Enum.map(source_workflows, fn source_workflow ->
Enum.map(effective_source_workflows, fn source_workflow ->
case Map.get(workflow_mappings, source_workflow.id) do
nil ->
build_new_workflow(source_workflow, new_uuid_map)
Expand All @@ -721,17 +750,93 @@ defmodule Lightning.Projects.MergeProjects do
end
end)

# Mark unmatched target workflows for deletion
deleted_targets =
target_workflows
|> Enum.reject(fn workflow ->
workflow.id in Map.values(workflow_mappings)
merged_target_ids = MapSet.new(merged_from_source, fn wf -> wf["id"] end)

# When merging everything (no selection filter), delete unmatched target
# workflows. When a selection is provided, use deleted_target_workflow_ids
# to determine which unmatched target workflows to delete vs pass through.
extra_target_workflows =
if is_nil(selected_workflow_ids) do
target_workflows
|> Enum.reject(fn workflow ->
workflow.id in Map.values(workflow_mappings)
end)
|> Enum.map(fn workflow ->
%{"id" => workflow.id, "delete" => true}
end)
else
target_workflows
|> Enum.reject(fn workflow -> workflow.id in merged_target_ids end)
|> Enum.map(fn workflow ->
if deleted_target_workflow_ids &&
workflow.id in deleted_target_workflow_ids do
%{"id" => workflow.id, "delete" => true}
else
build_passthrough_workflow(workflow)
end
end)
end

merged_from_source ++ extra_target_workflows
end

defp build_passthrough_workflow(workflow) do
jobs =
Enum.map(workflow.jobs, fn job ->
job
|> Map.take([
:name,
:body,
:adaptor,
:project_credential_id,
:keychain_credential_id
])
|> Map.put(:id, job.id)
|> stringify_keys()
end)
|> Enum.map(fn workflow ->
%{"id" => workflow.id, "delete" => true}

triggers =
Enum.map(workflow.triggers, fn trigger ->
trigger
|> Map.take([
:comment,
:custom_path,
:cron_expression,
:type,
:kafka_configuration
])
|> Map.put(:id, trigger.id)
|> stringify_keys()
end)

merged_from_source ++ deleted_targets
edges =
Enum.map(workflow.edges, fn edge ->
edge
|> Map.take([
:condition_type,
:condition_expression,
:condition_label,
:enabled
])
|> Map.merge(%{
id: edge.id,
source_job_id: edge.source_job_id,
source_trigger_id: edge.source_trigger_id,
target_job_id: edge.target_job_id
})
|> stringify_keys()
end)

workflow
|> Map.take([:name, :concurrency, :enable_job_logs, :positions])
|> Map.merge(%{
id: workflow.id,
lock_version: workflow.lock_version,
jobs: jobs,
triggers: triggers,
edges: edges
})
|> stringify_keys()
end

defp build_new_workflow(source_workflow, new_uuid_map) do
Expand Down
39 changes: 35 additions & 4 deletions lib/lightning/projects/provisioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ defmodule Lightning.Projects.Provisioner do
{:delete, workflow_id}
end

defp classify_audit(%{
action: :update,
data: %{id: workflow_id},
changes: %{deleted_at: _}
}) do
{:delete, workflow_id}
end

defp classify_audit(%{
action: :update,
data: %{id: workflow_id},
Expand All @@ -159,7 +167,8 @@ defmodule Lightning.Projects.Provisioner do
project_changeset
|> get_assoc(:workflows)
|> Enum.reject(fn changeset ->
changeset.changes == %{} or get_change(changeset, :delete)
changeset.changes == %{} or get_change(changeset, :delete) == true or
not is_nil(get_change(changeset, :deleted_at))
end)
|> Enum.reduce(Multi.new(), fn changeset, multi ->
workflow =
Expand Down Expand Up @@ -197,7 +206,8 @@ defmodule Lightning.Projects.Provisioner do
project_changeset
|> get_assoc(:workflows)
|> Enum.reject(fn changeset ->
changeset.changes == %{} or get_change(changeset, :delete)
changeset.changes == %{} or get_change(changeset, :delete) == true or
not is_nil(get_change(changeset, :deleted_at))
end)
|> Enum.reduce(Multi.new(), fn changeset, multi ->
workflow =
Expand Down Expand Up @@ -413,10 +423,10 @@ defmodule Lightning.Projects.Provisioner do

defp workflow_changeset(workflow, attrs) do
workflow
|> cast(attrs, [:id, :name, :delete])
|> cast(attrs, [:id, :name, :delete, :deleted_at])
|> optimistic_lock(:lock_version)
|> validate_required([:id])
|> maybe_mark_for_deletion()
|> maybe_soft_delete_workflow()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha!! Niice

|> validate_extraneous_params(ignore: ["version_history"])
|> cast_assoc(:jobs, with: &job_changeset/2)
|> cast_assoc(:triggers, with: &trigger_changeset/2)
Expand Down Expand Up @@ -471,6 +481,27 @@ defmodule Lightning.Projects.Provisioner do
|> maybe_mark_for_deletion()
end

defp maybe_soft_delete_workflow(changeset) do
changeset.changes
|> Map.pop(:delete)
|> case do
{true, others} when map_size(others) == 0 ->
changeset
|> Map.put(:changes, others)
|> put_change(
:deleted_at,
DateTime.utc_now() |> DateTime.truncate(:second)
)

{true, others} when map_size(others) > 0 ->
changeset
|> add_error(:delete, "cannot change or add a record while deleting")

_ ->
changeset
end
end

defp maybe_mark_for_deletion(changeset) do
changeset.changes
|> Map.pop(:delete)
Expand Down
Loading
Loading