diff --git a/CHANGELOG.md b/CHANGELOG.md index a72b7fbf0ea..7843836928f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,9 @@ and this project adheres to ### Fixed +- Fix version-stuck bug where the collaborative editor shows stale state after a + sandbox merge or CLI deploy. + [#4535](https://github.com/OpenFn/lightning/issues/4535) - Collection storage on Project Settings, Collections no longer shows `0` for collections holding less than one megabyte of data. The underlying counter was always correct, the rendering now reflects values at any scale. diff --git a/lib/lightning/collaboration.ex b/lib/lightning/collaboration.ex index cd05eb6fe0f..719663dc286 100644 --- a/lib/lightning/collaboration.ex +++ b/lib/lightning/collaboration.ex @@ -25,17 +25,33 @@ defmodule Lightning.Collaborate do Collaborate.start(user: user, workflow: workflow) """ + alias Lightning.Accounts.User alias Lightning.Collaboration.DocumentSupervisor alias Lightning.Collaboration.Registry alias Lightning.Collaboration.Session alias Lightning.Collaboration.Supervisor, as: SessionSupervisor + alias Lightning.Collaboration.Topology require Logger - @pg_scope :workflow_collaboration - @spec start(opts :: Keyword.t()) :: GenServer.on_start() def start(opts) do + base = Keyword.get(opts, :base, Topology.base()) + + case do_start(opts, base) do + {:error, {:error, :shared_doc_not_found}} -> + # A SharedDoc was registered in :pg but died before the Session could + # observe it (0ms auto_exit race). Yield one ms — enough for the timer + # to fire and clear :pg — then try once more from scratch. 
+ Process.sleep(1) + do_start(opts, base) + + result -> + result + end + end + + defp do_start(opts, base) do session_id = Ecto.UUID.generate() parent_pid = Keyword.get(opts, :parent_pid, self()) @@ -52,45 +68,65 @@ defmodule Lightning.Collaborate do "Starting collaboration for document: #{document_name} (workflow: #{workflow.id})" ) - # Ensure document supervisor exists for this document - case lookup_shared_doc(document_name) do - nil -> - Logger.info("Starting document for #{document_name}") - {:ok, _doc_supervisor_pid} = start_document(workflow, document_name) - - _shared_doc_pid -> - Logger.info("Found existing document for #{document_name}") - :ok + # Ensure document supervisor exists for this document. + # Returns {:error, reason} on failure rather than raising, so callers + # such as WorkflowReconciler can handle failures gracefully. + doc_result = + case lookup_shared_doc(document_name, base) do + nil -> + Logger.info("Starting document for #{document_name}") + + case start_document(workflow, document_name, base) do + {:ok, _} -> :ok + error -> error + end + + _shared_doc_pid -> + Logger.info("Found existing document for #{document_name}") + :ok + end + + case doc_result do + :ok -> + # Start session for this user + user_id = if is_struct(user, User), do: user.id, else: nil + + SessionSupervisor.start_child(base, { + Session, + workflow: workflow, + user: user, + parent_pid: parent_pid, + document_name: document_name, + base: base, + name: + Topology.via( + base, + {:session, "#{document_name}:#{session_id}", user_id} + ) + }) + + error -> + error end - - # Start session for this user - SessionSupervisor.start_child({ - Session, - workflow: workflow, - user: user, - parent_pid: parent_pid, - document_name: document_name, - name: Registry.via({:session, "#{document_name}:#{session_id}", user.id}) - }) end def start_document( %Lightning.Workflows.Workflow{} = workflow, - document_name + document_name, + base \\ Topology.base() ) do - {:ok, 
doc_supervisor_pid} = - SessionSupervisor.start_child( - {DocumentSupervisor, - workflow: workflow, - document_name: document_name, - name: Registry.via({:doc_supervisor, document_name})} - ) - - {:ok, doc_supervisor_pid} + SessionSupervisor.start_child( + base, + {DocumentSupervisor, + workflow: workflow, + document_name: document_name, + base: base, + name: Topology.via(base, {:doc_supervisor, document_name})} + ) end - defp lookup_shared_doc(document_name) do - case :pg.get_members(@pg_scope, document_name) do + defp lookup_shared_doc(document_name, base) do + case :pg.get_members(Topology.pg_scope(base), document_name) do [] -> nil [shared_doc_pid | _] -> shared_doc_pid end diff --git a/lib/lightning/collaboration/document_supervisor.ex b/lib/lightning/collaboration/document_supervisor.ex index c943884d21d..8c953671185 100644 --- a/lib/lightning/collaboration/document_supervisor.ex +++ b/lib/lightning/collaboration/document_supervisor.ex @@ -14,15 +14,12 @@ defmodule Lightning.Collaboration.DocumentSupervisor do """ use GenServer - import Lightning.Collaboration.Registry, only: [via: 1] - alias Lightning.Collaboration.Persistence alias Lightning.Collaboration.PersistenceWriter + alias Lightning.Collaboration.Topology require Logger - @pg_scope :workflow_collaboration - def start_link(args, opts \\ []) do GenServer.start_link(__MODULE__, args, opts) end @@ -31,12 +28,18 @@ defmodule Lightning.Collaboration.DocumentSupervisor do def init(opts) do workflow = Keyword.fetch!(opts, :workflow) document_name = Keyword.fetch!(opts, :document_name) + base = Keyword.fetch!(opts, :base) + + # Resolve topology references once at init time so subsequent callbacks + # don't need to re-read the Mox stub or Application env. 
+ pg_scope = Topology.pg_scope(base) {:ok, persistence_writer_pid} = PersistenceWriter.start_link( document_name: document_name, workflow_id: workflow.id, - name: via({:persistence_writer, document_name}) + base: base, + name: Topology.via(base, {:persistence_writer, document_name}) ) persistence_writer_ref = Process.monitor(persistence_writer_pid) @@ -53,17 +56,18 @@ defmodule Lightning.Collaboration.DocumentSupervisor do persistence_writer: persistence_writer_pid }} ], - name: via({:shared_doc, document_name}) + name: Topology.via(base, {:shared_doc, document_name}) ) # Register with :pg using document_name so versioned rooms are isolated - :ok = register_shared_doc_with_pg(document_name, shared_doc_pid) + :ok = :pg.join(pg_scope, document_name, shared_doc_pid) shared_doc_ref = Process.monitor(shared_doc_pid) {:ok, %{ workflow: workflow, + pg_scope: pg_scope, persistence_writer_pid: persistence_writer_pid, persistence_writer_ref: persistence_writer_ref, shared_doc_pid: shared_doc_pid, @@ -137,9 +141,4 @@ defmodule Lightning.Collaboration.DocumentSupervisor do {:stop, :normal, state |> Map.put(key, nil)} end - - # Supervisor.start_link(children, strategy: :one_for_all) - defp register_shared_doc_with_pg(document_name, shared_doc_pid) do - :pg.join(@pg_scope, document_name, shared_doc_pid) - end end diff --git a/lib/lightning/collaboration/persistence.ex b/lib/lightning/collaboration/persistence.ex index 10ff16204be..4dcd63c4677 100644 --- a/lib/lightning/collaboration/persistence.ex +++ b/lib/lightning/collaboration/persistence.ex @@ -9,9 +9,7 @@ defmodule Lightning.Collaboration.Persistence do @behaviour Yex.Sync.SharedDoc.PersistenceBehaviour alias Lightning.Collaboration.DocumentState - alias Lightning.Collaboration.PersistenceWriter alias Lightning.Collaboration.Session - alias Lightning.Collaboration.WorkflowSerializer require Logger @@ -30,7 +28,6 @@ defmodule Lightning.Collaboration.Persistence do case DocumentState.get_checkpoint_and_updates(doc_name) do 
{:ok, checkpoint, updates} -> apply_persisted_state(doc, doc_name, checkpoint, updates) - reconcile_or_reset(doc, doc_name, workflow) {:error, :not_found} -> Logger.info( @@ -46,16 +43,17 @@ defmodule Lightning.Collaboration.Persistence do @impl true def update_v1(state, update, doc_name, _doc) do - case PersistenceWriter.add_update(doc_name, update) do - :ok -> - state - - {:error, reason} -> + case state[:persistence_writer] do + nil -> Logger.error( - "Failed to add update to PersistenceWriter: #{inspect(reason)}" + "PersistenceWriter pid not in persistence state. document=#{doc_name}" ) state + + pid -> + GenServer.cast(pid, {:add_update, update}) + state end end @@ -89,96 +87,4 @@ defmodule Lightning.Collaboration.Persistence do DocumentState.apply_to_doc(doc, checkpoint, updates) Logger.debug("Loaded #{length(updates)} updates. document=#{doc_name}") end - - defp reconcile_or_reset(doc, doc_name, workflow) do - workflow_map = Yex.Doc.get_map(doc, "workflow") - persisted_lock_version = extract_lock_version(workflow_map) - current_lock_version = workflow.lock_version - - if stale?(persisted_lock_version, current_lock_version) do - Logger.warning(""" - Persisted Y.Doc is stale (persisted: #{inspect(persisted_lock_version)}, \ - current: #{current_lock_version}) - Discarding persisted state and reloading from database. - document=#{doc_name} - """) - - clear_and_reset_workflow(doc, workflow) - else - Logger.debug( - "Persisted Y.Doc is current (lock_version: #{current_lock_version}). 
document=#{doc_name}" - ) - - reconcile_workflow_metadata(doc, workflow) - end - end - - defp extract_lock_version(workflow_map) do - case Yex.Map.fetch(workflow_map, "lock_version") do - {:ok, version} when is_float(version) -> trunc(version) - {:ok, version} when is_integer(version) -> version - {:ok, nil} -> nil - :error -> nil - end - end - - defp stale?(nil, current_version), do: not is_nil(current_version) - - defp stale?(persisted_version, current_version), - do: persisted_version != current_version - - defp clear_and_reset_workflow(doc, workflow) do - # Same pattern as Session.clear_and_reset_doc - # Get all Yex collections BEFORE transaction to avoid VM deadlock - jobs_array = Yex.Doc.get_array(doc, "jobs") - edges_array = Yex.Doc.get_array(doc, "edges") - triggers_array = Yex.Doc.get_array(doc, "triggers") - - # Transaction 1: Clear all arrays - Yex.Doc.transaction(doc, "clear_stale_workflow", fn -> - clear_array(jobs_array) - clear_array(edges_array) - clear_array(triggers_array) - end) - - # Transaction 2: Re-serialize workflow from database - Session.initialize_workflow_document(doc, workflow) - - :ok - end - - defp clear_array(array) do - length = Yex.Array.length(array) - - if length > 0 do - Yex.Array.delete_range(array, 0, length) - end - end - - defp reconcile_workflow_metadata(doc, workflow) do - # Update workflow metadata fields to match current database state - # This is critical when loading persisted Y.Doc state that may be stale - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "reconcile_metadata", fn -> - # Update lock_version to current database value - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - - # Update name in case it changed - Yex.Map.set(workflow_map, "name", workflow.name) - - # Update deleted_at if present - Yex.Map.set( - workflow_map, - "deleted_at", - WorkflowSerializer.datetime_to_string(workflow.deleted_at) - ) - end) - - Logger.debug( - "Reconciled workflow metadata: 
lock_version=#{workflow.lock_version}, name=#{workflow.name}" - ) - - :ok - end end diff --git a/lib/lightning/collaboration/registry.ex b/lib/lightning/collaboration/registry.ex index 1723221f56a..5a0afb5b681 100644 --- a/lib/lightning/collaboration/registry.ex +++ b/lib/lightning/collaboration/registry.ex @@ -1,16 +1,18 @@ defmodule Lightning.Collaboration.Registry do @moduledoc """ - Registry for collaboration processes. + Registry-helpers for collaboration processes. - This Registry provides local process tracking for the collaboration system, - complementing the cluster-wide :pg process groups. It supports the following - key patterns: + This module provides convenience helpers around the `Registry` started by + `Lightning.Collaboration.Supervisor`. The actual registry name is resolved + through `Lightning.Collaboration.Topology` so the same call sites work for + the production singleton supervisor and per-test isolated supervisors. ## Supported Key Patterns - - `{:shared_doc, document_name}` - SharedDoc processes for documents (e.g., "workflow:workflow_id") - - `{:persistence_writer, document_name}` - PersistenceWriter processes (future use) - - `{:doc_supervisor, workflow_id}` - DocumentSupervisor processes (future use) + - `{:shared_doc, document_name}` - SharedDoc processes for documents + (e.g., "workflow:workflow_id") + - `{:persistence_writer, document_name}` - PersistenceWriter processes + - `{:doc_supervisor, document_name}` - DocumentSupervisor processes Session processes are not registered here as there may be multiple sessions for the same workflow and the same user. @@ -19,98 +21,41 @@ defmodule Lightning.Collaboration.Registry do This Registry is used for local node process lookup and coordination, while `:pg` (process groups) remains for cluster-wide SharedDoc uniqueness. The - Registry provides faster local lookups and better integration with supervision - trees. 
- - ## Usage - - Processes can register themselves either in their init callback or using via tuples - in their child_spec: - - # Session registration in init callback - Lightning.Collaboration.Registry.register({:session, "workflow_123", "user_456"}) - - # SharedDoc registration via child_spec - {SharedDoc, [ - doc_name: "workflow:workflow_123", - name: {:via, Registry, {Lightning.Collaboration.Registry.registry_name(), {:shared_doc, "workflow:workflow_123"}}} - ]} + Registry provides faster local lookups and better integration with + supervision trees. """ - @doc """ - Child specification for starting the Registry. - """ - def child_spec(_opts) do - %{ - id: __MODULE__, - start: {Registry, :start_link, [[keys: :unique, name: __MODULE__]]}, - type: :supervisor - } - end + alias Lightning.Collaboration.Topology @doc """ - Register the current process with the given key. - - ## Examples - - Lightning.Collaboration.Registry.register({:session, "workflow_123"}) - # => {:ok, #PID<0.123.0>} - - Lightning.Collaboration.Registry.register({:shared_doc, "workflow:workflow_123"}) - # => {:ok, #PID<0.123.0>} - - ## Error Cases - - Lightning.Collaboration.Registry.register({:session, "workflow_123"}) - # => {:error, {:already_registered, #PID<0.456.0>}} - + Register the current process with the given key in the active registry. """ @spec register(term()) :: {:ok, pid()} | {:error, {:already_registered, pid()}} def register(key) do - case Registry.register(__MODULE__, key, nil) do + case Registry.register(Topology.registry(), key, nil) do {:ok, _pid} -> {:ok, self()} {:error, reason} -> {:error, reason} end end - def via(key) do - {:via, Registry, {__MODULE__, key}} - end + @doc """ + Returns a `:via` tuple suitable for naming a process under the active + registry. + """ + def via(key), do: Topology.via(key) @doc """ Look up all processes registered with the given key. - Returns a list of {pid, value} tuples. 
Since we use unique keys, - this will typically return a single-item list or an empty list. - - ## Examples - - Lightning.Collaboration.Registry.lookup({:session, "workflow_123"}) - # => [{#PID<0.123.0>, nil}] - - Lightning.Collaboration.Registry.lookup({:session, "nonexistent"}) - # => [] - + Returns a list of `{pid, value}` tuples. """ @spec lookup(term()) :: [{pid(), term()}] def lookup(key) do - Registry.lookup(__MODULE__, key) + Registry.lookup(Topology.registry(), key) end @doc """ Find the pid registered with the given key. - - This is a convenience function that returns just the pid, or nil - if no process is registered. - - ## Examples - - Lightning.Collaboration.Registry.whereis({:session, "workflow_123"}) - # => #PID<0.123.0> - - Lightning.Collaboration.Registry.whereis({:session, "nonexistent"}) - # => nil - """ @spec whereis(term()) :: pid() | nil def whereis(key) do @@ -123,26 +68,19 @@ defmodule Lightning.Collaboration.Registry do def count(key \\ nil) def count(nil) do - Registry.count(__MODULE__) + Registry.count(Topology.registry()) end - def count(key) do + def count(key) when is_binary(key) do select(key) |> length() end @doc """ - Select processes registered with the given key (prefix). - - The key pattern expected is: - - `{:type, key}` - - `{:type, key, any()}`. - - We do a select with any key that _starts with_ the given key. + Select processes whose key starts with the given binary prefix. """ - @spec select(binary()) :: [{term(), pid()}] + @spec select(binary()) :: [[term() | pid()]] def select(key) when is_binary(key) do - # We have two select specs, for keys with and without values. 
- Registry.select(__MODULE__, [ + Registry.select(Topology.registry(), [ {{{:"$1", :"$2"}, :"$3", :"$4"}, [{:==, {:binary_part, :"$2", 0, byte_size(key)}, key}], [[:"$1", :"$3"]]}, {{{:"$1", :"$2", :"$5"}, :"$3", :"$4"}, @@ -162,4 +100,75 @@ defmodule Lightning.Collaboration.Registry do end end) end + + # --- Base-aware overloads (for test code and any explicit callers) --- + + @doc """ + Register the current process with the given key in the registry derived from `base`. + """ + def register(base, key) when is_atom(base) and not is_nil(base) do + case Registry.register(Topology.registry(base), key, nil) do + {:ok, _pid} -> {:ok, self()} + {:error, reason} -> {:error, reason} + end + end + + @doc """ + Returns a `:via` tuple for the registry derived from `base`. + """ + def via(base, key) when is_atom(base) and not is_nil(base), + do: Topology.via(base, key) + + @doc """ + Look up all processes registered with the given key in the registry derived from `base`. + """ + def lookup(base, key) when is_atom(base) and not is_nil(base) do + Registry.lookup(Topology.registry(base), key) + end + + @doc """ + Find the pid registered with the given key in the registry derived from `base`. + """ + def whereis(base, key) when is_atom(base) and not is_nil(base) do + case lookup(base, key) do + [{pid, _value}] -> pid + [] -> nil + end + end + + @doc """ + Count entries whose key starts with `key` in the registry derived from `base`. + """ + def count(base, key) when is_atom(base) and not is_nil(base) do + select(base, key) |> length() + end + + @doc """ + Select processes whose key starts with the given binary prefix in the registry derived from `base`. 
+ """ + def select(base, key) + when is_atom(base) and not is_nil(base) and is_binary(key) do + Registry.select(Topology.registry(base), [ + {{{:"$1", :"$2"}, :"$3", :"$4"}, + [{:==, {:binary_part, :"$2", 0, byte_size(key)}, key}], [[:"$1", :"$3"]]}, + {{{:"$1", :"$2", :"$5"}, :"$3", :"$4"}, + [{:==, {:binary_part, :"$2", 0, byte_size(key)}, key}], [[:"$1", :"$3"]]} + ]) + end + + @doc """ + Return a map grouping processes for the given key prefix in the registry derived from `base`. + """ + def get_group(base, key) when is_atom(base) and not is_nil(base) do + select(base, key) + |> Enum.reduce(%{}, fn [type, pid], acc -> + case type do + :session -> + Map.update(acc, :sessions, [pid], fn existing -> existing ++ [pid] end) + + _ -> + Map.put(acc, type, pid) + end + end) + end end diff --git a/lib/lightning/collaboration/session.ex b/lib/lightning/collaboration/session.ex index 164d4b18578..a118261b6cf 100644 --- a/lib/lightning/collaboration/session.ex +++ b/lib/lightning/collaboration/session.ex @@ -21,7 +21,9 @@ defmodule Lightning.Collaboration.Session do import LightningWeb.CoreComponents, only: [translate_error: 1] alias Lightning.Accounts.User + alias Lightning.Collaboration.Topology alias Lightning.Collaboration.WorkflowSerializer + alias Lightning.VersionControl.ProjectRepoConnection alias Lightning.Workflows.Presence alias Lightning.Workflows.WorkflowUsageLimiter alias Yex.Sync.SharedDoc @@ -34,15 +36,15 @@ defmodule Lightning.Collaboration.Session do :shared_doc_pid, :user, :workflow, - :document_name + :document_name, + :base ] - @pg_scope :workflow_collaboration - @type start_opts :: [ workflow: Lightning.Workflows.Workflow.t(), - user: User.t(), - parent_pid: pid() + user: User.t() | ProjectRepoConnection.t(), + parent_pid: pid(), + base: atom() ] @doc """ @@ -69,6 +71,10 @@ defmodule Lightning.Collaboration.Session do GenServer.stop(session_pid) end + def shared_doc_pid(session_pid) do + GenServer.call(session_pid, :shared_doc_pid) + end + def 
child_spec(opts) do
    {opts, args} =
      Keyword.put_new_lazy(opts, :session_id, fn -> Ecto.UUID.generate() end)
@@ -94,6 +100,7 @@ defmodule Lightning.Collaboration.Session do
     user = Keyword.fetch!(opts, :user)
     parent_pid = Keyword.fetch!(opts, :parent_pid)
     document_name = Keyword.fetch!(opts, :document_name)
+    base = Keyword.fetch!(opts, :base)
 
     Logger.info("Starting session for document #{document_name}")
 
@@ -105,29 +112,36 @@ defmodule Lightning.Collaboration.Session do
       shared_doc_pid: nil,
       user: user,
       workflow: workflow,
-      document_name: document_name
+      document_name: document_name,
+      base: base
     }
 
-    lookup_shared_doc(document_name)
+    lookup_shared_doc(document_name, base)
     |> case do
       nil ->
         {:stop, {:error, :shared_doc_not_found}}
 
       shared_doc_pid ->
-        SharedDoc.observe(shared_doc_pid)
-        Logger.info("Joined SharedDoc for #{document_name}")
-
-        # We track the user presence here so the the original WorkflowLive.Edit
-        # can be stopped from editing the workflow when someone else is editing it.
-        # Note: Presence tracking uses workflow.id, not document_name, because
-        # presence is about showing who is editing the workflow, not which version
-        Presence.track_user_presence(
-          user,
-          workflow.id,
-          self()
-        )
-
-        {:ok, %{state | shared_doc_pid: shared_doc_pid}}
+        try do
+          SharedDoc.observe(shared_doc_pid)
+          Logger.info("Joined SharedDoc for #{document_name}")
+
+          # We track the user presence here so the original WorkflowLive.Edit
+          # can be stopped from editing the workflow when someone else is editing it.
+          # Note: Presence tracking uses workflow.id, not document_name, because
+          # presence is about showing who is editing the workflow, not which version.
+          # Only track presence for real users — not system actors like ProjectRepoConnection.
+ if is_struct(user, User), + do: Presence.track_user_presence(user, workflow.id, self()) + + {:ok, %{state | shared_doc_pid: shared_doc_pid}} + catch + # GenServer.call raises an exit (not a rescuable exception) when the + # target process is dead. SharedDoc may have been registered in :pg + # but died before we could observe it (0ms auto_exit race). + # Return cleanly so Collaborate.start can retry. + :exit, _ -> {:stop, {:error, :shared_doc_not_found}} + end end end @@ -144,17 +158,14 @@ defmodule Lightning.Collaboration.Session do SharedDoc.unobserve(shared_doc_pid) end - Presence.untrack_user_presence( - state.user, - state.workflow.id, - self() - ) + if is_struct(state.user, User), + do: Presence.untrack_user_presence(state.user, state.workflow.id, self()) :ok end - def lookup_shared_doc(document_name) do - case :pg.get_members(@pg_scope, document_name) do + def lookup_shared_doc(document_name, base) do + case :pg.get_members(Topology.pg_scope(base), document_name) do [] -> nil [shared_doc_pid | _] -> shared_doc_pid end @@ -259,6 +270,11 @@ defmodule Lightning.Collaboration.Session do GenServer.call(session_pid, {:reset_workflow, user}, 10_000) end + @impl true + def handle_call(:shared_doc_pid, _from, state) do + {:reply, state.shared_doc_pid, state} + end + @impl true def handle_call(:get_doc, _from, %{shared_doc_pid: shared_doc_pid} = state) do {:reply, SharedDoc.get_doc(shared_doc_pid), state} diff --git a/lib/lightning/collaboration/supervisor.ex b/lib/lightning/collaboration/supervisor.ex index a9fd97cb64d..9ed03df47ec 100644 --- a/lib/lightning/collaboration/supervisor.ex +++ b/lib/lightning/collaboration/supervisor.ex @@ -2,40 +2,51 @@ defmodule Lightning.Collaboration.Supervisor do @moduledoc """ Supervisor for workflow collaboration infrastructure. - Manages the collaboration Registry, :pg process group, and a dynamic + Manages the collaboration Registry, `:pg` process group, and a dynamic supervisor for workflow collaboration processes. 
The Registry must be started first to ensure processes can register themselves during startup. + + Accepts an optional `:name` option so that tests can run isolated + supervision trees side-by-side. The default name (`__MODULE__`) is what + production uses; from there the Registry name, DynamicSupervisor name and + `:pg` scope are derived via `Lightning.Collaboration.Topology`. """ use Supervisor - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + alias Lightning.Collaboration.Topology + + def start_link(opts) do + name = Keyword.get(opts, :name, __MODULE__) + Supervisor.start_link(__MODULE__, name, name: name) end @impl true - def init(_opts) do + def init(name) do + registry = Topology.registry(name) + doc_sup = Topology.doc_supervisor(name) + pg_scope = Topology.pg_scope(name) + children = [ # Start Registry first - processes depend on it for registration - Lightning.Collaboration.Registry, + %{ + id: {:registry, registry}, + start: {Registry, :start_link, [[keys: :unique, name: registry]]}, + type: :supervisor + }, # Start :pg for cluster-wide SharedDoc coordination %{ - id: :workflow_collaboration_pg, - start: {:pg, :start_link, [:workflow_collaboration]}, + id: {:pg, pg_scope}, + start: {:pg, :start_link, [pg_scope]}, type: :worker }, # Start the dynamic supervisor for collaboration processes %{ - id: :collaboration_dynamic_supervisor, + id: {:doc_sup, doc_sup}, start: {DynamicSupervisor, :start_link, - [ - [ - strategy: :one_for_one, - name: __MODULE__.DocSupervisor - ] - ]}, + [[strategy: :one_for_one, name: doc_sup]]}, type: :supervisor } ] @@ -43,11 +54,20 @@ defmodule Lightning.Collaboration.Supervisor do Supervisor.init(children, strategy: :one_for_one) end - def start_child(child_spec) do - DynamicSupervisor.start_child(__MODULE__.DocSupervisor, child_spec) + @doc """ + Starts a child of the active document supervisor. 
+ + Defaults to the production supervisor; tests can pass an explicit `base` + atom to start the child in their isolated tree. + """ + def start_child(base \\ __MODULE__, child_spec) do + DynamicSupervisor.start_child(Topology.doc_supervisor(base), child_spec) end - def terminate_child(pid) when is_pid(pid) do - DynamicSupervisor.terminate_child(__MODULE__.DocSupervisor, pid) + @doc """ + Terminates a child of the active document supervisor. + """ + def terminate_child(base \\ __MODULE__, pid) when is_pid(pid) do + DynamicSupervisor.terminate_child(Topology.doc_supervisor(base), pid) end end diff --git a/lib/lightning/collaboration/topology.ex b/lib/lightning/collaboration/topology.ex new file mode 100644 index 00000000000..1aa60bfe48a --- /dev/null +++ b/lib/lightning/collaboration/topology.ex @@ -0,0 +1,52 @@ +defmodule Lightning.Collaboration.Topology do + @moduledoc """ + Resolves the active collaboration supervisor's base name, from which the + Registry, DynamicSupervisor, and `:pg` scope names are derived. + + In production exactly one supervisor runs under the default base name. + In tests, `Lightning.CollaborationCase` overrides the base for the + duration of each test via `Application.put_env/3`. + """ + + @default_base Lightning.Collaboration.Supervisor + + @doc """ + Returns the base name of the active collaboration supervisor. + + Reads from application config so that every process in the VM — including + GenServer children spawned under the production supervisor during tests — + sees the same value without any Mox stub plumbing. + """ + def base do + Application.get_env(:lightning, __MODULE__, @default_base) + end + + @doc """ + Returns the Registry name derived from the active base. + """ + def registry, do: registry(base()) + def registry(base), do: Module.concat(base, Registry) + + @doc """ + Returns the dynamic supervisor name derived from the active base. 
+ """ + def doc_supervisor, do: doc_supervisor(base()) + def doc_supervisor(base), do: Module.concat(base, DocSupervisor) + + @doc """ + Returns the `:pg` scope atom derived from the active base. + + The default base preserves the historical `:workflow_collaboration` scope so + that production deployments keep their existing process group name. + """ + def pg_scope, do: pg_scope(base()) + def pg_scope(@default_base), do: :workflow_collaboration + def pg_scope(other), do: :"#{other}_pg" + + @doc """ + Returns a `:via` tuple suitable for naming a process registered in the + active Registry. + """ + def via(key), do: {:via, Registry, {registry(), key}} + def via(base, key), do: {:via, Registry, {registry(base), key}} +end diff --git a/lib/lightning/collaboration/workflow_reconciler.ex b/lib/lightning/collaboration/workflow_reconciler.ex index 6fa7562f21c..1059f9477a4 100644 --- a/lib/lightning/collaboration/workflow_reconciler.ex +++ b/lib/lightning/collaboration/workflow_reconciler.ex @@ -6,10 +6,14 @@ defmodule Lightning.Collaboration.WorkflowReconciler do """ import Ecto.Changeset, only: [get_field: 2] + alias Lightning.Collaborate alias Lightning.Collaboration.Session + alias Lightning.Collaboration.Topology + alias Lightning.Collaboration.WorkflowSerializer alias Lightning.Workflows.Edge alias Lightning.Workflows.Job alias Lightning.Workflows.Trigger + alias Lightning.Workflows.Triggers.KafkaConfiguration alias Lightning.Workflows.Workflow alias Yex.Doc alias Yex.Sync.SharedDoc @@ -24,13 +28,19 @@ defmodule Lightning.Collaboration.WorkflowReconciler do Versioned snapshots (e.g., "workflow:123:v22") are read-only historical views and should never receive live updates from saves. 
""" - @spec reconcile_workflow_changes(Ecto.Changeset.t(), Workflow.t()) :: :ok - def reconcile_workflow_changes(%Ecto.Changeset{} = changeset, workflow) do + @spec reconcile_workflow_changes(Ecto.Changeset.t(), Workflow.t(), atom()) :: + :ok + def reconcile_workflow_changes( + %Ecto.Changeset{} = changeset, + workflow, + base \\ nil + ) do # Always reconcile to the latest (unversioned) document # Format: "workflow:123" (not "workflow:123:v22") document_name = "workflow:#{workflow.id}" + effective_base = base || Topology.base() - case Session.lookup_shared_doc(document_name) do + case Session.lookup_shared_doc(document_name, effective_base) do nil -> Logger.debug( "No active SharedDoc for workflow #{workflow.id}, skipping reconciliation" @@ -50,6 +60,391 @@ defmodule Lightning.Collaboration.WorkflowReconciler do :ok end + @doc """ + Reconciles the SharedDoc for a workflow with the current DB state. + + Called after an external write (provisioner import, sandbox merge) has + already updated the database. Goes through `Collaborate.start/1` regardless + of whether anyone is online — this is the unified path for both cases: + + - Nobody online: starts a new document, applies the diff, stops. The + document flushes to DB on shutdown, so the next user opens correct state. + - Someone online: joins the existing document, applies the diff (broadcast + to live users in real time), stops. The document stays alive. + + Unlike `reconcile_workflow_changes/2`, this is state-driven: it diffs the + live Y.Doc against the DB and applies the minimum targeted operations — + deleting phantom items (e.g. unsaved jobs from open tabs), inserting new + items, and updating changed items in-place. CRDT IDs are preserved for + unchanged items. 
+ """ + @spec reconcile_workflow_from_db( + Workflow.t(), + Lightning.Accounts.User.t() + | Lightning.VersionControl.ProjectRepoConnection.t(), + atom() | nil + ) :: :ok + def reconcile_workflow_from_db(%Workflow{} = workflow, actor, base \\ nil) do + start_opts = + if base, + do: [workflow: workflow, user: actor, base: base], + else: [workflow: workflow, user: actor] + + case Collaborate.start(start_opts) do + {:ok, session_pid} -> + try do + Session.update_doc(session_pid, fn doc -> + apply_db_state_to_doc(doc, workflow) + end) + + # Session.stop/1 calls GenServer.stop/3 which re-raises as `:exit` + # when the DocumentSupervisor cascades on the last unobserve + # (auto_exit: true SharedDoc -> sup stops :normal -> DynSup kills + # session :shutdown). That exit escapes `rescue`, so catch it. + try do + Session.stop(session_pid) + catch + :exit, _reason -> :ok + end + + Phoenix.PubSub.broadcast( + Lightning.PubSub, + "workflow:collaborate:#{workflow.id}", + {:workflow_updated_externally, workflow} + ) + rescue + error -> + # Reconciler failure must never fail the provisioner. 
+ Logger.error( + "Failed to reconcile SharedDoc for workflow #{workflow.id}: #{inspect(error)}" + ) + end + + :ok + + {:error, reason} -> + Logger.error( + "Could not start session to reconcile workflow #{workflow.id}: #{inspect(reason)}" + ) + + :ok + end + end + + defp apply_db_state_to_doc(doc, workflow) do + # Step 1: Pre-fetch all root Yex objects BEFORE transaction (avoids VM deadlock) + workflow_map = Doc.get_map(doc, "workflow") + jobs_array = Doc.get_array(doc, "jobs") + edges_array = Doc.get_array(doc, "edges") + triggers_array = Doc.get_array(doc, "triggers") + + # Step 2: Read current Y.Doc state as plain Elixir lists (still before transaction) + ydoc_jobs = Yex.Array.to_list(jobs_array) + ydoc_edges = Yex.Array.to_list(edges_array) + ydoc_triggers = Yex.Array.to_list(triggers_array) + + # Step 3: Pre-fetch body Y.Text references — must happen before transaction + body_texts = + Map.new(ydoc_jobs, fn job_map -> + id = Yex.Map.fetch!(job_map, "id") + {:ok, body} = Yex.Map.fetch(job_map, "body") + {id, body} + end) + + # Step 4: Compute all operations before opening the transaction + # Note: positions are intentionally not reconciled here. The provisioner + # changeset does not accept positions (validate_extraneous_params rejects + # them), so the DB value can never be newer than the Y.Doc. Reconciling + # positions would risk overwriting unsaved node drags from open editor tabs. 
+ job_ops = compute_job_ops(jobs_array, ydoc_jobs, workflow.jobs, body_texts) + edge_ops = compute_edge_ops(edges_array, ydoc_edges, workflow.edges) + + trigger_ops = + compute_trigger_ops(triggers_array, ydoc_triggers, workflow.triggers) + + # Step 5: Apply everything in ONE transaction + Doc.transaction(doc, "provisioner_reconcile", fn -> + update_workflow_metadata(workflow_map, workflow) + Enum.each(job_ops ++ edge_ops ++ trigger_ops, &apply_reconcile_op/1) + end) + end + + defp update_workflow_metadata(workflow_map, workflow) do + Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) + Yex.Map.set(workflow_map, "name", workflow.name || "") + Yex.Map.set(workflow_map, "concurrency", workflow.concurrency) + Yex.Map.set(workflow_map, "enable_job_logs", workflow.enable_job_logs) + + Yex.Map.set( + workflow_map, + "deleted_at", + WorkflowSerializer.datetime_to_string(workflow.deleted_at) + ) + end + + # --------------------------------------------------------------------------- + # Job operations + # --------------------------------------------------------------------------- + + defp compute_job_ops(jobs_array, ydoc_jobs, db_jobs, body_texts) do + ydoc_ids = ydoc_jobs |> Enum.map(&Yex.Map.fetch!(&1, "id")) |> MapSet.new() + db_ids = db_jobs |> Enum.map(& &1.id) |> MapSet.new() + + phantom_ids = MapSet.difference(ydoc_ids, db_ids) + new_ids = MapSet.difference(db_ids, ydoc_ids) + existing_ids = MapSet.intersection(ydoc_ids, db_ids) + + delete_ops = + phantom_ids + |> Enum.map(&find_index_in_array(jobs_array, &1)) + |> Enum.reject(&is_nil/1) + |> Enum.sort(:desc) + |> Enum.map(&{:delete, jobs_array, &1}) + + insert_ops = + db_jobs + |> Enum.filter(&MapSet.member?(new_ids, &1.id)) + |> Enum.map(&{:insert, jobs_array, job_to_prelim(&1)}) + + update_ops = + db_jobs + |> Enum.filter(&MapSet.member?(existing_ids, &1.id)) + |> Enum.flat_map(fn db_job -> + ydoc_job = Enum.find(ydoc_jobs, &(Yex.Map.fetch!(&1, "id") == db_job.id)) + body_text = Map.get(body_texts, 
db_job.id) + job_update_ops(ydoc_job, body_text, db_job) + end) + + delete_ops ++ insert_ops ++ update_ops + end + + defp job_to_prelim(job) do + Yex.MapPrelim.from(%{ + "id" => job.id, + "name" => job.name || "", + "body" => Yex.TextPrelim.from(job.body || ""), + "adaptor" => job.adaptor, + "project_credential_id" => job.project_credential_id, + "keychain_credential_id" => job.keychain_credential_id + }) + end + + defp job_update_ops(ydoc_job, body_text, db_job) do + field_ops = + [ + {"name", db_job.name || ""}, + {"adaptor", db_job.adaptor}, + {"project_credential_id", db_job.project_credential_id}, + {"keychain_credential_id", db_job.keychain_credential_id} + ] + |> Enum.flat_map(fn {key, db_val} -> + case Yex.Map.fetch(ydoc_job, key) do + {:ok, ^db_val} -> [] + _ -> [{:set_field, ydoc_job, key, db_val}] + end + end) + + body_ops = + case body_text do + %Yex.Text{} = text -> + db_body = db_job.body || "" + current = Yex.Text.to_string(text) + if current != db_body, do: [{:update_text, text, db_body}], else: [] + + _ -> + [] + end + + field_ops ++ body_ops + end + + # --------------------------------------------------------------------------- + # Edge operations + # --------------------------------------------------------------------------- + + defp compute_edge_ops(edges_array, ydoc_edges, db_edges) do + ydoc_ids = ydoc_edges |> Enum.map(&Yex.Map.fetch!(&1, "id")) |> MapSet.new() + db_ids = db_edges |> Enum.map(& &1.id) |> MapSet.new() + + phantom_ids = MapSet.difference(ydoc_ids, db_ids) + new_ids = MapSet.difference(db_ids, ydoc_ids) + existing_ids = MapSet.intersection(ydoc_ids, db_ids) + + delete_ops = + phantom_ids + |> Enum.map(&find_index_in_array(edges_array, &1)) + |> Enum.reject(&is_nil/1) + |> Enum.sort(:desc) + |> Enum.map(&{:delete, edges_array, &1}) + + insert_ops = + db_edges + |> Enum.filter(&MapSet.member?(new_ids, &1.id)) + |> Enum.map(&{:insert, edges_array, edge_to_prelim(&1)}) + + update_ops = + db_edges + |> 
Enum.filter(&MapSet.member?(existing_ids, &1.id)) + |> Enum.flat_map(fn db_edge -> + ydoc_edge = + Enum.find(ydoc_edges, &(Yex.Map.fetch!(&1, "id") == db_edge.id)) + + edge_update_ops(ydoc_edge, db_edge) + end) + + delete_ops ++ insert_ops ++ update_ops + end + + defp edge_to_prelim(edge) do + Yex.MapPrelim.from(%{ + "id" => edge.id, + "condition_expression" => edge.condition_expression, + "condition_label" => edge.condition_label, + "condition_type" => to_string(edge.condition_type), + "enabled" => edge.enabled, + "source_job_id" => edge.source_job_id, + "source_trigger_id" => edge.source_trigger_id, + "target_job_id" => edge.target_job_id + }) + end + + defp edge_update_ops(ydoc_edge, db_edge) do + [ + {"condition_expression", db_edge.condition_expression}, + {"condition_label", db_edge.condition_label}, + {"condition_type", to_string(db_edge.condition_type)}, + {"enabled", db_edge.enabled}, + {"source_job_id", db_edge.source_job_id}, + {"source_trigger_id", db_edge.source_trigger_id}, + {"target_job_id", db_edge.target_job_id} + ] + |> Enum.flat_map(fn {key, db_val} -> + case Yex.Map.fetch(ydoc_edge, key) do + {:ok, ^db_val} -> [] + _ -> [{:set_field, ydoc_edge, key, db_val}] + end + end) + end + + # --------------------------------------------------------------------------- + # Trigger operations + # --------------------------------------------------------------------------- + + defp compute_trigger_ops(triggers_array, ydoc_triggers, db_triggers) do + ydoc_ids = + ydoc_triggers |> Enum.map(&Yex.Map.fetch!(&1, "id")) |> MapSet.new() + + db_ids = db_triggers |> Enum.map(& &1.id) |> MapSet.new() + + phantom_ids = MapSet.difference(ydoc_ids, db_ids) + new_ids = MapSet.difference(db_ids, ydoc_ids) + existing_ids = MapSet.intersection(ydoc_ids, db_ids) + + delete_ops = + phantom_ids + |> Enum.map(&find_index_in_array(triggers_array, &1)) + |> Enum.reject(&is_nil/1) + |> Enum.sort(:desc) + |> Enum.map(&{:delete, triggers_array, &1}) + + insert_ops = + db_triggers + |> 
Enum.filter(&MapSet.member?(new_ids, &1.id)) + |> Enum.map(&{:insert, triggers_array, trigger_to_prelim(&1)}) + + update_ops = + db_triggers + |> Enum.filter(&MapSet.member?(existing_ids, &1.id)) + |> Enum.flat_map(fn db_trigger -> + ydoc_trigger = + Enum.find(ydoc_triggers, &(Yex.Map.fetch!(&1, "id") == db_trigger.id)) + + trigger_update_ops(ydoc_trigger, db_trigger) + end) + + delete_ops ++ insert_ops ++ update_ops + end + + defp trigger_to_prelim(trigger) do + kafka_configuration = + trigger.kafka_configuration && + Yex.MapPrelim.from(%{ + "connect_timeout" => trigger.kafka_configuration.connect_timeout, + "group_id" => trigger.kafka_configuration.group_id, + "hosts_string" => + KafkaConfiguration.generate_hosts_string( + trigger.kafka_configuration.hosts + ), + "initial_offset_reset_policy" => + trigger.kafka_configuration.initial_offset_reset_policy, + "password" => trigger.kafka_configuration.password, + "sasl" => to_string(trigger.kafka_configuration.sasl), + "ssl" => trigger.kafka_configuration.ssl, + "topics_string" => + KafkaConfiguration.generate_topics_string( + trigger.kafka_configuration.topics + ), + "username" => trigger.kafka_configuration.username + }) + + Yex.MapPrelim.from(%{ + "id" => trigger.id, + "type" => to_string(trigger.type), + "enabled" => trigger.enabled, + "cron_expression" => trigger.cron_expression, + "cron_cursor_job_id" => trigger.cron_cursor_job_id, + "webhook_reply" => + trigger.webhook_reply && to_string(trigger.webhook_reply), + "kafka_configuration" => kafka_configuration + }) + end + + defp trigger_update_ops(ydoc_trigger, db_trigger) do + [ + {"type", to_string(db_trigger.type)}, + {"enabled", db_trigger.enabled}, + {"cron_expression", db_trigger.cron_expression}, + {"cron_cursor_job_id", db_trigger.cron_cursor_job_id}, + {"webhook_reply", + db_trigger.webhook_reply && to_string(db_trigger.webhook_reply)} + ] + |> Enum.flat_map(fn {key, db_val} -> + case Yex.Map.fetch(ydoc_trigger, key) do + {:ok, ^db_val} -> [] + _ -> 
[{:set_field, ydoc_trigger, key, db_val}] + end + end) + + # kafka_configuration nested fields are not updated here — the provisioner + # does not expose kafka config changes through this reconciliation path. + end + + # --------------------------------------------------------------------------- + # Apply reconcile operations + # --------------------------------------------------------------------------- + + defp apply_reconcile_op({:insert, array, prelim}) do + Yex.Array.push(array, prelim) + end + + defp apply_reconcile_op({:delete, array, index}) do + Yex.Array.delete(array, index) + end + + defp apply_reconcile_op({:set_field, yex_map, key, value}) do + Yex.Map.set(yex_map, key, value) + end + + defp apply_reconcile_op({:update_text, text, new_content}) do + len = Yex.Text.length(text) + if len > 0, do: Yex.Text.delete(text, 0, len) + Yex.Text.insert(text, 0, new_content) + end + + # --------------------------------------------------------------------------- + # Existing changeset-driven reconciler (unchanged below) + # --------------------------------------------------------------------------- + defp generate_ydoc_operations(%Ecto.Changeset{} = changeset, workflow, doc) do [ :jobs, diff --git a/lib/lightning/projects/provisioner.ex b/lib/lightning/projects/provisioner.ex index 9d10373154b..54473d91c86 100644 --- a/lib/lightning/projects/provisioner.ex +++ b/lib/lightning/projects/provisioner.ex @@ -25,6 +25,7 @@ defmodule Lightning.Projects.Provisioner do alias Lightning.VersionControl.ProjectRepoConnection alias Lightning.VersionControl.VersionControlUsageLimiter + alias Lightning.Collaboration.WorkflowReconciler alias Lightning.Workflows alias Lightning.Workflows.Audit alias Lightning.Workflows.Edge @@ -72,43 +73,63 @@ defmodule Lightning.Projects.Provisioner do def import_document(project, user_or_repo_connection, data, opts) do allow_stale = Keyword.get(opts, :allow_stale, false) - Repo.transact(fn -> - with :ok <- maybe_limit_provisioning(project.id, 
user_or_repo_connection), - project_changeset <- - build_import_changeset(project, user_or_repo_connection, data), - edges_to_cleanup <- - edges_referencing_deleted_jobs(project_changeset), - {:ok, %{workflows: workflows} = project} <- - Repo.insert_or_update(project_changeset, allow_stale: allow_stale), - :ok <- cleanup_orphaned_edges(edges_to_cleanup), - :ok <- handle_collection_deletion(project_changeset), - updated_project <- preload_dependencies(project), - {:ok, _changes} <- - audit_workflows(project_changeset, user_or_repo_connection), - {:ok, _changes} <- - update_workflows_version( - project_changeset, - updated_project.workflows - ), - {:ok, _changes} <- - create_snapshots( - project_changeset, - updated_project.workflows, - user_or_repo_connection - ) do - Enum.each(workflows, &Workflows.Events.workflow_updated/1) - - project_changeset - |> get_assoc(:workflows) - |> Enum.each(&Workflows.publish_kafka_trigger_events/1) - - Lightning.Projects.SandboxPromExPlugin.fire_provisioner_import_event( - Lightning.Projects.Project.sandbox?(updated_project) + with {:ok, updated_project} <- + Repo.transact(fn -> + with :ok <- + maybe_limit_provisioning(project.id, user_or_repo_connection), + project_changeset <- + build_import_changeset( + project, + user_or_repo_connection, + data + ), + edges_to_cleanup <- + edges_referencing_deleted_jobs(project_changeset), + {:ok, %{workflows: workflows} = project} <- + Repo.insert_or_update(project_changeset, + allow_stale: allow_stale + ), + :ok <- cleanup_orphaned_edges(edges_to_cleanup), + :ok <- handle_collection_deletion(project_changeset), + updated_project <- preload_dependencies(project), + {:ok, _changes} <- + audit_workflows(project_changeset, user_or_repo_connection), + {:ok, _changes} <- + update_workflows_version( + project_changeset, + updated_project.workflows + ), + {:ok, _changes} <- + create_snapshots( + project_changeset, + updated_project.workflows, + user_or_repo_connection + ) do + Enum.each(workflows, 
&Workflows.Events.workflow_updated/1) + + project_changeset + |> get_assoc(:workflows) + |> Enum.each(&Workflows.publish_kafka_trigger_events/1) + + Lightning.Projects.SandboxPromExPlugin.fire_provisioner_import_event( + Lightning.Projects.Project.sandbox?(updated_project) + ) + + {:ok, updated_project} + end + end) do + # Reconcile SharedDocs OUTSIDE the transaction — the reconciler + # mutates in-memory CRDT processes, not the DB. + Enum.each( + updated_project.workflows, + &WorkflowReconciler.reconcile_workflow_from_db( + &1, + user_or_repo_connection ) + ) - {:ok, updated_project} - end - end) + {:ok, updated_project} + end end defp build_import_changeset(project, user_or_repo_connection, data) do diff --git a/lib/lightning_web/channels/workflow_channel.ex b/lib/lightning_web/channels/workflow_channel.ex index fe2283f8141..915b649f362 100644 --- a/lib/lightning_web/channels/workflow_channel.ex +++ b/lib/lightning_web/channels/workflow_channel.ex @@ -674,6 +674,16 @@ defmodule LightningWeb.WorkflowChannel do {:reply, {:ok, %{}}, socket} end + @impl true + def handle_info({:workflow_updated_externally, workflow}, socket) do + push(socket, "workflow_saved", %{ + latest_snapshot_lock_version: workflow.lock_version, + workflow: workflow + }) + + {:noreply, socket} + end + @impl true def handle_info({:yjs, chunk}, socket) do push(socket, "yjs", {:binary, chunk}) diff --git a/test/lightning/cli_test.exs b/test/lightning/cli_test.exs index 0e2b7367f9b..7e3aba1c436 100644 --- a/test/lightning/cli_test.exs +++ b/test/lightning/cli_test.exs @@ -3,6 +3,11 @@ defmodule Lightning.CLITest do alias Lightning.CLI + setup do + FakeRambo.Setup.start_cache!() + :ok + end + test "any command" do CLI.execute("foo") diff --git a/test/lightning/collaborate_test.exs b/test/lightning/collaborate_test.exs index 4811600f7ae..5809d33f306 100644 --- a/test/lightning/collaborate_test.exs +++ b/test/lightning/collaborate_test.exs @@ -1,21 +1,16 @@ defmodule Lightning.CollaborateTest do - use 
Lightning.DataCase, async: false + use Lightning.CollaborationCase alias Lightning.Collaborate alias Lightning.Collaboration.Registry import Lightning.Factories import Eventually - import Lightning.CollaborationHelpers setup do user = insert(:user) workflow = insert(:workflow) - on_exit(fn -> - ensure_doc_supervisor_stopped(workflow.id) - end) - {:ok, user: user, workflow: workflow} end diff --git a/test/lightning/collaboration/document_supervisor_test.exs b/test/lightning/collaboration/document_supervisor_test.exs index 1b5e881a955..5c3d2383226 100644 --- a/test/lightning/collaboration/document_supervisor_test.exs +++ b/test/lightning/collaboration/document_supervisor_test.exs @@ -1,10 +1,11 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do - use Lightning.DataCase, async: false + use Lightning.CollaborationCase alias Lightning.Collaboration.DocumentState alias Lightning.Collaboration.DocumentSupervisor alias Lightning.Collaboration.PersistenceWriter alias Lightning.Collaboration.Registry + alias Lightning.Collaboration.Topology import Eventually import Lightning.Factories @@ -23,16 +24,22 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Setup for tests that need a running DocumentSupervisor defp setup_document_supervisor(context) do + base = Map.fetch!(context, :collaboration_base) + {:ok, doc_supervisor} = DocumentSupervisor.start_link( - [workflow: context.workflow, document_name: context.document_name], - name: Registry.via({:doc_supervisor, context.document_name}) + [ + workflow: context.workflow, + document_name: context.document_name, + base: base + ], + name: Registry.via(base, {:doc_supervisor, context.document_name}) ) persistence_writer = - Registry.whereis({:persistence_writer, context.document_name}) + Registry.whereis(base, {:persistence_writer, context.document_name}) - shared_doc = Registry.whereis({:shared_doc, context.document_name}) + shared_doc = Registry.whereis(base, {:shared_doc, context.document_name}) assert 
Process.alive?(doc_supervisor) assert Process.alive?(persistence_writer) @@ -47,13 +54,15 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Setup for tests that need a test supervisor (like restart strategy test) defp setup_test_supervisor(context) do + base = Map.fetch!(context, :collaboration_base) {:ok, test_supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one) child_spec = DocumentSupervisor.child_spec( workflow: context.workflow, document_name: context.document_name, - name: Registry.via({:doc_supervisor, context.document_name}) + base: base, + name: Registry.via(base, {:doc_supervisor, context.document_name}) ) Map.merge(context, %{ @@ -63,15 +72,25 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do end # Helper function to verify cleanup after process termination - defp verify_cleanup(document_name, _workflow_id) do + defp verify_cleanup(document_name, _workflow_id, base \\ nil) do + effective_base = base || Topology.base() + # Verify Registry is cleaned up eventually - refute_eventually(Registry.whereis({:doc_supervisor, document_name})) - refute_eventually(Registry.whereis({:persistence_writer, document_name})) - refute_eventually(Registry.whereis({:shared_doc, document_name})) + refute_eventually( + Registry.whereis(effective_base, {:doc_supervisor, document_name}) + ) + + refute_eventually( + Registry.whereis(effective_base, {:persistence_writer, document_name}) + ) + + refute_eventually( + Registry.whereis(effective_base, {:shared_doc, document_name}) + ) # Verify process group is cleaned up eventually refute_eventually( - :pg.get_members(:workflow_collaboration, document_name) + :pg.get_members(Topology.pg_scope(effective_base), document_name) |> Enum.any?() ) end @@ -155,7 +174,11 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do refute Process.alive?(setup_data.persistence_writer) refute Process.alive?(setup_data.shared_doc) - verify_cleanup(setup_data.document_name, setup_data.workflow_id) + 
verify_cleanup( + setup_data.document_name, + setup_data.workflow_id, + setup_data.collaboration_base + ) end # Helper function to verify successful DocumentSupervisor initialization @@ -164,14 +187,19 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do document_name: document_name, doc_supervisor: doc_supervisor, persistence_writer: persistence_writer, - shared_doc: shared_doc + shared_doc: shared_doc, + collaboration_base: base }) do # Verify DocumentSupervisor is registered correctly - registered_supervisor = Registry.whereis({:doc_supervisor, document_name}) + registered_supervisor = + Registry.whereis(base, {:doc_supervisor, document_name}) + assert registered_supervisor == doc_supervisor # Verify SharedDoc is in process group (now keyed by document_name, not workflow_id) - members = :pg.get_members(:workflow_collaboration, document_name) + members = + :pg.get_members(Topology.pg_scope(base), document_name) + assert shared_doc in members # Verify both processes are monitored by checking state @@ -183,13 +211,13 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do assert state.workflow.id == workflow_id # Verify all processes are grouped correctly in Registry - group = Registry.get_group(document_name) + group = Registry.get_group(base, document_name) assert Map.has_key?(group, :persistence_writer) assert Map.has_key?(group, :shared_doc) assert Map.has_key?(group, :doc_supervisor) # Count all registered processes for this workflow - assert Registry.count(document_name) == 3 + assert Registry.count(base, document_name) == 3 end # Helper function to test DocumentSupervisor startup failure @@ -292,7 +320,11 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do assert_all_processes_terminated(processes, monitor_refs) - verify_cleanup(context.document_name, context.workflow_id) + verify_cleanup( + context.document_name, + context.workflow_id, + context.collaboration_base + ) end test "3.2 - Termination with Already Dead Children", 
context do @@ -320,7 +352,11 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do refute Process.alive?(context.shared_doc) end) - verify_cleanup(context.document_name, context.workflow_id) + verify_cleanup( + context.document_name, + context.workflow_id, + context.collaboration_base + ) end test "3.3 - Termination Order Verification", context do @@ -354,7 +390,12 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do "PersistenceWriter: #{persistence_writer_time}ms" refute Process.alive?(context.doc_supervisor) - verify_cleanup(context.document_name, context.workflow_id) + + verify_cleanup( + context.document_name, + context.workflow_id, + context.collaboration_base + ) end end @@ -440,7 +481,10 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Verify it's alive and registered assert Process.alive?(doc_supervisor) - assert Registry.whereis({:doc_supervisor, context.document_name}) == + assert Registry.whereis( + context.collaboration_base, + {:doc_supervisor, context.document_name} + ) == doc_supervisor # Test transient behavior - normal exit should NOT restart @@ -455,7 +499,10 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Verify no restart occurred for normal termination # Wait a reasonable time to ensure supervisor doesn't restart refute_eventually( - Registry.whereis({:doc_supervisor, context.document_name}) != nil, + Registry.whereis( + context.collaboration_base, + {:doc_supervisor, context.document_name} + ) != nil, 1000 ) @@ -482,13 +529,19 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Wait for the supervisor to restart the child assert_eventually( - Registry.whereis({:doc_supervisor, context.document_name}) != nil, + Registry.whereis( + context.collaboration_base, + {:doc_supervisor, context.document_name} + ) != nil, 2000 ) # Verify the restarted process is different restarted_supervisor = - Registry.whereis({:doc_supervisor, context.document_name}) + Registry.whereis( + 
context.collaboration_base, + {:doc_supervisor, context.document_name} + ) assert restarted_supervisor != doc_supervisor2 assert Process.alive?(restarted_supervisor) @@ -511,39 +564,47 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do document_name: document_name, doc_supervisor: doc_supervisor, persistence_writer: persistence_writer, - shared_doc: shared_doc + shared_doc: shared_doc, + collaboration_base: base } do # Test all processes are registered correctly using pattern matching assert %{ doc_supervisor: ^doc_supervisor, persistence_writer: ^persistence_writer, shared_doc: ^shared_doc - } = Registry.get_group(document_name) + } = Registry.get_group(base, document_name) # Test count function - assert Registry.count(document_name) == 3 + assert Registry.count(base, document_name) == 3 # Clean up GenServer.stop(doc_supervisor, :normal) - verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end test "5.2 - Process Group Registration", %{ workflow_id: workflow_id, document_name: document_name, doc_supervisor: doc_supervisor, - shared_doc: shared_doc + shared_doc: shared_doc, + collaboration_base: base } do # Test SharedDoc is the only member in workflow_collaboration process group assert [^shared_doc] = - :pg.get_members(:workflow_collaboration, document_name) + :pg.get_members( + Topology.pg_scope(base), + document_name + ) assert [^shared_doc] = - :pg.get_local_members(:workflow_collaboration, document_name) + :pg.get_local_members( + Topology.pg_scope(base), + document_name + ) # Clean up and verify process group is cleaned up GenServer.stop(doc_supervisor, :normal) - verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end end @@ -553,7 +614,8 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do test "6.1 - Rapid Child Restarts", %{ workflow_id: workflow_id, document_name: document_name, - doc_supervisor: doc_supervisor + doc_supervisor: doc_supervisor, + 
collaboration_base: base } do # Monitor the DocumentSupervisor for termination doc_supervisor_ref = Process.monitor(doc_supervisor) @@ -562,9 +624,9 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Rapidly kill the same child multiple times to simulate rapid failures # DocumentSupervisor should handle this gracefully and terminate cleanly persistence_writer = - Registry.whereis({:persistence_writer, document_name}) + Registry.whereis(base, {:persistence_writer, document_name}) - shared_doc = Registry.whereis({:shared_doc, document_name}) + shared_doc = Registry.whereis(base, {:shared_doc, document_name}) # Kill persistence_writer first GenServer.stop(persistence_writer, :kill) @@ -580,13 +642,14 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do refute Process.alive?(shared_doc) end) - verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end test "6.2 - Concurrent Stop Requests", %{ workflow_id: workflow_id, document_name: document_name, - doc_supervisor: doc_supervisor + doc_supervisor: doc_supervisor, + collaboration_base: base } do # Monitor the DocumentSupervisor doc_supervisor_ref = Process.monitor(doc_supervisor) @@ -633,7 +696,7 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Verify cleanup happened only once - no orphaned processes refute Process.alive?(doc_supervisor) - verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end) # Verify we captured the expected concurrent termination error @@ -644,7 +707,8 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do workflow_id: workflow_id, document_name: document_name, doc_supervisor: doc_supervisor, - shared_doc: shared_doc + shared_doc: shared_doc, + collaboration_base: base } do # Monitor the DocumentSupervisor doc_supervisor_ref = Process.monitor(doc_supervisor) @@ -672,7 +736,7 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do 5000 end) - 
verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end end @@ -699,7 +763,10 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Verify SharedDoc is in process group assert [^shared_doc] = - :pg.get_members(:workflow_collaboration, document_name) + :pg.get_members( + Lightning.Collaboration.Topology.pg_scope(), + document_name + ) # Clean up session which should clean up DocumentSupervisor # Capture potential race condition logs during process shutdown @@ -714,11 +781,17 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do test "7.2 - With Session Processes", %{ workflow_id: _workflow_id, document_name: document_name, - shared_doc: shared_doc + shared_doc: shared_doc, + collaboration_base: base } do # Both discovery methods should find the same process - found_via_registry = Registry.whereis({:shared_doc, document_name}) - [found_via_pg] = :pg.get_members(:workflow_collaboration, document_name) + found_via_registry = Registry.whereis(base, {:shared_doc, document_name}) + + [found_via_pg] = + :pg.get_members( + Topology.pg_scope(base), + document_name + ) assert found_via_registry == shared_doc assert found_via_pg == shared_doc @@ -729,7 +802,8 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do workflow_id: workflow_id, document_name: document_name, shared_doc: shared_doc, - persistence_writer: persistence_writer + persistence_writer: persistence_writer, + collaboration_base: base } do # Test that stopping SharedDoc triggers flush_and_stop on PersistenceWriter shared_doc_ref = Process.monitor(shared_doc) @@ -747,12 +821,12 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do 5000 # Cleanup is handled by DocumentSupervisor monitoring - verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end end describe "8. 
Resource Management" do - test "8.1 - Memory Leaks", %{workflow: workflow} do + test "8.1 - Memory Leaks", %{workflow: workflow, collaboration_base: base} do workflow_id = workflow.id document_name = "workflow:#{workflow_id}" @@ -761,8 +835,8 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do # Start DocumentSupervisor {:ok, doc_supervisor} = DocumentSupervisor.start_link( - [workflow: workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) + [workflow: workflow, document_name: document_name, base: base], + name: Registry.via(base, {:doc_supervisor, document_name}) ) # Verify all processes are created and registered @@ -770,32 +844,38 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do doc_supervisor: ^doc_supervisor, persistence_writer: _persistence_writer, shared_doc: shared_doc - } = Registry.get_group(document_name) + } = Registry.get_group(base, document_name) # Verify process group membership assert [^shared_doc] = - :pg.get_members(:workflow_collaboration, document_name) + :pg.get_members( + Topology.pg_scope(base), + document_name + ) # Stop DocumentSupervisor normally GenServer.stop(doc_supervisor, :normal) # Verify complete cleanup after each cycle - verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end end - test "8.2 - Timeout Handling", %{workflow: workflow} do + test "8.2 - Timeout Handling", %{ + workflow: workflow, + collaboration_base: base + } do workflow_id = workflow.id document_name = "workflow:#{workflow_id}" # Start DocumentSupervisor {:ok, doc_supervisor} = DocumentSupervisor.start_link( - [workflow: workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) + [workflow: workflow, document_name: document_name, base: base], + name: Registry.via(base, {:doc_supervisor, document_name}) ) - shared_doc = Registry.whereis({:shared_doc, document_name}) + shared_doc = Registry.whereis(base, {:shared_doc, 
document_name}) # Make SharedDoc unresponsive by suspending it :sys.suspend(shared_doc) @@ -811,12 +891,15 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do :shutdown}, 15_000 - verify_cleanup(document_name, workflow_id) + verify_cleanup(document_name, workflow_id, base) end end describe "9. Checkpoint Creation" do - test "creates checkpoint from persisted updates", %{workflow: workflow} do + test "creates checkpoint from persisted updates", %{ + workflow: workflow, + collaboration_base: base + } do document_name = "workflow:#{workflow.id}" # Create some initial document state with Y.Doc data @@ -857,7 +940,7 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do {:ok, persistence_writer} = PersistenceWriter.start_link( document_name: document_name, - name: Registry.via({:persistence_writer, document_name}) + name: Registry.via(base, {:persistence_writer, document_name}) ) # Trigger checkpoint creation by sending the message directly @@ -895,7 +978,8 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do end test "creates checkpoint merging existing checkpoint with updates", %{ - workflow: workflow + workflow: workflow, + collaboration_base: base } do document_name = "workflow:#{workflow.id}" @@ -941,7 +1025,7 @@ defmodule Lightning.Collaboration.DocumentSupervisorTest do {:ok, persistence_writer} = PersistenceWriter.start_link( document_name: document_name, - name: Registry.via({:persistence_writer, document_name}) + name: Registry.via(base, {:persistence_writer, document_name}) ) send(persistence_writer, :create_checkpoint) diff --git a/test/lightning/collaboration/external_workflow_update_test.exs b/test/lightning/collaboration/external_workflow_update_test.exs new file mode 100644 index 00000000000..7b2e63d56f2 --- /dev/null +++ b/test/lightning/collaboration/external_workflow_update_test.exs @@ -0,0 +1,461 @@ +defmodule Lightning.Collaboration.ExternalWorkflowUpdateTest do + @moduledoc """ + Tests that the collaborative editor 
stays in sync when the workflow is + updated externally (provisioner import, sandbox merge) without going through + the Y.doc. + + Covers two failure modes: + - Someone is online when the external update runs (SharedDoc is alive but + never notified — currently red, fix pending) + - Nobody is online (stale DocumentState is loaded on next open — the + provisioner calls WorkflowReconciler, which goes through Collaborate.start + to update the doc and flush the correct state before the next user opens) + """ + + # Each test gets its own isolated collaboration tree via + # `Lightning.CollaborationCase`. + use Lightning.CollaborationCase + + import Lightning.CollaborationHelpers + import Lightning.Factories + + alias Lightning.Collaborate + alias Lightning.Collaboration.DocumentSupervisor + alias Lightning.Collaboration.Registry, as: CollaborationRegistry + alias Lightning.Collaboration.Session + alias Lightning.Collaboration.Topology + alias Lightning.Projects.Provisioner + + setup do + Mox.stub( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, _context -> :ok end + ) + + :ok + end + + describe "after a sandbox merge" do + test "a new tab sees the merged version, not the stale editor state with unsaved changes" do + user_a = insert(:user) + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + start_supervised!( + {DocumentSupervisor, + workflow: workflow, + document_name: document_name, + base: Topology.base(), + name: CollaborationRegistry.via({:doc_supervisor, document_name})} + ) + + # --- Tab A: User A opens the workflow and adds an unsaved job --- + {:ok, session_a} = Collaborate.start(user: user_a, workflow: workflow) + + Session.update_doc(session_a, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> 
Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => Ecto.UUID.generate(), + "name" => "tab-a-unsaved-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + end) + + # --- User B merges the sandbox (provisioner import, adds a new job) --- + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + # --- Tab B: User B opens the workflow to verify what they just merged --- + {:ok, session_b} = Collaborate.start(user: user_b, workflow: workflow) + + doc = Session.get_doc(session_b) + + job_ids = + doc + |> Yex.Doc.get_array("jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session_a) + Session.stop(session_b) + ensure_doc_supervisor_stopped(workflow.id) + + assert provisioner_job_id in job_ids, + "Tab B should see the job that User B just deployed (#{provisioner_job_id}), " <> + "but the SharedDoc was not reset after the sandbox merge. " <> + "Got job ids: #{inspect(job_ids)}" + end + end + + # --------------------------------------------------------------------------- + # Tab A saves after a provisioner import + # --------------------------------------------------------------------------- + # + # After the provisioner import the SharedDoc should be reset to v2. + # So when Tab A saves, the result should be v2 (provisioner's job present). + # Today it fails because the SharedDoc is never reset — Tab A saves v1 + + # unsaved changes, producing a v3 that skips v2's content entirely. 
+ + describe "after a sandbox merge, saving from an open tab" do + test "saves the merged version, not the unsaved changes" do + user_a = insert(:user) + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + start_supervised!( + {DocumentSupervisor, + workflow: workflow, + document_name: document_name, + base: Topology.base(), + name: CollaborationRegistry.via({:doc_supervisor, document_name})} + ) + + # Tab A: User A opens the workflow and adds an unsaved job + {:ok, session_a} = Collaborate.start(user: user_a, workflow: workflow) + # This test uses Topology.base() (production topology); $callers from the + # test pid does not reach the Session process, so explicit allow is needed. + Mox.allow(LightningMock, self(), session_a) + + tab_a_unsaved_job_id = Ecto.UUID.generate() + + Session.update_doc(session_a, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => tab_a_unsaved_job_id, + "name" => "tab-a-unsaved-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + end) + + # User B does a provisioner import (sandbox merge / CLI deploy) + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + # Tab A saves + {:ok, saved_workflow} = Session.save_workflow(session_a, user_a) + Session.stop(session_a) + ensure_doc_supervisor_stopped(workflow.id) + + saved_job_ids = Enum.map(saved_workflow.jobs, & &1.id) + + assert provisioner_job_id in saved_job_ids, + "Saved workflow should include the provisioner's job (#{provisioner_job_id}). 
" <> + "Got jobs: #{saved_workflow.jobs |> Enum.map(& &1.name) |> inspect()}" + + refute tab_a_unsaved_job_id in saved_job_ids, + "Saved workflow should not include Tab A's unsaved job — " <> + "the SharedDoc should have been reset to the merged version before the save." + end + end + + # --------------------------------------------------------------------------- + # Unsaved changes indicator base workflow not updated after sandbox merge + # --------------------------------------------------------------------------- + # + # The unsaved changes indicator compares the Y.Doc's workflow.lock_version + # against the server's latest_snapshot_lock_version to detect unsaved changes. + # After a provisioner import (sandbox merge), the Y.Doc's lock_version should + # be updated to the new DB version so the indicator uses v2 as its base. + # Today it fails because the SharedDoc is never reset — lock_version in the + # Y.Doc stays at v1, so the indicator compares against the wrong baseline. + + describe "after a sandbox merge, the unsaved changes indicator" do + test "reflects the merged version as the new base, not the stale pre-merge version" do + user_a = insert(:user) + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [_original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + start_supervised!( + {DocumentSupervisor, + workflow: workflow, + document_name: document_name, + base: Topology.base(), + name: CollaborationRegistry.via({:doc_supervisor, document_name})} + ) + + # Tab A: User A opens the workflow and adds an unsaved job + {:ok, session_a} = Collaborate.start(user: user_a, workflow: workflow) + + Session.update_doc(session_a, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => Ecto.UUID.generate(), + "name" => "tab-a-unsaved-job", + "adaptor" 
=> "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + end) + + # User B opens sandbox, adds a job at the same node, saves, and merges sandbox. + # The provisioner import updates the DB to v2. + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + v2_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :edges, :triggers] + ) + + doc = Session.get_doc(session_a) + workflow_map = Yex.Doc.get_map(doc, "workflow") + ydoc_lock_version = Yex.Map.fetch!(workflow_map, "lock_version") + + Session.stop(session_a) + ensure_doc_supervisor_stopped(workflow.id) + + assert ydoc_lock_version == v2_workflow.lock_version, + "Y.Doc lock_version should reflect the merged version " <> + "(#{v2_workflow.lock_version}) so the unsaved changes indicator " <> + "uses v2 as its base — but got #{ydoc_lock_version}. " <> + "Tab A's indicator will compare unsaved changes against v1 " <> + "instead of the sandbox-merged v2." + end + end + + # --------------------------------------------------------------------------- + # Nobody online during provisioner import + # --------------------------------------------------------------------------- + # + # Scenario: a user had the workflow open, made unsaved changes, and closed + # their browser. The SharedDoc shut down and flushed a stale Y.Doc to + # DocumentState (v1 content + phantom unsaved job). Later, a provisioner + # import writes v2 to the DB. When the next user opens the workflow, a fresh + # SharedDoc starts and loads that stale DocumentState. + # + # The fix: the provisioner calls WorkflowReconciler after committing. The + # reconciler calls Collaborate.start (with the provisioner's actor), applies the v2 diff + # to the Y.Doc, then calls Session.stop. The shutdown chain flushes the + # correct state to DocumentState before any real user opens the workflow. 
+ # + # We insert the stale DocumentState directly (bypassing PersistenceWriter) + # so the scenario is deterministic in tests. + + describe "when nobody is online during the provisioner import" do + test "a new tab sees the merged version, not stale persisted editor state" do + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + # Build stale Y.Doc state: v1 workflow + a phantom unsaved job left behind + # by a previous user who closed their browser without saving. + stale_doc = Yex.Doc.new() + Session.initialize_workflow_document(stale_doc, workflow) + + Yex.Doc.get_array(stale_doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => Ecto.UUID.generate(), + "name" => "previous-user-unsaved-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + + stale_update = Yex.encode_state_as_update!(stale_doc) + + Lightning.Repo.insert!(%Lightning.Collaboration.DocumentState{ + document_name: document_name, + version: :update, + state_data: stale_update + }) + + # Provisioner runs while nobody is online (no active SharedDoc). + # The reconciler calls Collaborate.start (user: user_b), applies the diff, + # then Session.stop — flushing the correct state to DocumentState. + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + # Open the workflow the way a real browser tab would. + # Collaborate.start's internal sleep+retry (iter 3b) acts as a sync point: + # it only succeeds after the reconciler's SharedDoc has fully auto_exited + # and flushed — so by the time this returns, DocumentState has the correct + # content. 
+ v2_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :edges, :triggers] + ) + + {:ok, session_b} = Collaborate.start(workflow: v2_workflow, user: user_b) + + doc = Session.get_doc(session_b) + + job_ids = + doc + |> Yex.Doc.get_array("jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session_b) + ensure_doc_supervisor_stopped(workflow.id) + + assert provisioner_job_id in job_ids, + "User B should see the provisioner's job (#{provisioner_job_id}) " <> + "when opening the workflow after the merge. " <> + "The reconciler should have corrected the stale DocumentState. " <> + "Got job ids: #{inspect(job_ids)}" + end + end + + # --------------------------------------------------------------------------- + # GitHub sync (ProjectRepoConnection actor) + # --------------------------------------------------------------------------- + # + # When a GitHub sync triggers a provisioner import, the actor is a + # ProjectRepoConnection, not a User. The reconciler must still work — no + # presence tracking should fire, but the SharedDoc must be updated correctly. 
+ + describe "when the provisioner is triggered by a GitHub sync" do + test "a new tab sees the synced version" do + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + repo_connection = insert(:project_repo_connection, project: project) + user = insert(:user) + + [original_job] = workflow.jobs + + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + + {:ok, _} = Provisioner.import_document(project, repo_connection, v2_body) + + v2_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :edges, :triggers] + ) + + {:ok, session} = Collaborate.start(workflow: v2_workflow, user: user) + + job_ids = + Session.get_doc(session) + |> Yex.Doc.get_array("jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + assert provisioner_job_id in job_ids, + "The reconciler should have flushed the GitHub-synced job (#{provisioner_job_id}). 
" <> + "Got job ids: #{inspect(job_ids)}" + end + end + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + defp build_provisioner_body(project, workflow, opts) do + base_jobs = + Enum.map(workflow.jobs, fn job -> + %{ + "id" => job.id, + "name" => job.name, + "adaptor" => job.adaptor, + "body" => job.body + } + end) + + extra_jobs = + if opts[:add_new_job] do + [ + %{ + "id" => Ecto.UUID.generate(), + "name" => "provisioner-added-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + } + ] + else + [] + end + + %{ + "id" => project.id, + "name" => project.name, + "workflows" => [ + %{ + "id" => workflow.id, + "name" => workflow.name, + "jobs" => base_jobs ++ extra_jobs, + "triggers" => + Enum.map(workflow.triggers, fn t -> + %{"id" => t.id, "enabled" => t.enabled} + end), + "edges" => + Enum.map(workflow.edges, fn e -> + %{ + "id" => e.id, + "source_trigger_id" => e.source_trigger_id, + "source_job_id" => e.source_job_id, + "target_job_id" => e.target_job_id, + "condition_type" => to_string(e.condition_type), + "condition_expression" => e.condition_expression, + "condition_label" => e.condition_label + } + |> Map.reject(fn {_, v} -> is_nil(v) end) + end) + } + ] + } + end + + defp new_jobs_in(body, existing_job_ids) do + body + |> get_in(["workflows", Access.at(0), "jobs"]) + |> Enum.reject(&(&1["id"] in existing_job_ids)) + end +end diff --git a/test/lightning/collaboration/no_change_snapshot_test.exs b/test/lightning/collaboration/no_change_snapshot_test.exs index d651209cda3..28a03b5a795 100644 --- a/test/lightning/collaboration/no_change_snapshot_test.exs +++ b/test/lightning/collaboration/no_change_snapshot_test.exs @@ -3,7 +3,7 @@ defmodule Lightning.Collaboration.NoChangeSnapshotTest do Tests to reproduce and verify the fix for phantom snapshot creation when no actual changes are made to a workflow. 
""" - use Lightning.DataCase, async: false + use Lightning.CollaborationCase import Lightning.Factories @@ -12,11 +12,6 @@ defmodule Lightning.Collaboration.NoChangeSnapshotTest do describe "saving without changes" do setup do - # Set global mode for the mock to allow cross-process calls - Mox.set_mox_global(LightningMock) - # Stub the broadcast calls that save_workflow makes - Mox.stub(LightningMock, :broadcast, fn _topic, _message -> :ok end) - user = insert(:user) project = insert(:project) workflow = insert(:workflow, name: "Test Workflow", project: project) @@ -27,12 +22,13 @@ defmodule Lightning.Collaboration.NoChangeSnapshotTest do test "does not create snapshot when saving Y.Doc with no changes", %{ user: user, - workflow: workflow + workflow: workflow, + collaboration_base: base } do # Start document and session start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) session_pid = @@ -40,9 +36,13 @@ defmodule Lightning.Collaboration.NoChangeSnapshotTest do {Session, workflow: workflow, user: user, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) + # $callers does not propagate through ExUnit's test supervisor. 
+ Mox.allow(LightningMock, self(), session_pid) + # Get initial lock_version workflow = Workflows.get_workflow!(workflow.id) initial_lock_version = workflow.lock_version @@ -75,12 +75,13 @@ defmodule Lightning.Collaboration.NoChangeSnapshotTest do test "creates snapshot when actually changing workflow data", %{ user: user, - workflow: workflow + workflow: workflow, + collaboration_base: base } do # Start document and session start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) session_pid = @@ -88,9 +89,13 @@ defmodule Lightning.Collaboration.NoChangeSnapshotTest do {Session, workflow: workflow, user: user, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) + # $callers does not propagate through ExUnit's test supervisor. + Mox.allow(LightningMock, self(), session_pid) + # Get initial lock_version workflow = Workflows.get_workflow!(workflow.id) initial_lock_version = workflow.lock_version diff --git a/test/lightning/collaboration/persistence_test.exs b/test/lightning/collaboration/persistence_test.exs index 210a118e950..968bf10eff7 100644 --- a/test/lightning/collaboration/persistence_test.exs +++ b/test/lightning/collaboration/persistence_test.exs @@ -1,5 +1,5 @@ defmodule Lightning.Collaboration.PersistenceTest do - use Lightning.DataCase, async: false + use Lightning.CollaborationCase alias Lightning.Collaboration.DocumentState alias Lightning.Collaboration.DocumentSupervisor @@ -19,231 +19,10 @@ defmodule Lightning.Collaboration.PersistenceTest do :ok end - describe "reconcile_workflow_metadata/2" do - setup do - workflow = insert(:workflow) - workflow_id = workflow.id - document_name = "workflow:#{workflow_id}" - - {:ok, - workflow: workflow, workflow_id: workflow_id, document_name: document_name} - end - - test "converts deleted_at DateTime to string when reconciling", %{ - 
workflow: workflow, - document_name: document_name - } do - # Create a workflow with a deleted_at timestamp - workflow_with_deleted = %{workflow | deleted_at: DateTime.utc_now()} - - # Create persisted Y.Doc state with same lock_version - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "initial_state", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - # Old persisted state has deleted_at as nil - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # Start DocumentSupervisor with workflow that has deleted_at - # This triggers reconcile_workflow_metadata - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: workflow_with_deleted, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify the deleted_at was properly converted to a string in Y.Doc - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - deleted_at_value = Yex.Map.fetch!(reconciled_workflow_map, "deleted_at") - - # Should be a string (ISO8601), not a DateTime struct - assert is_binary(deleted_at_value) - - # Should match the original DateTime when parsed back - assert {:ok, parsed_dt, _} = DateTime.from_iso8601(deleted_at_value) - assert DateTime.compare(parsed_dt, workflow_with_deleted.deleted_at) == :eq - - # Clean up - GenServer.stop(doc_supervisor, :normal) - end - - test "handles nil deleted_at correctly", %{ - workflow: workflow, - document_name: document_name - } do - # Workflow without deleted_at (nil) - 
workflow_without_deleted = %{workflow | deleted_at: nil} - - # Create persisted state - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "setup", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # Start DocumentSupervisor - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: workflow_without_deleted, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify deleted_at remains nil - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - deleted_at_value = Yex.Map.fetch!(reconciled_workflow_map, "deleted_at") - assert deleted_at_value == nil - - # Clean up - GenServer.stop(doc_supervisor, :normal) - end - - test "reconciles lock_version when persisted state exists", %{ - workflow: workflow, - document_name: document_name - } do - # Create persisted state with old lock_version - old_lock_version = workflow.lock_version - new_lock_version = old_lock_version + 1 - - # Update workflow in DB to have new lock_version - {:ok, updated_workflow} = - workflow - |> Ecto.Changeset.change(lock_version: new_lock_version) - |> Repo.update() - - # Create persisted Y.Doc state with old lock_version - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "setup", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", 
old_lock_version) - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # Start DocumentSupervisor - should reconcile to new lock_version - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: updated_workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify lock_version was reconciled to the current DB value - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - reconciled_lock_version = - Yex.Map.fetch!(reconciled_workflow_map, "lock_version") - - assert reconciled_lock_version == new_lock_version - - # Clean up - GenServer.stop(doc_supervisor, :normal) - end - - test "reconciles workflow name when it changed", %{ - workflow: workflow, - document_name: document_name - } do - # Update workflow name in DB - {:ok, updated_workflow} = - workflow - |> Ecto.Changeset.change(name: "Updated Name") - |> Repo.update() - - # Create persisted state with old name - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "setup", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # Start DocumentSupervisor - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: updated_workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, 
document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify name was reconciled - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - reconciled_name = Yex.Map.fetch!(reconciled_workflow_map, "name") - assert reconciled_name == "Updated Name" - - # Clean up - GenServer.stop(doc_supervisor, :normal) - end - end - describe "bind/3 with no persisted state" do - test "initializes workflow document from database" do + test "initializes workflow document from database", %{ + collaboration_base: base + } do workflow = insert(:workflow) document_name = "workflow:#{workflow.id}" @@ -252,14 +31,14 @@ defmodule Lightning.Collaboration.PersistenceTest do # Start DocumentSupervisor {:ok, doc_supervisor} = DocumentSupervisor.start_link( - [workflow: workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) + [workflow: workflow, document_name: document_name, base: base], + name: Registry.via(base, {:doc_supervisor, document_name}) ) assert Process.alive?(doc_supervisor) # Verify workflow was initialized from database - shared_doc = Registry.whereis({:shared_doc, document_name}) + shared_doc = Registry.whereis(base, {:shared_doc, document_name}) doc = Yex.Sync.SharedDoc.get_doc(shared_doc) workflow_map = Yex.Doc.get_map(doc, "workflow") @@ -275,11 +54,13 @@ defmodule Lightning.Collaboration.PersistenceTest do end describe "bind/3 with stale persisted state" do - test "resets Y.Doc when persisted lock_version differs from database" do + test "loads persisted state as-is without reconciliation", %{ + collaboration_base: base + } do workflow = insert(:workflow, lock_version: 5) document_name = "workflow:#{workflow.id}" - # Create persisted state with older lock_version + # Create persisted state with an older lock_version and different name doc = Yex.Doc.new() workflow_map = Yex.Doc.get_map(doc, "workflow") @@ 
-302,21 +83,19 @@ defmodule Lightning.Collaboration.PersistenceTest do # Start DocumentSupervisor {:ok, doc_supervisor} = DocumentSupervisor.start_link( - [workflow: workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) + [workflow: workflow, document_name: document_name, base: base], + name: Registry.via(base, {:doc_supervisor, document_name}) ) assert Process.alive?(doc_supervisor) - # Verify Y.Doc was reset to current database state - shared_doc = Registry.whereis({:shared_doc, document_name}) + # Persisted state is loaded as-is — no automatic reconciliation + shared_doc = Registry.whereis(base, {:shared_doc, document_name}) doc = Yex.Sync.SharedDoc.get_doc(shared_doc) workflow_map = Yex.Doc.get_map(doc, "workflow") - # Should have current lock_version, not the old one - assert Yex.Map.fetch!(workflow_map, "lock_version") == 5 - # Should have current name, not the old one - assert Yex.Map.fetch!(workflow_map, "name") == workflow.name + assert Yex.Map.fetch!(workflow_map, "lock_version") == 3.0 + assert Yex.Map.fetch!(workflow_map, "name") == "Old Name" # Clean up GenServer.stop(doc_supervisor, :normal) diff --git a/test/lightning/collaboration/session_test.exs b/test/lightning/collaboration/session_test.exs index aad64432d3d..cfdf1116c10 100644 --- a/test/lightning/collaboration/session_test.exs +++ b/test/lightning/collaboration/session_test.exs @@ -1,15 +1,12 @@ defmodule Lightning.SessionTest do - # We assume that the WorkflowCollaboration supervisor is up - # that starts :pg with the :workflow_collaboration scope - # and a dynamic supervisor called Lightning.WorkflowCollaboration - - # Tests must be async: false, some of the processes we start are either - # not owned by the test process, or themselves start processes. 
- use Lightning.DataCase, async: false + # Each test gets its own collaboration supervisor (Registry, :pg scope, + # DynamicSupervisor) via `Lightning.CollaborationCase`, so the GenServers + # spawned here can't outlive the test's SQL sandbox checkout. + use Lightning.CollaborationCase import Eventually - import Lightning.Factories import Lightning.CollaborationHelpers + import Lightning.Factories import Mox alias Lightning.Collaboration.DocumentState @@ -27,11 +24,10 @@ defmodule Lightning.SessionTest do {:ok, user: user} end - setup :verify_on_exit! - describe "start/1" do test "start_link/1 returns an error when the SharedDoc doesn't exist", %{ - user: user + user: user, + collaboration_base: base } do workflow_id = Ecto.UUID.generate() @@ -47,17 +43,22 @@ defmodule Lightning.SessionTest do {Session, user: user, workflow: workflow, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) end - test "start/1 can join an existing shared doc", %{user: user1} do + test "start/1 can join an existing shared doc", %{ + user: user1, + collaboration_base: base + } do user2 = insert(:user) workflow = insert(:simple_workflow) Lightning.Collaborate.start_document( workflow, - "workflow:#{workflow.id}" + "workflow:#{workflow.id}", + base ) [parent1, parent2] = build_parents(2) @@ -69,7 +70,8 @@ defmodule Lightning.SessionTest do user: user, workflow: workflow, parent_pid: parent, - document_name: "workflow:#{workflow.id}" + document_name: "workflow:#{workflow.id}", + base: base ) client @@ -92,7 +94,7 @@ defmodule Lightning.SessionTest do end) shared_doc_pid = - Registry.get_group("workflow:#{workflow.id}") + Registry.get_group(base, "workflow:#{workflow.id}") |> Map.get(:shared_doc) observer_processes = @@ -115,7 +117,12 @@ defmodule Lightning.SessionTest do document_name = "workflow:#{workflow.id}" assert_eventually( - length(:pg.get_members(:workflow_collaboration, document_name)) == 1 + length( + :pg.get_members( + 
Lightning.Collaboration.Topology.pg_scope(base), + document_name + ) + ) == 1 ) Process.exit(parent2, :normal) @@ -131,13 +138,21 @@ defmodule Lightning.SessionTest do # But we might want to control the cleanup ourselves, in which case # this will be > 0 until we stop the SharedDoc ourselves. assert_eventually( - length(:pg.get_members(:workflow_collaboration, workflow.id)) == 0 + length( + :pg.get_members( + Lightning.Collaboration.Topology.pg_scope(base), + workflow.id + ) + ) == 0 ) end end describe "workflow initialization" do - test "SharedDoc is initialized with workflow data", %{user: user} do + test "SharedDoc is initialized with workflow data", %{ + user: user, + collaboration_base: base + } do # Create a workflow with jobs workflow = build(:complex_workflow, name: "Test Workflow") @@ -145,7 +160,7 @@ defmodule Lightning.SessionTest do start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) # Start a session - this should initialize the SharedDoc with workflow data @@ -154,7 +169,8 @@ defmodule Lightning.SessionTest do {Session, user: user, workflow: workflow, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) # Send a message to allow :handle_continue to finish @@ -235,14 +251,17 @@ defmodule Lightning.SessionTest do end end - test "existing SharedDoc is not reinitialized", %{user: user} do + test "existing SharedDoc is not reinitialized", %{ + user: user, + collaboration_base: base + } do workflow = insert(:workflow, name: "Test Workflow") insert(:job, workflow: workflow, name: "Original Job", body: "original") start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) # Start first session @@ -251,7 +270,8 @@ defmodule Lightning.SessionTest do {Session, 
workflow: workflow, user: user, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) shared_doc_1 = Session.get_doc(session_1) @@ -267,7 +287,8 @@ defmodule Lightning.SessionTest do {Session, workflow: workflow, user: user, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) shared_doc_2 = Session.get_doc(session_2) @@ -287,7 +308,10 @@ defmodule Lightning.SessionTest do refute_eventually(Process.alive?(shared_doc_pid)) end - test "client can sync workflow data from SharedDoc", %{user: user} do + test "client can sync workflow data from SharedDoc", %{ + user: user, + collaboration_base: base + } do # Create workflow with jobs workflow = insert(:workflow, name: "Sync Test Workflow") @@ -300,7 +324,7 @@ defmodule Lightning.SessionTest do start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) # Start session to initialize SharedDoc @@ -309,7 +333,8 @@ defmodule Lightning.SessionTest do {Session, user: user, workflow: workflow, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) %Session{shared_doc_pid: shared_doc_pid} = :sys.get_state(session_pid) @@ -348,13 +373,18 @@ defmodule Lightning.SessionTest do describe "persistence" do # @tag :pick - test "saves document state to the database", %{user: user} do + test "saves document state to the database", %{ + user: user, + collaboration_base: base + } do workflow = insert(:simple_workflow) _document_supervisor = start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, + document_name: "workflow:#{workflow.id}", + base: base} ) session_pid = @@ -362,7 +392,8 @@ defmodule Lightning.SessionTest do {Session, user: user, workflow: workflow, - document_name: 
"workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) # This is an existing workflow, so when the session starts, it should @@ -389,35 +420,39 @@ defmodule Lightning.SessionTest do # Lets find the PersistenceWriter and check it's state - persistence_writer = get_persistence_writer(document_name) + persistence_writer = get_persistence_writer(document_name, base) # There should be 1 pending update - assert get_pending_updates(document_name) |> length() == 1 + assert get_pending_updates(document_name, base) |> length() == 1 assert get_document_state(document_name) |> length() == 0, "Nothing is expected in the database yet" # Now lets add a job add_job(session_pid) - assert get_pending_updates(document_name) |> length() == 2 + assert get_pending_updates(document_name, base) |> length() == 2 # And another job = string_params_for(:job) add_job(session_pid, job) - assert get_pending_updates(document_name) |> length() == 3 + assert get_pending_updates(document_name, base) |> length() == 3 # And force saving the updates (this normally happens on a timer) send(persistence_writer, :force_save) - assert_eventually(get_pending_updates(document_name) |> length() == 0) + assert_eventually( + get_pending_updates(document_name, base) |> length() == 0 + ) # And remove a job remove_job(session_pid, job) - assert get_pending_updates(document_name) |> length() == 1 + assert get_pending_updates(document_name, base) |> length() == 1 send(persistence_writer, :force_save) - assert_eventually(get_pending_updates(document_name) |> length() == 0) + assert_eventually( + get_pending_updates(document_name, base) |> length() == 0 + ) # And check that the document state is in the database assert get_document_state(document_name) |> length() == 2 @@ -426,13 +461,13 @@ defmodule Lightning.SessionTest do # TODO: Recover from state with a checkpoint end - defp get_persistence_writer(document_name) do - Registry.get_group(document_name) + defp 
get_persistence_writer(document_name, base) do + Registry.get_group(base, document_name) |> Map.get(:persistence_writer) end - defp get_pending_updates(document_name) do - persistence_writer = get_persistence_writer(document_name) + defp get_pending_updates(document_name, base) do + persistence_writer = get_persistence_writer(document_name, base) :sys.get_state(persistence_writer).pending_updates end @@ -469,24 +504,27 @@ defmodule Lightning.SessionTest do # from persistence. @tag :pick - test "client doc is still around", %{user: user} do + test "client doc is still around", %{user: user, collaboration_base: base} do workflow = insert(:simple_workflow) document_supervisor = start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, + document_name: "workflow:#{workflow.id}", + base: base} ) %{shared_doc: shared_doc, persistence_writer: persistence_writer} = - Registry.get_group("workflow:#{workflow.id}") + Registry.get_group(base, "workflow:#{workflow.id}") session_pid = start_supervised!( {Session, user: user, workflow: workflow, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) {:ok, client_pid} = @@ -528,7 +566,7 @@ defmodule Lightning.SessionTest do # pick up the existing document from the database. 
start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) # Starting a new session @@ -537,10 +575,12 @@ defmodule Lightning.SessionTest do {Session, user: user, workflow: workflow, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) - shared_doc_pid = Registry.get_group("workflow:#{workflow.id}").shared_doc + shared_doc_pid = + Registry.get_group(base, "workflow:#{workflow.id}").shared_doc GenServer.call(client_pid, {:observe, shared_doc_pid}) @@ -587,7 +627,7 @@ defmodule Lightning.SessionTest do describe "teardown" do @tag :capture_log - test "when a session is stopped", %{user: user1} do + test "when a session is stopped", %{user: user1, collaboration_base: base} do workflow_id = Ecto.UUID.generate() workflow = %Lightning.Workflows.Workflow{ @@ -602,7 +642,7 @@ defmodule Lightning.SessionTest do start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) [{client1, parent1}, {client2, parent2}, {client3, _parent3}] = @@ -615,7 +655,8 @@ defmodule Lightning.SessionTest do user: user, workflow: workflow, parent_pid: parent, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) {client, parent} @@ -725,12 +766,7 @@ defmodule Lightning.SessionTest do end describe "save_workflow/2" do - setup do - # Set global mode for the mock to allow cross-process calls - Mox.set_mox_global(LightningMock) - # Stub the broadcast calls that save_workflow makes - Mox.stub(LightningMock, :broadcast, fn _topic, _message -> :ok end) - + setup %{collaboration_base: base} do user = insert(:user) project = insert(:project) workflow = insert(:workflow, name: "Original Name", project: project) @@ -740,7 +776,7 @@ defmodule Lightning.SessionTest do 
start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) session_pid = @@ -748,9 +784,14 @@ defmodule Lightning.SessionTest do {Session, workflow: workflow, user: user, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) + # $callers does not propagate through ExUnit's test supervisor; + # explicit allow is required for Session to call LightningMock. + Mox.allow(LightningMock, self(), session_pid) + %{ session: session_pid, user: user, @@ -933,7 +974,8 @@ defmodule Lightning.SessionTest do test "saves workflow with :built state and lock_version > 0", %{ user: user, - project: project + project: project, + collaboration_base: base } do # Create a new workflow struct (not yet saved) workflow_id = Ecto.UUID.generate() @@ -951,7 +993,9 @@ defmodule Lightning.SessionTest do # Start document and session with the new workflow start_supervised!( {DocumentSupervisor, - workflow: new_workflow, document_name: "workflow:#{workflow_id}"} + workflow: new_workflow, + document_name: "workflow:#{workflow_id}", + base: base} ) session_pid = @@ -959,9 +1003,12 @@ defmodule Lightning.SessionTest do {Session, workflow: new_workflow, user: user, - document_name: "workflow:#{workflow_id}"} + document_name: "workflow:#{workflow_id}", + base: base} ) + Mox.allow(LightningMock, self(), session_pid) + # First save - this creates the workflow in DB (lock_version becomes 1) assert {:ok, saved_workflow} = Session.save_workflow(session_pid, user) assert saved_workflow.lock_version == 1 @@ -988,7 +1035,8 @@ defmodule Lightning.SessionTest do test "handles workflow deleted for :built workflow with lock_version > 0", %{ user: user, - project: project + project: project, + collaboration_base: base } do # Create a workflow and save it once to get lock_version > 0 workflow_id = Ecto.UUID.generate() @@ -1003,9 +1051,12 @@ 
defmodule Lightning.SessionTest do triggers: [] } + # Start document and session with the new workflow start_supervised!( {DocumentSupervisor, - workflow: new_workflow, document_name: "workflow:#{workflow_id}"} + workflow: new_workflow, + document_name: "workflow:#{workflow_id}", + base: base} ) session_pid = @@ -1013,9 +1064,12 @@ defmodule Lightning.SessionTest do {Session, workflow: new_workflow, user: user, - document_name: "workflow:#{workflow_id}"} + document_name: "workflow:#{workflow_id}", + base: base} ) + Mox.allow(LightningMock, self(), session_pid) + # First save to create in DB assert {:ok, _saved_workflow} = Session.save_workflow(session_pid, user) @@ -1039,10 +1093,7 @@ defmodule Lightning.SessionTest do # we need workflows with :built state (not yet persisted to DB). # The main save_workflow/2 tests use insert() which creates :loaded workflows. describe "save_workflow/2 with NEW workflows" do - setup do - Mox.set_mox_global(LightningMock) - Mox.stub(LightningMock, :broadcast, fn _topic, _message -> :ok end) - + setup %{collaboration_base: base} do user = insert(:user) project = insert(:project) @@ -1060,14 +1111,24 @@ defmodule Lightning.SessionTest do document_name = "workflow:new:#{workflow_id}" start_supervised!( - {DocumentSupervisor, workflow: workflow, document_name: document_name} + {DocumentSupervisor, + workflow: workflow, document_name: document_name, base: base} ) session_pid = start_supervised!( - {Session, workflow: workflow, user: user, document_name: document_name} + {Session, + workflow: workflow, + user: user, + document_name: document_name, + base: base} ) + # Mox private mode does not guarantee stub visibility in processes + # started under ExUnit's test supervisor; explicit allow is required. 
+ Mox.allow(LightningMock, self(), session_pid) + Mox.allow(Lightning.Extensions.MockUsageLimiter, self(), session_pid) + %{ session: session_pid, user: user, @@ -1172,19 +1233,14 @@ defmodule Lightning.SessionTest do end describe "save_workflow/2 validation errors" do - setup do - # Set global mode for the mock to allow cross-process calls - Mox.set_mox_global(LightningMock) - # Stub the broadcast calls that save_workflow makes - Mox.stub(LightningMock, :broadcast, fn _topic, _message -> :ok end) - + setup %{collaboration_base: base} do user = insert(:user) project = insert(:project) workflow = insert(:workflow, name: "Original Name", project: project) start_supervised!( {DocumentSupervisor, - workflow: workflow, document_name: "workflow:#{workflow.id}"} + workflow: workflow, document_name: "workflow:#{workflow.id}", base: base} ) session_pid = @@ -1192,9 +1248,15 @@ defmodule Lightning.SessionTest do {Session, workflow: workflow, user: user, - document_name: "workflow:#{workflow.id}"} + document_name: "workflow:#{workflow.id}", + base: base} ) + # Mox private mode does not guarantee stub visibility in processes + # started under ExUnit's test supervisor; explicit allow is required. 
+ Mox.allow(LightningMock, self(), session_pid) + Mox.allow(Lightning.Extensions.MockUsageLimiter, self(), session_pid) + %{ session: session_pid, user: user, @@ -1663,173 +1725,79 @@ defmodule Lightning.SessionTest do end describe "persistence reconciliation" do - test "reconciles lock_version when loading persisted Y.Doc state", %{ - user: user + test "restores persisted Y.Doc content on doc supervisor restart", %{ + user: user, + collaboration_base: base } do workflow = insert(:simple_workflow) + document_name = "workflow:#{workflow.id}" - # Start initial session and make some changes + # Start a session and add an unsaved job to the Y.Doc {:ok, _doc_supervisor} = - Lightning.Collaborate.start_document( - workflow, - "workflow:#{workflow.id}" - ) + Lightning.Collaborate.start_document(workflow, document_name, base) {:ok, session1} = Session.start_link( user: user, workflow: workflow, parent_pid: self(), - document_name: "workflow:#{workflow.id}" + document_name: document_name, + base: base ) - # Get the SharedDoc and verify initial lock_version - shared_doc = Session.get_doc(session1) - workflow_map = Yex.Doc.get_map(shared_doc, "workflow") - initial_lock_version = Yex.Map.fetch!(workflow_map, "lock_version") - - # Simulate workflow changes in database (e.g., from another save) - # This increments lock_version in the database - new_lock_version = initial_lock_version + 1 + custom_job_id = Ecto.UUID.generate() - {:ok, updated_workflow} = - Lightning.Workflows.save_workflow( - Lightning.Workflows.change_workflow(workflow, %{ - name: "Updated Name" - }), - user + Session.update_doc(session1, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => custom_job_id, + "name" => "persisted-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) ) + end) - # Verify database has new lock_version - assert updated_workflow.lock_version == new_lock_version - - # Stop the session and document 
supervisor to simulate server restart + # Stop the session and doc supervisor — PersistenceWriter flushes state Session.stop(session1) - ensure_doc_supervisor_stopped(workflow.id) + ensure_doc_supervisor_stopped(workflow.id, base) - # Start a new session - this will load persisted Y.Doc state - # The persisted state has old lock_version, but fresh workflow has new one + # Restart — persisted state should be restored including the custom job {:ok, _doc_supervisor2} = - Lightning.Collaborate.start_document( - updated_workflow, - "workflow:#{workflow.id}" - ) + Lightning.Collaborate.start_document(workflow, document_name, base) {:ok, session2} = - Session.start_link( - user: user, - workflow: updated_workflow, - parent_pid: self(), - document_name: "workflow:#{workflow.id}" - ) - - # Get the SharedDoc and check lock_version was reconciled - shared_doc2 = Session.get_doc(session2) - workflow_map2 = Yex.Doc.get_map(shared_doc2, "workflow") - reconciled_lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") - - # The lock_version should match the database, not the stale persisted state - assert reconciled_lock_version == new_lock_version, - "Expected lock_version #{new_lock_version} but got #{reconciled_lock_version}" - - # Verify name was also reconciled - reconciled_name = Yex.Map.fetch!(workflow_map2, "name") - assert reconciled_name == "Updated Name" - - Session.stop(session2) - end - - test "discards stale persisted Y.Doc when lock_version changes", %{ - user: user - } do - workflow = insert(:simple_workflow) - - # Start initial session with lock_version 0 - {:ok, _doc_supervisor} = - Lightning.Collaborate.start_document( - workflow, - "workflow:#{workflow.id}" - ) - - {:ok, session1} = Session.start_link( user: user, workflow: workflow, parent_pid: self(), - document_name: "workflow:#{workflow.id}" - ) - - # Get initial state - shared_doc = Session.get_doc(session1) - jobs_array = Yex.Doc.get_array(shared_doc, "jobs") - initial_job_count = 
Yex.Array.length(jobs_array) - - # Workflow is saved (lock_version increments) - {:ok, updated_workflow} = - Lightning.Workflows.save_workflow( - Lightning.Workflows.change_workflow(workflow, %{ - name: "Changed by another user" - }), - user + document_name: document_name, + base: base ) - new_lock_version = updated_workflow.lock_version + doc = Session.get_doc(session2) - # Simulate server restart - persisted Y.Doc has old lock_version - Session.stop(session1) - ensure_doc_supervisor_stopped(workflow.id) - - # Start new session with updated workflow - # Persistence should detect stale lock_version and reload from DB - {:ok, _doc_supervisor2} = - Lightning.Collaborate.start_document( - updated_workflow, - "workflow:#{workflow.id}" - ) - - {:ok, session2} = - Session.start_link( - user: user, - workflow: updated_workflow, - parent_pid: self(), - document_name: "workflow:#{workflow.id}" - ) - - # Verify Y.Doc was reloaded from database - shared_doc2 = Session.get_doc(session2) - jobs_array2 = Yex.Doc.get_array(shared_doc2, "jobs") - - # Should have original jobs from DB (persisted Y.Doc was discarded) - assert Yex.Array.length(jobs_array2) == initial_job_count - - # Verify lock_version matches database - workflow_map2 = Yex.Doc.get_map(shared_doc2, "workflow") - reconciled_lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") - - assert reconciled_lock_version == new_lock_version, - "Lock version should match database" + job_ids = + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) - # Verify workflow name was updated from database - reconciled_name = Yex.Map.fetch!(workflow_map2, "name") - assert reconciled_name == "Changed by another user" + assert custom_job_id in job_ids, + "Persisted custom job should be restored on restart" Session.stop(session2) end - test "handles persisted Y.Doc with nil lock_version when DB has real version", - %{ - user: user - } do - # This tests the bug fix for issue #4164 - # When a workflow is 
opened before first save, Y.Doc gets lock_version: nil - # If that state is persisted and the workflow is later saved (getting a real lock_version), - # loading the persisted state would crash because extract_lock_version didn't handle {:ok, nil} - + test "handles persisted Y.Doc with nil lock_version without crashing", %{ + user: user, + collaboration_base: base + } do workflow = insert(:simple_workflow) doc_name = "workflow:#{workflow.id}" - # Manually create a persisted document state with lock_version: nil - # This simulates a Y.Doc that was persisted before the workflow was ever saved + # Manually create persisted state with nil lock_version — + # simulates a Y.Doc persisted before the workflow was first saved doc = Yex.Doc.new() workflow_map = Yex.Doc.get_map(doc, "workflow") @@ -1847,36 +1815,32 @@ defmodule Lightning.SessionTest do version: :update }) - # Now start a session - this should NOT crash - # The persistence layer should handle the nil lock_version and reset from DB + # Should not crash when loading persisted state with nil lock_version {:ok, _doc_supervisor} = - Lightning.Collaborate.start_document( - workflow, - doc_name - ) + Lightning.Collaborate.start_document(workflow, doc_name, base) {:ok, session} = Session.start_link( user: user, workflow: workflow, parent_pid: self(), - document_name: doc_name + document_name: doc_name, + base: base ) - # Verify the session started and lock_version was reconciled from DB - shared_doc = Session.get_doc(session) - workflow_map2 = Yex.Doc.get_map(shared_doc, "workflow") - reconciled_lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") + # Persisted state is loaded as-is — nil lock_version stays nil + doc = Session.get_doc(session) + workflow_map2 = Yex.Doc.get_map(doc, "workflow") + lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") - # lock_version should now match the database value - assert reconciled_lock_version == workflow.lock_version, - "Expected lock_version 
#{workflow.lock_version} but got #{reconciled_lock_version}" + assert is_nil(lock_version) Session.stop(session) end test "merges delta updates with persisted state across save batches", %{ - user: user + user: user, + collaboration_base: base } do # This tests the fix for the merge_updates bug where delta updates # saved in subsequent batches were applied to an empty doc instead of @@ -1943,7 +1907,8 @@ defmodule Lightning.SessionTest do {:ok, _doc_supervisor} = Lightning.Collaborate.start_document( workflow, - doc_name + doc_name, + base ) {:ok, session} = @@ -1951,7 +1916,8 @@ defmodule Lightning.SessionTest do user: user, workflow: workflow, parent_pid: self(), - document_name: doc_name + document_name: doc_name, + base: base ) # Verify the session loaded the full state correctly diff --git a/test/lightning/collaboration/workflow_reconciler_test.exs b/test/lightning/collaboration/workflow_reconciler_test.exs index ffdee70fce4..8f48f8a9567 100644 --- a/test/lightning/collaboration/workflow_reconciler_test.exs +++ b/test/lightning/collaboration/workflow_reconciler_test.exs @@ -1,11 +1,10 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do - # Tests must be async: false because we put a SharedDoc in a dynamic supervisor - # that isn't owned by the test process. So we need our Ecto sandbox to be - # in shared mode. - use Lightning.DataCase, async: false + # Each test gets its own collaboration supervisor (Registry, :pg scope, + # DynamicSupervisor) via `Lightning.CollaborationCase`. 
+ use Lightning.CollaborationCase - import Lightning.Factories import Lightning.CollaborationHelpers + import Lightning.Factories alias Lightning.Collaborate alias Lightning.Collaboration.{Session, WorkflowReconciler} @@ -15,34 +14,25 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do # that starts :pg with the :workflow_collaboration scope # and a dynamic supervisor called Lightning.WorkflowCollaboration - setup do - # Set global mode for the mock to allow cross-process calls - Mox.set_mox_global(LightningMock) - # Stub the broadcast calls that WorkflowReconciler makes - Mox.stub(LightningMock, :broadcast, fn _topic, _message -> :ok end) - + setup %{collaboration_base: base} do user = insert(:user) - {:ok, user: user} + {:ok, user: user, base: base} end describe "reconcile_workflow_changes/2" do setup do workflow = insert(:complex_workflow) - - on_exit(fn -> - ensure_doc_supervisor_stopped(workflow.id) - end) - %{workflow: workflow} end test "job insert operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Create a new job changeset new_job = @@ -72,7 +62,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do Yex.Doc.monitor_update(shared_doc) # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) assert_one_update(shared_doc) @@ -91,16 +85,17 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } = new_job_data Session.stop(session_pid) - ensure_doc_supervisor_stopped(workflow.id) + ensure_doc_supervisor_stopped(workflow.id, base) end test "job update operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # 
Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get the SharedDoc and verify initial state shared_doc = Session.get_doc(session_pid) @@ -139,7 +134,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) assert_one_update(shared_doc) @@ -158,16 +157,17 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do updated_job_data Session.stop(session_pid) - ensure_doc_supervisor_stopped(workflow.id) + ensure_doc_supervisor_stopped(workflow.id, base) end test "job delete operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get the first job to delete job_to_delete = Enum.at(workflow.jobs, 0) @@ -190,7 +190,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify the job was removed from the YDoc shared_doc = Session.get_doc(session_pid) @@ -203,16 +207,17 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do refute find_in_ydoc_array(jobs_array, job_to_delete.id) Session.stop(session_pid) - ensure_doc_supervisor_stopped(workflow.id) + ensure_doc_supervisor_stopped(workflow.id, base) end test "edge insert operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - 
Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Create a new edge between existing jobs %{id: source_job_id} = source_job = Enum.at(workflow.jobs, 1) @@ -244,7 +249,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify the edge was added to the YDoc shared_doc = Session.get_doc(session_pid) @@ -266,11 +275,12 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do test "edge update operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get the first edge to update edge_to_update = Enum.at(workflow.edges, 0) @@ -295,7 +305,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify the edge was updated in the YDoc shared_doc = Session.get_doc(session_pid) @@ -308,11 +322,12 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do test "edge delete operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get the first edge to delete edge_to_delete = Enum.at(workflow.edges, 0) @@ -337,7 +352,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - 
WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify the edge was removed from the YDoc shared_doc = Session.get_doc(session_pid) @@ -353,11 +372,12 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do test "trigger update operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get the first trigger to update trigger_to_update = Enum.at(workflow.triggers, 0) @@ -383,7 +403,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify the trigger was updated in the YDoc shared_doc = Session.get_doc(session_pid) @@ -401,11 +425,12 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do test "trigger delete operations are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get the trigger to delete trigger_to_delete = Enum.at(workflow.triggers, 0) @@ -428,7 +453,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify the trigger was removed from the YDoc shared_doc = Session.get_doc(session_pid) @@ -440,11 +469,12 @@ defmodule 
Lightning.Collaboration.WorkflowReconcilerTest do test "workflow-level updates are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Create changeset for updating workflow properties workflow_changeset = @@ -456,7 +486,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do |> Map.put(:action, :update) # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify the workflow was updated in the YDoc shared_doc = Session.get_doc(session_pid) @@ -468,11 +502,12 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do test "lock_version updates are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get initial lock_version from YDoc shared_doc = Session.get_doc(session_pid) @@ -494,7 +529,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do Yex.Doc.monitor_update(shared_doc) # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify exactly one update was sent assert_one_update(shared_doc) @@ -506,16 +545,17 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do new_lock_version Session.stop(session_pid) - ensure_doc_supervisor_stopped(workflow.id) + ensure_doc_supervisor_stopped(workflow.id, base) end test "positions updates are applied to YDoc", %{ user: user, - workflow: workflow + workflow: workflow, + 
base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get some job IDs to create positions for job1_id = Enum.at(workflow.jobs, 0).id @@ -539,7 +579,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do Yex.Doc.monitor_update(shared_doc) # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) assert_one_update(shared_doc) @@ -553,11 +597,12 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do test "multiple simultaneous changes are applied correctly", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Get existing entities to modify job_to_update = Enum.at(workflow.jobs, 0) @@ -612,7 +657,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile all changes at once - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # Verify all changes were applied shared_doc = Session.get_doc(session_pid) @@ -646,7 +695,8 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do end test "reconciliation with no active sessions does not crash", %{ - workflow: workflow + workflow: workflow, + base: base } do # Don't start any session - no active SharedDoc @@ -660,17 +710,19 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do assert :ok = WorkflowReconciler.reconcile_workflow_changes( workflow_changeset, - workflow + workflow, + base ) end test "reconciliation handles large workflow modifications efficiently", %{ user: user, - 
workflow: workflow + workflow: workflow, + base: base } do # Start a session to create the SharedDoc {:ok, session_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Create many new jobs at once (stress test) new_jobs = @@ -702,7 +754,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do start_time = System.monotonic_time(:millisecond) # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) end_time = System.monotonic_time(:millisecond) duration = end_time - start_time @@ -720,17 +776,18 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do test "reconciliation handles concurrent sessions correctly", %{ user: user, - workflow: workflow + workflow: workflow, + base: base } do # Start multiple sessions to create multiple references to SharedDoc {:ok, session1_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) {:ok, session2_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) {:ok, session3_pid} = - Collaborate.start(workflow: workflow, user: user) + Collaborate.start(workflow: workflow, user: user, base: base) # Verify all sessions share the same SharedDoc shared_doc1 = Session.get_doc(session1_pid) @@ -762,7 +819,11 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do } # Reconcile the changes - WorkflowReconciler.reconcile_workflow_changes(workflow_changeset, workflow) + WorkflowReconciler.reconcile_workflow_changes( + workflow_changeset, + workflow, + base + ) # All sessions should see the same updated data jobs_array1 = Yex.Doc.get_array(shared_doc1, "jobs") @@ -780,6 +841,300 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do end end + describe 
"reconcile_workflow_from_db/2" do + setup %{user: _user, collaboration_base: base} do + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + %{workflow: workflow, base: base} + end + + test "phantom job (in Y.Doc but not in DB) is removed from the doc", + %{user: user, workflow: workflow, base: base} do + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + phantom_id = Ecto.UUID.generate() + + Session.update_doc(session, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => phantom_id, + "name" => "phantom-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "" + }) + ) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user, base) + + doc = Session.get_doc(session) + jobs = Yex.Doc.get_array(doc, "jobs") |> Yex.Array.to_json() + job_ids = Enum.map(jobs, & &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + [original_job] = workflow.jobs + assert original_job.id in job_ids + refute phantom_id in job_ids + end + + test "multiple phantom jobs are all removed from the doc", + %{user: user, workflow: workflow, base: base} do + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + phantom_ids = Enum.map(1..3, fn _ -> Ecto.UUID.generate() end) + + Session.update_doc(session, fn doc -> + array = Yex.Doc.get_array(doc, "jobs") + + Enum.each(phantom_ids, fn pid -> + Yex.Array.push( + array, + Yex.MapPrelim.from(%{ + "id" => pid, + "name" => "p", + "adaptor" => "", + "body" => "" + }) + ) + end) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user, base) + + doc = Session.get_doc(session) + jobs = Yex.Doc.get_array(doc, "jobs") |> Yex.Array.to_json() + job_ids = Enum.map(jobs, & &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + assert length(job_ids) == 1 + [original_job] = workflow.jobs + assert 
original_job.id in job_ids + Enum.each(phantom_ids, fn pid -> refute pid in job_ids end) + end + + test "job body is updated in the doc when it differs from DB", + %{user: user, workflow: workflow, base: base} do + [db_job] = workflow.jobs + + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + Session.update_doc(session, fn doc -> + jobs_array = Yex.Doc.get_array(doc, "jobs") + + job_map = + Enum.find(jobs_array, fn m -> Yex.Map.fetch!(m, "id") == db_job.id end) + + {:ok, body_text} = Yex.Map.fetch(job_map, "body") + len = Yex.Text.length(body_text) + if len > 0, do: Yex.Text.delete(body_text, 0, len) + Yex.Text.insert(body_text, 0, "stale editor body") + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user, base) + + doc = Session.get_doc(session) + jobs_array = Yex.Doc.get_array(doc, "jobs") + + job_map = + Enum.find(jobs_array, fn m -> Yex.Map.fetch!(m, "id") == db_job.id end) + + {:ok, body_text} = Yex.Map.fetch(job_map, "body") + body_string = Yex.Text.to_string(body_text) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + assert body_string == db_job.body + end + + test "no-op reconcile does not modify Y.Doc when doc is already in sync with DB", + %{user: user, workflow: workflow, base: base} do + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + [db_job] = workflow.jobs + doc = Session.get_doc(session) + + jobs_array = Yex.Doc.get_array(doc, "jobs") + + job_map = + Enum.find(jobs_array, fn m -> Yex.Map.fetch!(m, "id") == db_job.id end) + + name_before = Yex.Map.fetch!(job_map, "name") + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user, base) + + name_after = Yex.Map.fetch!(job_map, "name") + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + assert name_before == name_after + assert name_after == db_job.name + end + + test "phantom edge (in Y.Doc but not in DB) is removed from the doc", + %{user: user, 
workflow: workflow, base: base} do + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + phantom_edge_id = Ecto.UUID.generate() + + Session.update_doc(session, fn doc -> + Yex.Doc.get_array(doc, "edges") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => phantom_edge_id, + "condition_type" => "always", + "enabled" => true, + "source_trigger_id" => nil, + "source_job_id" => nil, + "target_job_id" => nil + }) + ) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user, base) + + doc = Session.get_doc(session) + + edge_ids = + Yex.Doc.get_array(doc, "edges") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + [original_edge] = workflow.edges + assert original_edge.id in edge_ids + refute phantom_edge_id in edge_ids + end + + test "phantom trigger (in Y.Doc but not in DB) is removed from the doc", + %{user: user, workflow: workflow, base: base} do + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + phantom_trigger_id = Ecto.UUID.generate() + + Session.update_doc(session, fn doc -> + Yex.Doc.get_array(doc, "triggers") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => phantom_trigger_id, + "type" => "webhook", + "enabled" => true, + "cron_expression" => nil, + "kafka_configuration" => nil + }) + ) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user, base) + + doc = Session.get_doc(session) + + trigger_ids = + Yex.Doc.get_array(doc, "triggers") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + [original_trigger] = workflow.triggers + assert original_trigger.id in trigger_ids + refute phantom_trigger_id in trigger_ids + end + + test "new kafka trigger is inserted with kafka_configuration fields", + %{user: user, workflow: workflow, base: base} do + kafka_trigger = + insert(:trigger, + type: :kafka, + 
workflow: workflow, + kafka_configuration: build(:triggers_kafka_configuration) + ) + + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + updated_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :triggers, :edges] + ) + + WorkflowReconciler.reconcile_workflow_from_db(updated_workflow, user, base) + + doc = Session.get_doc(session) + triggers = Yex.Doc.get_array(doc, "triggers") |> Yex.Array.to_json() + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + kafka_t = Enum.find(triggers, &(&1["id"] == kafka_trigger.id)) + + assert kafka_t != nil, "kafka trigger should be inserted into Y.Doc" + + assert %{"group_id" => _, "hosts_string" => _} = + kafka_t["kafka_configuration"] + end + + test "trigger cron_expression is updated in the doc when it differs from DB", + %{user: user, workflow: workflow, base: base} do + [original_trigger] = workflow.triggers + + {:ok, session} = + Collaborate.start(workflow: workflow, user: user, base: base) + + Session.update_doc(session, fn doc -> + triggers_array = Yex.Doc.get_array(doc, "triggers") + + t_map = + Enum.find(triggers_array, fn m -> + Yex.Map.fetch!(m, "id") == original_trigger.id + end) + + Yex.Map.set(t_map, "cron_expression", "stale_value") + end) + + updated_trigger = + Lightning.Repo.update!( + Ecto.Changeset.change(original_trigger, cron_expression: "0 * * * *") + ) + + updated_workflow = %{workflow | triggers: [updated_trigger]} + + WorkflowReconciler.reconcile_workflow_from_db(updated_workflow, user, base) + + doc = Session.get_doc(session) + triggers_array = Yex.Doc.get_array(doc, "triggers") + + t_map = + Enum.find(triggers_array, fn m -> + Yex.Map.fetch!(m, "id") == original_trigger.id + end) + + cron_expression = Yex.Map.fetch!(t_map, "cron_expression") + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id, base) + + assert cron_expression == "0 * * * *" + end + end + defp find_in_ydoc_array(array, id) do 
array |> Enum.reduce_while(nil, fn item, _ -> diff --git a/test/lightning/metadata_service_test.exs b/test/lightning/metadata_service_test.exs index 686315cb648..d035abfd9c4 100644 --- a/test/lightning/metadata_service_test.exs +++ b/test/lightning/metadata_service_test.exs @@ -3,6 +3,11 @@ defmodule Lightning.MetadataServiceTest do alias Lightning.MetadataService + setup do + FakeRambo.Setup.start_cache!() + :ok + end + describe "fetch/2" do test "returns the metadata when it exists" do path = diff --git a/test/lightning_web/channels/workflow_channel_test.exs b/test/lightning_web/channels/workflow_channel_test.exs index 671bcf6c7c0..2feaa1a53e2 100644 --- a/test/lightning_web/channels/workflow_channel_test.exs +++ b/test/lightning_web/channels/workflow_channel_test.exs @@ -3417,4 +3417,20 @@ defmodule LightningWeb.WorkflowChannelTest do assert_broadcast "job_code_applied", %{message_id: ^message_id} end end + + describe "handle_info :workflow_updated_externally" do + test "pushes workflow_saved to the socket when the workflow is updated externally", + %{socket: socket, workflow: workflow} do + loaded_workflow = + Lightning.Repo.get!(Lightning.Workflows.Workflow, workflow.id) + + send(socket.channel_pid, {:workflow_updated_externally, loaded_workflow}) + + assert_push "workflow_saved", %{ + latest_snapshot_lock_version: lock_version + } + + assert lock_version == loaded_workflow.lock_version + end + end end diff --git a/test/lightning_web/live/workflow_live/editor_test.exs b/test/lightning_web/live/workflow_live/editor_test.exs index 0fdcea1d47b..a434e8394c8 100644 --- a/test/lightning_web/live/workflow_live/editor_test.exs +++ b/test/lightning_web/live/workflow_live/editor_test.exs @@ -1693,6 +1693,11 @@ defmodule LightningWeb.WorkflowLive.EditorTest do end describe "Editor events" do + setup do + FakeRambo.Setup.start_cache!() + :ok + end + test "can handle request_metadata event", %{ conn: conn, project: project, diff --git a/test/support/collaboration_case.ex 
b/test/support/collaboration_case.ex new file mode 100644 index 00000000000..165a4e5a46a --- /dev/null +++ b/test/support/collaboration_case.ex @@ -0,0 +1,125 @@ +defmodule Lightning.CollaborationCase do + @moduledoc """ + ExUnit case template for tests that exercise the collaboration GenServers + (`SharedDoc`, `PersistenceWriter`, `DocumentSupervisor`, etc.). + + Each test gets its own `Lightning.Collaboration.Supervisor` started via + `start_isolated_collaboration/0`, so the registry, dynamic supervisor, and + `:pg` scope are isolated to that test. The topology base is threaded + explicitly through every spawned process — no `Application.put_env` global + state is written. + + `Lightning.Workflows.Events.workflow_updated/1` calls `Phoenix.PubSub` + directly rather than through the `Lightning` behaviour mock, so private-mode + Mox stubs in the test process are not needed for `broadcast` — `Mox.set_mox_global` + and the companion `Mox.stub(LightningMock, :broadcast, ...)` are not required + in any collaboration test. + + When the test exits, the collaboration supervisor is shut down (draining + `DocumentSupervisor.terminate/2`'s flush) *before* the SQL Sandbox + connection is checked back in — so any DB writes the `PersistenceWriter` + does on the way out happen inside the test's sandbox. + + ## Options + + use Lightning.CollaborationCase, async: true + + Passes `async:` through to `Lightning.DataCase`. Defaults to `false`. + No global application state is written by this case template, so + individual test modules may opt in once they have verified their own + code is race-condition-free. 
+ """ + use ExUnit.CaseTemplate + + alias Lightning.Collaboration.Registry + alias Lightning.Collaboration.Topology + + using opts do + async = Keyword.get(opts, :async, false) + + quote do + use Lightning.DataCase, async: unquote(async) + + import Lightning.CollaborationCase + end + end + + setup _tags do + {:ok, collaboration_base: start_isolated_collaboration()} + end + + @doc """ + Starts an isolated `Lightning.Collaboration.Supervisor` for the current + test, overrides `Lightning.Collaboration.Topology` in application config to + point at it, and registers an `on_exit` hook that drains the supervisor + before the SQL Sandbox connection is checked back in. + + Returns the base atom of the test's collaboration supervisor. + + This helper is callable directly from any test (e.g. channel tests) that + needs an isolated collaboration tree without inheriting the full + `Lightning.CollaborationCase` setup. + """ + def start_isolated_collaboration do + base = + Module.concat([ + Lightning.Collaboration.Test, + "T#{System.unique_integer([:positive])}" + ]) + + {:ok, sup_pid} = + Lightning.Collaboration.Supervisor.start_link(name: base) + + # Tear the collaboration tree down via `on_exit` (rather than + # `start_supervised!`) so the shutdown happens *before* the SQL Sandbox + # connection is checked back in. ExUnit runs `on_exit` callbacks in + # LIFO order, and DataCase/ChannelCase's Sandbox stop_owner is + # registered first — registering ours here makes the collab tree + # drain first, letting `PersistenceWriter`'s terminate-time DB writes + # complete inside the test's sandbox. + ExUnit.Callbacks.on_exit(fn -> + ref = Process.monitor(sup_pid) + Process.exit(sup_pid, :shutdown) + + receive do + {:DOWN, ^ref, :process, ^sup_pid, _} -> :ok + after + 5_000 -> :ok + end + end) + + base + end + + @doc """ + Starts a `DocumentSupervisor` for the given workflow inside the test's + isolated collaboration tree. 
+ + Returns a map with `:document_supervisor`, `:persistence_writer`, + `:shared_doc`, and `:document_name`. + """ + def start_workflow_collab!(base, workflow, opts \\ []) do + document_name = + Keyword.get(opts, :document_name, "workflow:#{workflow.id}") + + {:ok, doc_sup_pid} = + Lightning.Collaboration.Supervisor.start_child( + base, + {Lightning.Collaboration.DocumentSupervisor, + workflow: workflow, + document_name: document_name, + base: base, + name: Topology.via(base, {:doc_supervisor, document_name})} + ) + + %{shared_doc: sd, persistence_writer: pw} = + Registry.get_group(base, document_name) + + %{ + document_supervisor: doc_sup_pid, + persistence_writer: pw, + shared_doc: sd, + document_name: document_name + } + end +end diff --git a/test/support/collaboration_helpers.ex b/test/support/collaboration_helpers.ex index f1a23c837bc..8172c3e7263 100644 --- a/test/support/collaboration_helpers.ex +++ b/test/support/collaboration_helpers.ex @@ -1,30 +1,51 @@ defmodule Lightning.CollaborationHelpers do - @doc """ - Ensure the document supervisor is stopped for a given workflow id. + @moduledoc """ + Test helpers for collaboration tests. + """ - This document supervisor has two children: - - PersistenceWriter - - SharedDoc + alias Lightning.Collaboration.Topology - When the parent process is stopped, the Session process is stopped. - The SharedDoc auto exits when there are no more Session processes observing it. + @doc """ + Ensures the `DocumentSupervisor` for `workflow_id` is stopped, polling + briefly for it to clear from the registry. - When the SharedDoc process is stopped, the PersistenceWriter process does - it's own shutdown. This can take a millisecond or two, so to avoid - test errors where the PersistenceWriter tries to write to the database - after the test has finished, we wait for it's parent (DocumentSupervisor) - to be stopped. 
+ `Topology.base/0` is readable from any process (including `on_exit` + handlers) because it reads `Application.get_env` rather than a + process-scoped Mox stub. """ - def ensure_doc_supervisor_stopped(workflow_id) do - procs = - Lightning.Collaboration.Registry.get_group("workflow:#{workflow_id}") + def ensure_doc_supervisor_stopped(workflow_id, base \\ Topology.base()) do + registry = Topology.registry(base) - if procs[:doc_supervisor] do - eventually_stop(procs.doc_supervisor) + case Process.whereis(registry) do + nil -> + :ok + + _pid -> + case lookup_doc_supervisor(registry, "workflow:#{workflow_id}") do + nil -> + :ok + + pid -> + Eventually.eventually( + fn -> Process.alive?(pid) end, + false, + 1000, + 1 + ) + end end end - defp eventually_stop(pid) do - Eventually.eventually(fn -> Process.alive?(pid) end, false, 1000, 1) + defp lookup_doc_supervisor(registry, key) do + matches = + Registry.select(registry, [ + {{{:"$1", :"$2"}, :"$3", :"$4"}, + [ + {:andalso, {:==, :"$1", :doc_supervisor}, + {:==, {:binary_part, :"$2", 0, byte_size(key)}, key}} + ], [:"$3"]} + ]) + + List.first(matches) end end diff --git a/test/support/fake_rambo.ex b/test/support/fake_rambo.ex index bcd4e7a4530..c00e0724326 100644 --- a/test/support/fake_rambo.ex +++ b/test/support/fake_rambo.ex @@ -1,25 +1,49 @@ defmodule FakeRambo do @moduledoc """ - Mock implementation of Rambo + Mock implementation of Rambo. - Uses the current process to retrieve overridden responses. + The Cachex cache that backs `stub_run/1` is started under each test's + ExUnit supervisor via `FakeRambo.Setup.start_cache!/0`, so it lives for + exactly one test. This avoids the previous failure mode where the cache's + ETS table was owned by whichever test first called `Cachex.start_link/1` + and disappeared once that test exited, breaking unrelated subsequent + tests. 
""" - defmodule Helpers do - def stub_run(res) do - Cachex.start_link(:fake_rambo_cache) - if :ets.whereis(:fake_rambo_cache) == :undefined do - Process.sleep(100) - end + @cache_name :fake_rambo_cache + + defmodule Setup do + @moduledoc false - Cachex.put(:fake_rambo_cache, :res, res) + @doc """ + Starts a Cachex instance for `FakeRambo` under the current test's + ExUnit supervisor. Must be called from a test setup or test body. + """ + def start_cache! do + ExUnit.Callbacks.start_supervised!( + {Cachex, name: FakeRambo.cache_name()}, + restart: :temporary + ) + + :ok end end + defmodule Helpers do + @moduledoc false + + def stub_run(res) do + Cachex.put(FakeRambo.cache_name(), :res, res) + end + end + + @doc false + def cache_name, do: @cache_name + def run(command, args, opts) do send(self(), {command, args, opts}) - case Cachex.get(:fake_rambo_cache, :res) do + case Cachex.get(@cache_name, :res) do {:ok, nil} -> {:ok, %{out: "", status: 0}} {:ok, res} -> res {:error, _} -> {:ok, %{out: "", status: 0}}