Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ and this project adheres to

### Fixed

- Fix version-stuck bug where the collaborative editor shows stale state after a
sandbox merge or CLI deploy.
[#4535](https://github.com/OpenFn/lightning/issues/4535)
- Collection storage on Project Settings, Collections no longer shows `0` for
collections holding less than one megabyte of data. The underlying counter was
always correct, the rendering now reflects values at any scale.
Expand Down
102 changes: 69 additions & 33 deletions lib/lightning/collaboration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,33 @@ defmodule Lightning.Collaborate do

Collaborate.start(user: user, workflow: workflow)
"""
alias Lightning.Accounts.User
alias Lightning.Collaboration.DocumentSupervisor
alias Lightning.Collaboration.Registry
alias Lightning.Collaboration.Session
alias Lightning.Collaboration.Supervisor, as: SessionSupervisor
alias Lightning.Collaboration.Topology

require Logger

@pg_scope :workflow_collaboration

@spec start(opts :: Keyword.t()) :: GenServer.on_start()
def start(opts) do
base = Keyword.get(opts, :base, Topology.base())

case do_start(opts, base) do
{:error, {:error, :shared_doc_not_found}} ->
# A SharedDoc was registered in :pg but died before the Session could
# observe it (0ms auto_exit race). Yield one ms — enough for the timer
# to fire and clear :pg — then try once more from scratch.
Process.sleep(1)
do_start(opts, base)

result ->
result
end
end

defp do_start(opts, base) do
session_id = Ecto.UUID.generate()
parent_pid = Keyword.get(opts, :parent_pid, self())

Expand All @@ -52,45 +68,65 @@ defmodule Lightning.Collaborate do
"Starting collaboration for document: #{document_name} (workflow: #{workflow.id})"
)

# Ensure document supervisor exists for this document
case lookup_shared_doc(document_name) do
nil ->
Logger.info("Starting document for #{document_name}")
{:ok, _doc_supervisor_pid} = start_document(workflow, document_name)

_shared_doc_pid ->
Logger.info("Found existing document for #{document_name}")
:ok
# Ensure document supervisor exists for this document.
# Returns {:error, reason} on failure rather than raising, so callers
# such as WorkflowReconciler can handle failures gracefully.
doc_result =
case lookup_shared_doc(document_name, base) do
nil ->
Logger.info("Starting document for #{document_name}")

case start_document(workflow, document_name, base) do
{:ok, _} -> :ok
error -> error
end

_shared_doc_pid ->
Logger.info("Found existing document for #{document_name}")
:ok
end

case doc_result do
:ok ->
# Start session for this user
user_id = if is_struct(user, User), do: user.id, else: nil

SessionSupervisor.start_child(base, {
Session,
workflow: workflow,
user: user,
parent_pid: parent_pid,
document_name: document_name,
base: base,
name:
Topology.via(
base,
{:session, "#{document_name}:#{session_id}", user_id}
)
})

error ->
error
end

# Start session for this user
SessionSupervisor.start_child({
Session,
workflow: workflow,
user: user,
parent_pid: parent_pid,
document_name: document_name,
name: Registry.via({:session, "#{document_name}:#{session_id}", user.id})
})
end

def start_document(
%Lightning.Workflows.Workflow{} = workflow,
document_name
document_name,
base \\ Topology.base()
) do
{:ok, doc_supervisor_pid} =
SessionSupervisor.start_child(
{DocumentSupervisor,
workflow: workflow,
document_name: document_name,
name: Registry.via({:doc_supervisor, document_name})}
)

{:ok, doc_supervisor_pid}
SessionSupervisor.start_child(
base,
{DocumentSupervisor,
workflow: workflow,
document_name: document_name,
base: base,
name: Topology.via(base, {:doc_supervisor, document_name})}
)
end

defp lookup_shared_doc(document_name) do
case :pg.get_members(@pg_scope, document_name) do
defp lookup_shared_doc(document_name, base) do
case :pg.get_members(Topology.pg_scope(base), document_name) do
[] -> nil
[shared_doc_pid | _] -> shared_doc_pid
end
Expand Down
23 changes: 11 additions & 12 deletions lib/lightning/collaboration/document_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@ defmodule Lightning.Collaboration.DocumentSupervisor do
"""
use GenServer

import Lightning.Collaboration.Registry, only: [via: 1]

alias Lightning.Collaboration.Persistence
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Topology

require Logger

@pg_scope :workflow_collaboration

def start_link(args, opts \\ []) do
GenServer.start_link(__MODULE__, args, opts)
end
Expand All @@ -31,12 +28,18 @@ defmodule Lightning.Collaboration.DocumentSupervisor do
def init(opts) do
workflow = Keyword.fetch!(opts, :workflow)
document_name = Keyword.fetch!(opts, :document_name)
base = Keyword.fetch!(opts, :base)

# Resolve topology references once at init time so subsequent callbacks
# don't need to re-read the Mox stub or Application env.
pg_scope = Topology.pg_scope(base)

{:ok, persistence_writer_pid} =
PersistenceWriter.start_link(
document_name: document_name,
workflow_id: workflow.id,
name: via({:persistence_writer, document_name})
base: base,
name: Topology.via(base, {:persistence_writer, document_name})
)

persistence_writer_ref = Process.monitor(persistence_writer_pid)
Expand All @@ -53,17 +56,18 @@ defmodule Lightning.Collaboration.DocumentSupervisor do
persistence_writer: persistence_writer_pid
}}
],
name: via({:shared_doc, document_name})
name: Topology.via(base, {:shared_doc, document_name})
)

# Register with :pg using document_name so versioned rooms are isolated
:ok = register_shared_doc_with_pg(document_name, shared_doc_pid)
:ok = :pg.join(pg_scope, document_name, shared_doc_pid)

shared_doc_ref = Process.monitor(shared_doc_pid)

{:ok,
%{
workflow: workflow,
pg_scope: pg_scope,
persistence_writer_pid: persistence_writer_pid,
persistence_writer_ref: persistence_writer_ref,
shared_doc_pid: shared_doc_pid,
Expand Down Expand Up @@ -137,9 +141,4 @@ defmodule Lightning.Collaboration.DocumentSupervisor do

{:stop, :normal, state |> Map.put(key, nil)}
end

# Supervisor.start_link(children, strategy: :one_for_all)
defp register_shared_doc_with_pg(document_name, shared_doc_pid) do
:pg.join(@pg_scope, document_name, shared_doc_pid)
end
end
108 changes: 7 additions & 101 deletions lib/lightning/collaboration/persistence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ defmodule Lightning.Collaboration.Persistence do
@behaviour Yex.Sync.SharedDoc.PersistenceBehaviour

alias Lightning.Collaboration.DocumentState
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Session
alias Lightning.Collaboration.WorkflowSerializer

require Logger

Expand All @@ -30,7 +28,6 @@ defmodule Lightning.Collaboration.Persistence do
case DocumentState.get_checkpoint_and_updates(doc_name) do
{:ok, checkpoint, updates} ->
apply_persisted_state(doc, doc_name, checkpoint, updates)
reconcile_or_reset(doc, doc_name, workflow)

{:error, :not_found} ->
Logger.info(
Expand All @@ -46,16 +43,17 @@ defmodule Lightning.Collaboration.Persistence do

@impl true
def update_v1(state, update, doc_name, _doc) do
case PersistenceWriter.add_update(doc_name, update) do
:ok ->
state

{:error, reason} ->
case state[:persistence_writer] do
nil ->
Logger.error(
"Failed to add update to PersistenceWriter: #{inspect(reason)}"
"PersistenceWriter pid not in persistence state. document=#{doc_name}"
)

state

pid ->
GenServer.cast(pid, {:add_update, update})
state
end
end

Expand Down Expand Up @@ -89,96 +87,4 @@ defmodule Lightning.Collaboration.Persistence do
DocumentState.apply_to_doc(doc, checkpoint, updates)
Logger.debug("Loaded #{length(updates)} updates. document=#{doc_name}")
end

defp reconcile_or_reset(doc, doc_name, workflow) do
workflow_map = Yex.Doc.get_map(doc, "workflow")
persisted_lock_version = extract_lock_version(workflow_map)
current_lock_version = workflow.lock_version

if stale?(persisted_lock_version, current_lock_version) do
Logger.warning("""
Persisted Y.Doc is stale (persisted: #{inspect(persisted_lock_version)}, \
current: #{current_lock_version})
Discarding persisted state and reloading from database.
document=#{doc_name}
""")

clear_and_reset_workflow(doc, workflow)
else
Logger.debug(
"Persisted Y.Doc is current (lock_version: #{current_lock_version}). document=#{doc_name}"
)

reconcile_workflow_metadata(doc, workflow)
end
end

defp extract_lock_version(workflow_map) do
case Yex.Map.fetch(workflow_map, "lock_version") do
{:ok, version} when is_float(version) -> trunc(version)
{:ok, version} when is_integer(version) -> version
{:ok, nil} -> nil
:error -> nil
end
end

defp stale?(nil, current_version), do: not is_nil(current_version)

defp stale?(persisted_version, current_version),
do: persisted_version != current_version

defp clear_and_reset_workflow(doc, workflow) do
# Same pattern as Session.clear_and_reset_doc
# Get all Yex collections BEFORE transaction to avoid VM deadlock
jobs_array = Yex.Doc.get_array(doc, "jobs")
edges_array = Yex.Doc.get_array(doc, "edges")
triggers_array = Yex.Doc.get_array(doc, "triggers")

# Transaction 1: Clear all arrays
Yex.Doc.transaction(doc, "clear_stale_workflow", fn ->
clear_array(jobs_array)
clear_array(edges_array)
clear_array(triggers_array)
end)

# Transaction 2: Re-serialize workflow from database
Session.initialize_workflow_document(doc, workflow)

:ok
end

defp clear_array(array) do
length = Yex.Array.length(array)

if length > 0 do
Yex.Array.delete_range(array, 0, length)
end
end

defp reconcile_workflow_metadata(doc, workflow) do
# Update workflow metadata fields to match current database state
# This is critical when loading persisted Y.Doc state that may be stale
workflow_map = Yex.Doc.get_map(doc, "workflow")

Yex.Doc.transaction(doc, "reconcile_metadata", fn ->
# Update lock_version to current database value
Yex.Map.set(workflow_map, "lock_version", workflow.lock_version)

# Update name in case it changed
Yex.Map.set(workflow_map, "name", workflow.name)

# Update deleted_at if present
Yex.Map.set(
workflow_map,
"deleted_at",
WorkflowSerializer.datetime_to_string(workflow.deleted_at)
)
end)

Logger.debug(
"Reconciled workflow metadata: lock_version=#{workflow.lock_version}, name=#{workflow.name}"
)

:ok
end
end
Loading