Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,18 @@
#
# These 2 envs are used to enable local adaptors mode. OPENFN_ADAPTORS_REPO points
# to the repo directory which must have a `packages` subdir. LOCAL_ADAPTORS env is
# the flag used to enable/disable this mode
# the flag used to enable/disable this mode.
#
# OPENFN_ADAPTORS_REPO accepts a colon-separated list of paths so that a private
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many docs here - I would remove all this. RUNNINGLOCAL.md explains the variables well enough, no need to bloat this file

# adaptor repo can be loaded alongside the canonical OpenFn adaptors monorepo.
# Order is precedence: when two repos ship a package with the same dirname, the
# entry from the earlier path wins and a warning is logged for the shadowed one.
# Note: the ws-worker still expects a single path for `@local` adaptor execution,
# so a colon-separated value only affects what the registry exposes (picker UI,
# metadata). Single-path values keep behaving as before.
# LOCAL_ADAPTORS=true
# OPENFN_ADAPTORS_REPO=/path/to/repo/
# OPENFN_ADAPTORS_REPO=/path/to/private:/path/to/canonical
#
# Control whether metrics reported by the Workflow editor or Job editor are
# written to the Lightning logs.
Expand Down
9 changes: 8 additions & 1 deletion RUNNINGLOCAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,20 @@ To start, set up the following environment variables:
- `LOCAL_ADAPTORS`: Used to enable or disable the local adaptors mode. Set it to
`true` to enable.
- `OPENFN_ADAPTORS_REPO`: This should point to the adaptors monorepo. This is
the same variable used when you pass `-m` to the CLI.
the same variable used when you pass `-m` to the CLI. It also accepts a
colon-separated list of paths to merge multiple repos into the registry; the
first path wins on dirname collisions, with a warning logged for shadowed
entries. The bundled `ws-worker` still resolves `@local` adaptors against a
single path, so colon-separated values only widen the picker/metadata view.

Example configuration:

```sh
export LOCAL_ADAPTORS=true
export OPENFN_ADAPTORS_REPO=/path/to/repo/

# Or, merge a private adaptor repo with the canonical one (first wins):
export OPENFN_ADAPTORS_REPO=/path/to/private:/path/to/canonical
```

You can also run the server directly in local mode with:
Expand Down
104 changes: 84 additions & 20 deletions lib/lightning/adaptor_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ defmodule Lightning.AdaptorRegistry do
def handle_continue(opts, _state) do
adaptors =
case Enum.into(opts, %{}) do
%{local_adaptors_repo: repo_path} when is_binary(repo_path) ->
read_adaptors_from_local_repo(repo_path)
%{local_adaptors_repos: repo_paths}
when is_list(repo_paths) and repo_paths != [] ->
read_adaptors_from_local_repos(repo_paths)

%{use_cache: use_cache}
when use_cache === true or is_binary(use_cache) ->
Expand Down Expand Up @@ -150,10 +151,21 @@ defmodule Lightning.AdaptorRegistry do
and uses the cached file for every subsequent start.
It can either be a boolean, or a string - the latter being a file path
to set where the cache file is located.
- `:local_adaptors_repos` - an ordered list of paths to local adaptor
monorepos (each containing a `packages/` subdirectory). When set, the
registry skips NPM and lists adaptors from these directories instead.
Earlier paths win on dirname collisions; shadowed entries are summarised
in a single warning log.
- `:name` (defaults to AdaptorRegistry) - the name of the process, useful
for testing and/or running multiple versions of the registry
"""
@spec start_link(opts :: [use_cache: boolean() | binary(), name: term()]) ::
@spec start_link(
opts :: [
use_cache: boolean() | binary(),
local_adaptors_repos: [binary()],
name: term()
]
) ::
{:error, any} | {:ok, pid}
def start_link(opts \\ [use_cache: true]) do
Logger.info("Starting AdaptorRegistry")
Expand Down Expand Up @@ -270,20 +282,71 @@ defmodule Lightning.AdaptorRegistry do
}
end

defp read_adaptors_from_local_repo(repo_path) do
Logger.debug("Using local adaptors repo at #{repo_path}")

repo_path
|> Path.join("packages")
|> File.ls!()
|> Enum.map(fn package ->
%{
name: "@openfn/language-" <> package,
repo: "file://" <> Path.join([repo_path, "packages", package]),
latest: "local",
versions: []
}
end)
defp read_adaptors_from_local_repos(repo_paths) when is_list(repo_paths) do
Logger.debug("Using local adaptors repos at #{inspect(repo_paths)}")

repo_paths
|> Enum.flat_map(&adaptors_in_repo/1)
|> dedupe_first_wins()
end

defp adaptors_in_repo(repo_path) do
packages_path = Path.join(repo_path, "packages")

case File.ls(packages_path) do
{:ok, entries} ->
Enum.map(entries, fn package ->
%{
name: "@openfn/language-" <> package,
repo: "file://" <> Path.join([repo_path, "packages", package]),
latest: "local",
versions: []
}
end)

{:error, reason} ->
Logger.error(
"Skipping local adaptors repo #{inspect(repo_path)}: " <>
"cannot list #{inspect(packages_path)} (#{:file.format_error(reason)})"
)

[]
end
end

# First-occurrence wins: when two roots ship a package with the same
# `@openfn/language-<dirname>` name, the entry from the earlier root is kept.
# Listing your private repo before the canonical one therefore lets you
# override individual adaptors locally without forking the whole canonical
# tree. Shadowed entries are summarised in a single warning so the override
# case (the intended use of ordering) does not flood logs with one line per
# package.
defp dedupe_first_wins(adaptors) do
{kept_reversed, _seen, shadowed} =
Enum.reduce(adaptors, {[], MapSet.new(), []}, fn adaptor, {kept, seen, shadowed} ->
if MapSet.member?(seen, adaptor.name) do
{kept, seen, [adaptor | shadowed]}
else
{[adaptor | kept], MapSet.put(seen, adaptor.name), shadowed}
end
end)

log_shadowed(shadowed)
Enum.reverse(kept_reversed)
end

defp log_shadowed([]), do: :ok

defp log_shadowed(shadowed) do
names =
shadowed
|> Enum.reverse()
|> Enum.map_join(", ", & &1.name)

Logger.warning(
"AdaptorRegistry: #{length(shadowed)} adaptor(s) shadowed by earlier " <>
"local-adaptors repo entries: #{names}"
)
end

@doc """
Expand Down Expand Up @@ -362,8 +425,9 @@ defmodule Lightning.AdaptorRegistry do
end

def local_adaptors_enabled? do
config = Lightning.Config.adaptor_registry()

if config[:local_adaptors_repo], do: true, else: false
case Lightning.Config.adaptor_registry()[:local_adaptors_repos] do
[_ | _] -> true
_ -> false
end
end
end
36 changes: 22 additions & 14 deletions lib/lightning/config/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,29 @@ defmodule Lightning.Config.Bootstrap do
config :lightning, :adaptor_service,
adaptors_path: env!("ADAPTORS_PATH", :string, "./priv/openfn")

local_adaptors_repo =
env!(
"OPENFN_ADAPTORS_REPO",
:string,
Utils.get_env([
:lightning,
Lightning.AdaptorRegistry,
:local_adaptors_repo
])
)
# OPENFN_ADAPTORS_REPO accepts a colon-separated list of paths so that a
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These docs are in the wrong place. Better to have them in runninglocal.md

# private adaptor repo can be loaded alongside the canonical OpenFn
# adaptors monorepo. A single path is still valid; it just becomes a
# one-element list. Order is precedence: earlier entries shadow later ones
# when two repos ship a package with the same name.
local_adaptors_repos =
env!("OPENFN_ADAPTORS_REPO", :string, nil)
|> case do
nil ->
[]

value when is_binary(value) ->
value
|> String.split(":", trim: true)
|> Enum.map(&String.trim/1)
|> Enum.reject(&(&1 == ""))
|> Enum.map(&Path.expand/1)
end

use_local_adaptors_repo? =
use_local_adaptors_repos? =
env!("LOCAL_ADAPTORS", &Utils.ensure_boolean/1, false)
|> tap(fn v ->
if v && !is_binary(local_adaptors_repo) do
if v && local_adaptors_repos == [] do
raise """
LOCAL_ADAPTORS is set to true, but OPENFN_ADAPTORS_REPO is not set.
"""
Expand All @@ -231,8 +239,8 @@ defmodule Lightning.Config.Bootstrap do
:string,
Utils.get_env([:lightning, Lightning.AdaptorRegistry, :use_cache])
),
local_adaptors_repo:
use_local_adaptors_repo? && Path.expand(local_adaptors_repo)
local_adaptors_repos:
if(use_local_adaptors_repos?, do: local_adaptors_repos, else: [])

config :lightning,
schemas_path:
Expand Down
106 changes: 97 additions & 9 deletions test/lightning/adaptor_registry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,15 @@ defmodule Lightning.AdaptorRegistryTest do
end

@tag :tmp_dir
test "lists directory names of the when local_adaptors_repo is set", %{
tmp_dir: tmp_dir,
test: test
} do
test "lists directory names from a single-element local_adaptors_repos list",
%{tmp_dir: tmp_dir, test: test} do
expected_adaptors = ["foo", "bar", "baz"]

Enum.each(expected_adaptors, fn adaptor ->
[tmp_dir, "packages", adaptor] |> Path.join() |> File.mkdir_p!()
end)

start_supervised!(
{AdaptorRegistry, [name: test, local_adaptors_repo: tmp_dir]}
)
start_supervised!({AdaptorRegistry, [name: test, local_adaptors_repos: [tmp_dir]]})

results = AdaptorRegistry.all(test)

Expand All @@ -158,6 +154,98 @@ defmodule Lightning.AdaptorRegistryTest do
assert expected_result in results
end
end

@tag :tmp_dir
test "merges adaptors from multiple local_adaptors_repos", %{
tmp_dir: tmp_dir,
test: test
} do
repo_a = Path.join(tmp_dir, "a")
repo_b = Path.join(tmp_dir, "b")
[repo_a, "packages", "alpha"] |> Path.join() |> File.mkdir_p!()
[repo_b, "packages", "beta"] |> Path.join() |> File.mkdir_p!()

start_supervised!({AdaptorRegistry, [name: test, local_adaptors_repos: [repo_a, repo_b]]})

names = AdaptorRegistry.all(test) |> Enum.map(& &1.name) |> Enum.sort()

assert names == ["@openfn/language-alpha", "@openfn/language-beta"]
end

@tag :tmp_dir
test "first repo wins on collision and emits a warning", %{
tmp_dir: tmp_dir,
test: test
} do
repo_a = Path.join(tmp_dir, "a")
repo_b = Path.join(tmp_dir, "b")
[repo_a, "packages", "http"] |> Path.join() |> File.mkdir_p!()
[repo_b, "packages", "http"] |> Path.join() |> File.mkdir_p!()

log =
ExUnit.CaptureLog.capture_log(fn ->
start_supervised!(
{AdaptorRegistry, [name: test, local_adaptors_repos: [repo_a, repo_b]]}
)

# force the GenServer to finish handle_continue
AdaptorRegistry.all(test)
end)

results = AdaptorRegistry.all(test)
assert length(results) == 1
assert hd(results).repo == "file://" <> Path.join([repo_a, "packages", "http"])
assert log =~ "@openfn/language-http"
assert log =~ "shadowed"
end

@tag :tmp_dir
test "soft-fails when a repo path is missing or unreadable", %{
tmp_dir: tmp_dir,
test: test
} do
good_repo = Path.join(tmp_dir, "good")
missing_repo = Path.join(tmp_dir, "does-not-exist")
[good_repo, "packages", "alpha"] |> Path.join() |> File.mkdir_p!()

log =
ExUnit.CaptureLog.capture_log(fn ->
start_supervised!(
{AdaptorRegistry, [name: test, local_adaptors_repos: [missing_repo, good_repo]]}
)

AdaptorRegistry.all(test)
end)

names = AdaptorRegistry.all(test) |> Enum.map(& &1.name)
assert names == ["@openfn/language-alpha"]
assert log =~ "Skipping local adaptors repo"
assert log =~ missing_repo
end
end

describe "local_adaptors_enabled?/0" do
test "returns true when a non-empty plural list is configured" do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn ->
[local_adaptors_repos: ["/some/path"]]
end)

assert AdaptorRegistry.local_adaptors_enabled?()
end

test "returns false when the list is empty" do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn ->
[local_adaptors_repos: []]
end)

refute AdaptorRegistry.local_adaptors_enabled?()
end

test "returns false when the key is absent" do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn -> [] end)

refute AdaptorRegistry.local_adaptors_enabled?()
end
end

describe "resolve_package_name/1" do
Expand All @@ -178,10 +266,10 @@ defmodule Lightning.AdaptorRegistryTest do
end

@tag :tmp_dir
test "returns local as the version when local_adaptors_repo config is set",
test "returns local as the version when local_adaptors_repos config is set",
%{tmp_dir: tmp_dir} do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn ->
[local_adaptors_repo: tmp_dir]
[local_adaptors_repos: [tmp_dir]]
end)

assert AdaptorRegistry.resolve_package_name("@openfn/language-foo@1.2.3") ==
Expand Down
Loading