diff --git a/.env.example b/.env.example index aa5ded91b3c..2faa6f46f2b 100644 --- a/.env.example +++ b/.env.example @@ -267,7 +267,7 @@ # notification. To prevent flooding the recipients, it will wait for a period # before it sends the next email (assuming the failure condition persists). # Changing this setting will affect the frequency of sending. -# KAFKA_NOTIFICATTION_EMBARGO_SECONDS=3600 +# KAFKA_NOTIFICATION_EMBARGO_SECONDS=3600 # # If the Kafka pipelines failed to persist a message, the message can be # persisted as JSON to the local file system. To enable this, set @@ -282,7 +282,7 @@ # Lightning starts and it must be writable by the user that Lightning runs as. # KAFKA_ALTERNATE_STORAGE_FILE_PATH=/path/to/alternate/storage # -# This file to which the registry should be read from. In case the file doesnt +# This file to which the registry should be read from. In case the file doesn't # exist, Lightning will attempt to fetch the file and write it to the same location. # For this reason, you have to make sure that the directory exists and it is writable # ADAPTORS_REGISTRY_JSON_PATH=/path/to/adaptor_registry_cache.json diff --git a/CHANGELOG.md b/CHANGELOG.md index f5595a503be..3079df05b35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,12 @@ and this project adheres to ### Changed +- Allow instance admins to install credential schemas and update the adaptor + registry on the fly [#3114](https://github.com/OpenFn/lightning/issues/3114), + [#2209](https://github.com/OpenFn/lightning/issues/2209), + [#325](https://github.com/OpenFn/lightning/issues/325), + [#1996](https://github.com/OpenFn/lightning/issues/1996) + ### Fixed - Auto-increment job name when adaptor display name is already used in workflow diff --git a/config/dev.exs b/config/dev.exs index ad4124a0307..6dda246972a 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -37,6 +37,9 @@ config :lightning, LightningWeb.Endpoint, storybook_tailwind: {Tailwind, :install_and_run, [:storybook, ~w(--watch)]} ] +# schemas_path and adaptor_icons_path are only used by build-time mix tasks +# (install_schemas, install_adaptor_icons) for baking data into Docker images. +# At runtime, the DB is the primary source via AdaptorData/ETS cache. config :lightning, schemas_path: "priv/schemas", adaptor_icons_path: "priv/static/images/adaptors", diff --git a/config/prod.exs b/config/prod.exs index 409da93767f..91ac4d0b9a2 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -28,6 +28,9 @@ config :phoenix, :filter_parameters, [ "token" ] +# schemas_path and adaptor_icons_path are only used by build-time mix tasks +# (install_schemas, install_adaptor_icons) for baking data into Docker images. +# At runtime, the DB is the primary source via AdaptorData/ETS cache. config :lightning, schemas_path: "priv/schemas", adaptor_icons_path: "priv/static/images/adaptors" diff --git a/config/test.exs b/config/test.exs index 9641f9e9a60..ff65603c09e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -105,6 +105,8 @@ config :lightning, Lightning.FailureAlerter, time_scale: 60_000, rate_limit: 3 +# schemas_path and adaptor_icons_path point to test fixtures for build-time +# mix task tests. At runtime, tests use the DB via AdaptorData/ETS cache. config :lightning, schemas_path: "test/fixtures/schemas", adaptor_icons_path: "test/fixtures/adaptors/icons", diff --git a/lib/lightning/adaptor_data.ex b/lib/lightning/adaptor_data.ex new file mode 100644 index 00000000000..12bc27fe1ce --- /dev/null +++ b/lib/lightning/adaptor_data.ex @@ -0,0 +1,123 @@ +defmodule Lightning.AdaptorData do + @moduledoc """ + Context for managing adaptor cache entries in the database. + + Provides CRUD operations for storing adaptor registry data, credential + schemas, adaptor icons, and other cacheable adaptor metadata. Each entry + is keyed by a `kind` (category) and a unique `key` within that kind. + """ + import Ecto.Query + + alias Lightning.AdaptorData.CacheEntry + alias Lightning.Repo + + @doc """ + Upserts a single cache entry. + + If an entry with the same `kind` and `key` already exists, its `data`, + `content_type`, and `updated_at` fields are replaced. + + Returns `{:ok, %CacheEntry{}}` or `{:error, %Ecto.Changeset{}}`. + """ + @spec put(String.t(), String.t(), binary(), String.t()) :: + {:ok, CacheEntry.t()} | {:error, Ecto.Changeset.t()} + def put(kind, key, data, content_type \\ "application/json") do + %CacheEntry{} + |> CacheEntry.changeset(%{ + kind: kind, + key: key, + data: data, + content_type: content_type + }) + |> Repo.insert( + conflict_target: [:kind, :key], + on_conflict: {:replace, [:data, :content_type, :updated_at]}, + returning: true + ) + end + + @doc """ + Bulk upserts a list of entries for the given `kind`. + + Each entry in `entries` must be a map with `:key`, `:data`, and optionally + `:content_type` keys. + + Returns `{count, nil | [%CacheEntry{}]}` where `count` is the number of + rows affected. + """ + @spec put_many(String.t(), [map()]) :: {non_neg_integer(), nil} + def put_many(kind, entries) when is_list(entries) do + now = DateTime.utc_now() |> DateTime.truncate(:microsecond) + + rows = + Enum.map(entries, fn entry -> + %{ + id: Ecto.UUID.generate(), + kind: kind, + key: Map.fetch!(entry, :key), + data: Map.fetch!(entry, :data), + content_type: Map.get(entry, :content_type, "application/json"), + inserted_at: now, + updated_at: now + } + end) + + Repo.insert_all(CacheEntry, rows, + conflict_target: [:kind, :key], + on_conflict: {:replace, [:data, :content_type, :updated_at]} + ) + end + + @doc """ + Gets a single cache entry by `kind` and `key`. + + Returns `{:ok, %CacheEntry{}}` or `{:error, :not_found}`. + """ + @spec get(String.t(), String.t()) :: + {:ok, CacheEntry.t()} | {:error, :not_found} + def get(kind, key) do + case Repo.get_by(CacheEntry, kind: kind, key: key) do + nil -> {:error, :not_found} + entry -> {:ok, entry} + end + end + + @doc """ + Gets all cache entries for the given `kind`. + + Returns a list of `%CacheEntry{}` structs ordered by key. + """ + @spec get_all(String.t()) :: [CacheEntry.t()] + def get_all(kind) do + CacheEntry + |> where([e], e.kind == ^kind) + |> order_by([e], asc: e.key) + |> Repo.all() + end + + @doc """ + Deletes all cache entries for the given `kind`. + + Returns `{count, nil}` where `count` is the number of deleted rows. + """ + @spec delete_kind(String.t()) :: {non_neg_integer(), nil} + def delete_kind(kind) do + CacheEntry + |> where([e], e.kind == ^kind) + |> Repo.delete_all() + end + + @doc """ + Deletes a specific cache entry by `kind` and `key`. + + Returns `{:ok, %CacheEntry{}}` or `{:error, :not_found}`. + """ + @spec delete(String.t(), String.t()) :: + {:ok, CacheEntry.t()} | {:error, :not_found} + def delete(kind, key) do + case Repo.get_by(CacheEntry, kind: kind, key: key) do + nil -> {:error, :not_found} + entry -> {:ok, Repo.delete!(entry)} + end + end +end diff --git a/lib/lightning/adaptor_data/cache.ex b/lib/lightning/adaptor_data/cache.ex new file mode 100644 index 00000000000..7f87cb6f86f --- /dev/null +++ b/lib/lightning/adaptor_data/cache.ex @@ -0,0 +1,98 @@ +defmodule Lightning.AdaptorData.Cache do + @moduledoc """ + ETS-backed read-through cache for adaptor data. + + Read path: ETS -> DB -> nil + Write path: DB -> broadcast invalidate -> all nodes clear ETS + Next read on any node: ETS miss -> DB hit -> ETS populated + """ + + @table __MODULE__ + + @doc "Create the ETS table. Called from Application.start/2." + def init do + if :ets.whereis(@table) == :undefined do + :ets.new(@table, [ + :set, + :public, + :named_table, + read_concurrency: true + ]) + end + + :ok + end + + @doc "Get a cached value. Falls back to DB on miss, populates ETS." + def get(kind, key) do + cache_key = {kind, key} + + case :ets.lookup(@table, cache_key) do + [{^cache_key, value}] -> + value + + [] -> + case Lightning.AdaptorData.get(kind, key) do + {:error, :not_found} -> + nil + + {:ok, entry} -> + value = %{data: entry.data, content_type: entry.content_type} + :ets.insert(@table, {cache_key, value}) + value + end + end + end + + @doc "Get all entries of a kind. Falls back to DB on miss." + def get_all(kind) do + cache_key = {kind, :__all__} + + case :ets.lookup(@table, cache_key) do + [{^cache_key, entries}] -> + entries + + [] -> + entries = Lightning.AdaptorData.get_all(kind) + + if entries != [] do + values = + Enum.map(entries, fn e -> + %{key: e.key, data: e.data, content_type: e.content_type} + end) + + :ets.insert(@table, {cache_key, values}) + values + else + [] + end + end + end + + @doc "Invalidate all cached entries for a kind." + def invalidate(kind) do + # Delete the :__all__ key + :ets.delete(@table, {kind, :__all__}) + + # Delete individual keys for this kind + :ets.select_delete(@table, [ + {{{kind, :_}, :_}, [], [true]} + ]) + + :ok + end + + @doc "Invalidate all cached entries." + def invalidate_all do + :ets.delete_all_objects(@table) + :ok + end + + @doc "Broadcast cache invalidation to all nodes." + def broadcast_invalidation(kinds) when is_list(kinds) do + Lightning.API.broadcast( + "adaptor:data", + {:invalidate_cache, kinds, node()} + ) + end +end diff --git a/lib/lightning/adaptor_data/cache_entry.ex b/lib/lightning/adaptor_data/cache_entry.ex new file mode 100644 index 00000000000..038b54501dd --- /dev/null +++ b/lib/lightning/adaptor_data/cache_entry.ex @@ -0,0 +1,37 @@ +defmodule Lightning.AdaptorData.CacheEntry do + @moduledoc """ + Schema for adaptor cache entries stored in the database. + + Each entry is identified by a `kind` (e.g., "registry", "schema", "icon") + and a `key` (e.g., adaptor name or path). The `data` field holds the raw + binary content and `content_type` describes its format. + """ + use Lightning.Schema + + @type t :: %__MODULE__{ + id: Ecto.UUID.t(), + kind: String.t(), + key: String.t(), + data: binary(), + content_type: String.t(), + inserted_at: DateTime.t(), + updated_at: DateTime.t() + } + + schema "adaptor_cache_entries" do + field :kind, :string + field :key, :string + field :data, :binary + field :content_type, :string, default: "application/json" + + timestamps(type: :utc_datetime_usec) + end + + @doc false + def changeset(entry, attrs) do + entry + |> cast(attrs, [:kind, :key, :data, :content_type]) + |> validate_required([:kind, :key, :data]) + |> unique_constraint([:kind, :key]) + end +end diff --git a/lib/lightning/adaptor_data/listener.ex b/lib/lightning/adaptor_data/listener.ex new file mode 100644 index 00000000000..680c8d7f278 --- /dev/null +++ b/lib/lightning/adaptor_data/listener.ex @@ -0,0 +1,34 @@ +defmodule Lightning.AdaptorData.Listener do + @moduledoc """ + GenServer that subscribes to PubSub for cache invalidation messages. + + When a node writes new data to the DB, it broadcasts + `{:invalidate_cache, kinds, node()}`. All nodes (including the sender) + clear those kinds from their ETS cache. The next read on any node will + go to DB and repopulate ETS. + """ + + use GenServer + require Logger + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl GenServer + def init(_opts) do + Lightning.API.subscribe("adaptor:data") + {:ok, %{}} + end + + @impl GenServer + def handle_info({:invalidate_cache, kinds, _origin_node}, state) do + Logger.info("Invalidating adaptor cache for: #{inspect(kinds)}") + + Enum.each(kinds, &Lightning.AdaptorData.Cache.invalidate/1) + + {:noreply, state} + end + + def handle_info(_msg, state), do: {:noreply, state} +end diff --git a/lib/lightning/adaptor_icons.ex b/lib/lightning/adaptor_icons.ex new file mode 100644 index 00000000000..9cbbf690f0a --- /dev/null +++ b/lib/lightning/adaptor_icons.ex @@ -0,0 +1,130 @@ +defmodule Lightning.AdaptorIcons do + @moduledoc """ + Manages adaptor icon data in the DB-backed cache. + + Builds a manifest from the adaptor registry and optionally prefetches + icon PNGs from GitHub into the cache so they are served instantly by + `LightningWeb.AdaptorIconController`. + """ + + require Logger + + @github_base "https://raw.githubusercontent.com/OpenFn/adaptors/main/packages" + @shapes ["square", "rectangle"] + + @doc """ + Refreshes the icon manifest and spawns a background task to prefetch + all icons from GitHub. + + Returns `{:ok, manifest}` immediately after the manifest is stored. + """ + @spec refresh() :: {:ok, map()} | {:error, term()} + def refresh do + case refresh_manifest() do + {:ok, manifest} -> + Task.start(fn -> prefetch_icons(manifest) end) + {:ok, manifest} + + error -> + error + end + end + + @doc """ + Builds the icon manifest from the adaptor registry and stores it in + the DB cache. Broadcasts cache invalidation so all nodes pick up the + new manifest. + + Returns `{:ok, manifest}` where manifest is the JSON-decoded map. + """ + @spec refresh_manifest() :: {:ok, map()} | {:error, term()} + def refresh_manifest do + adaptors = Lightning.AdaptorRegistry.all() + + manifest = + adaptors + |> Enum.map(fn %{name: name} -> + short = short_name(name) + + sources = + Map.new(@shapes, fn shape -> + {shape, "/images/adaptors/#{short}-#{shape}.png"} + end) + + {short, sources} + end) + |> Enum.into(%{}) + + json_data = Jason.encode!(manifest) + + case Lightning.AdaptorData.put( + "icon_manifest", + "all", + json_data, + "application/json" + ) do + {:ok, _entry} -> + Lightning.AdaptorData.Cache.broadcast_invalidation([ + "icon_manifest" + ]) + + {:ok, manifest} + + {:error, changeset} -> + {:error, changeset} + end + end + + @doc """ + Prefetches icon PNGs for all adaptors and shapes. Skips icons that + are already cached in the DB. + """ + @spec prefetch_icons(map()) :: :ok + def prefetch_icons(manifest) do + client = Tesla.client([Tesla.Middleware.FollowRedirects]) + + manifest + |> Enum.each(fn {adaptor, _sources} -> + Enum.each(@shapes, fn shape -> + cache_key = "#{adaptor}-#{shape}" + + case Lightning.AdaptorData.get("icon", cache_key) do + {:ok, _entry} -> + :ok + + {:error, :not_found} -> + fetch_and_store_icon(client, adaptor, shape, cache_key) + end + end) + end) + + Lightning.AdaptorData.Cache.broadcast_invalidation(["icon"]) + :ok + end + + defp fetch_and_store_icon(client, adaptor, shape, cache_key) do + url = "#{@github_base}/#{adaptor}/assets/#{shape}.png" + + case Tesla.get(client, url) do + {:ok, %{status: 200, body: body}} -> + Lightning.AdaptorData.put("icon", cache_key, body, "image/png") + :ok + + {:ok, %{status: status}} -> + Logger.debug("Icon not found for #{adaptor}/#{shape} (HTTP #{status})") + + :skip + + {:error, reason} -> + Logger.warning( + "Failed to fetch icon #{adaptor}/#{shape}: " <> + "#{inspect(reason)}" + ) + + :error + end + end + + defp short_name("@openfn/language-" <> rest), do: rest + defp short_name(name), do: name +end diff --git a/lib/lightning/adaptor_refresh_worker.ex b/lib/lightning/adaptor_refresh_worker.ex new file mode 100644 index 00000000000..4ad0eadd0e5 --- /dev/null +++ b/lib/lightning/adaptor_refresh_worker.ex @@ -0,0 +1,89 @@ +defmodule Lightning.AdaptorRefreshWorker do + @moduledoc """ + Oban worker that periodically refreshes the adaptor registry and + credential schemas from their upstream sources, storing results in + the database via `Lightning.AdaptorData`. + + Scheduled via cron when `ADAPTOR_REFRESH_INTERVAL_HOURS` is configured. + Returns `:ok` even on partial failure since retries are not useful for + transient network issues — the next scheduled run will try again. + """ + + use Oban.Worker, + queue: :background, + max_attempts: 1, + unique: [period: 60] + + require Logger + + @impl Oban.Worker + def perform(%Oban.Job{}) do + if Lightning.AdaptorRegistry.local_adaptors_enabled?() do + Logger.info("Skipping scheduled adaptor refresh: local adaptors mode") + :ok + else + do_refresh() + end + end + + defp do_refresh do + Logger.info("Starting scheduled adaptor refresh") + + results = [ + {:registry, safe_call(&refresh_registry/0)}, + {:schemas, safe_call(&refresh_schemas/0)} + ] + + errors = + results + |> Enum.filter(fn {_, result} -> match?({:error, _}, result) end) + |> Enum.map(fn {name, {:error, reason}} -> {name, reason} end) + + refreshed_kinds = + results + |> Enum.filter(fn {_, result} -> match?({:ok, _}, result) end) + |> Enum.map(fn {kind, _} -> to_string(kind) end) + + if refreshed_kinds != [] do + Lightning.AdaptorData.Cache.broadcast_invalidation(refreshed_kinds) + end + + if errors == [] do + Logger.info("Scheduled adaptor refresh completed successfully") + else + Logger.warning( + "Scheduled adaptor refresh partially failed: #{inspect(errors)}" + ) + end + + :ok + end + + defp refresh_registry do + adaptors = Lightning.AdaptorRegistry.fetch() + + if adaptors == [] do + {:error, :empty_results} + else + data = Jason.encode!(adaptors) + Lightning.AdaptorData.put("registry", "all", data) + {:ok, length(adaptors)} + end + end + + defp refresh_schemas do + Lightning.CredentialSchemas.fetch_and_store() + end + + defp safe_call(fun) do + case fun.() do + :ok -> {:ok, :done} + {:ok, _} = ok -> ok + {:error, _} = error -> error + end + rescue + error -> + Logger.error("Adaptor refresh error: #{Exception.message(error)}") + {:error, Exception.message(error)} + end +end diff --git a/lib/lightning/adaptor_registry.ex b/lib/lightning/adaptor_registry.ex index ba1fa6ac202..b8cbb442487 100644 --- a/lib/lightning/adaptor_registry.ex +++ b/lib/lightning/adaptor_registry.ex @@ -17,9 +17,9 @@ defmodule Lightning.AdaptorRegistry do **Caching** - By default the results are cached to disk, and will be reused every start. - - In order to disable or configure caching pass see: `start_link/1`. + In non-local mode, adaptor data is read from the DB-backed ETS cache + (see `Lightning.AdaptorData.Cache`). The GenServer is still used for + local_adaptors_repo mode. The process uses `:continue` to return before the adaptors have been queried. This does mean that the first call to the process will be delayed until @@ -89,55 +89,15 @@ defmodule Lightning.AdaptorRegistry do @impl GenServer def handle_continue(opts, _state) do - adaptors = - case Enum.into(opts, %{}) do - %{local_adaptors_repo: repo_path} when is_binary(repo_path) -> - read_adaptors_from_local_repo(repo_path) - - %{use_cache: use_cache} - when use_cache === true or is_binary(use_cache) -> - cache_path = - if is_binary(use_cache) do - use_cache - else - Path.join([ - System.tmp_dir!(), - "lightning", - "adaptor_registry_cache.json" - ]) - end - - read_from_cache(cache_path) || write_to_cache(cache_path, fetch()) - - _other -> - fetch() - end - - {:noreply, adaptors} - end - - # false positive, it's a file from init - # sobelow_skip ["Traversal.FileModule"] - defp write_to_cache(path, adaptors) when is_binary(path) do - Logger.debug("Writing Adapter Registry to #{path}") - cache_file = File.open!(path, [:write]) - IO.binwrite(cache_file, Jason.encode_to_iodata!(adaptors)) - File.close(cache_file) - - adaptors - end - - # false positive, it's a file from init - # sobelow_skip ["Traversal.FileModule"] - defp read_from_cache(path) when is_binary(path) do - File.read(path) - |> case do - {:ok, file} -> - Logger.debug("Found Adapter Registry from #{path}") - Jason.decode!(file, keys: :atoms!) - - {:error, _} -> - nil + case Enum.into(opts, %{}) do + %{local_adaptors_repo: repo_path} when is_binary(repo_path) -> + adaptors = read_adaptors_from_local_repo(repo_path) + {:noreply, %{adaptors: adaptors, local_mode: true}} + + _other -> + # Non-local mode: all reads go through the DB/ETS cache. + # No need to fetch from NPM or read cache file on startup. + {:noreply, %{adaptors: [], local_mode: false}} end end @@ -146,10 +106,8 @@ defmodule Lightning.AdaptorRegistry do **Options** - - `:use_cache` (defaults to false) - stores the last set of results on disk - and uses the cached file for every subsequent start. - It can either be a boolean, or a string - the latter being a file path - to set where the cache file is located. + - `:use_cache` (defaults to false) - ignored in non-local mode (reads + come from DB/ETS cache now). Kept for backwards compatibility. - `:name` (defaults to AdaptorRegistry) - the name of the process, useful for testing and/or running multiple versions of the registry """ @@ -161,15 +119,37 @@ defmodule Lightning.AdaptorRegistry do GenServer.start_link(__MODULE__, opts, name: name) end + @impl GenServer + def handle_call(:all, _from, %{local_mode: true, adaptors: adaptors} = state) do + {:reply, adaptors, state} + end + @impl GenServer def handle_call(:all, _from, state) do - {:reply, state, state} + {:reply, get_adaptors_from_cache(), state} + end + + @impl GenServer + def handle_call( + {:versions_for, module_name}, + _from, + %{local_mode: true, adaptors: adaptors} = state + ) do + versions = + adaptors + |> Enum.find(fn %{name: name} -> name == module_name end) + |> case do + nil -> nil + %{versions: versions} -> versions + end + + {:reply, versions, state} end @impl GenServer def handle_call({:versions_for, module_name}, _from, state) do versions = - state + get_adaptors_from_cache() |> Enum.find(fn %{name: name} -> name == module_name end) |> case do nil -> nil @@ -179,10 +159,27 @@ defmodule Lightning.AdaptorRegistry do {:reply, versions, state} end + @impl GenServer + def handle_call( + {:latest_for, module_name}, + _from, + %{local_mode: true, adaptors: adaptors} = state + ) do + latest = + adaptors + |> Enum.find(fn %{name: name} -> name == module_name end) + |> case do + nil -> nil + %{latest: latest} -> latest + end + + {:reply, latest, state} + end + @impl GenServer def handle_call({:latest_for, module_name}, _from, state) do latest = - state + get_adaptors_from_cache() |> Enum.find(fn %{name: name} -> name == module_name end) |> case do nil -> nil @@ -192,6 +189,17 @@ defmodule Lightning.AdaptorRegistry do {:reply, latest, state} end + @impl GenServer + def handle_call(:refresh_sync, _from, %{local_mode: true} = state) do + {:reply, {:ok, :local_mode}, state} + end + + @impl GenServer + def handle_call(:refresh_sync, _from, state) do + result = do_refresh() + {:reply, result, state} + end + @doc """ Get the current in-process list of adaptors. This call will wait behind the `:continue` message when the process starts @@ -203,6 +211,29 @@ defmodule Lightning.AdaptorRegistry do GenServer.call(server, :all, @timeout) end + @doc """ + Fetches adaptor data from NPM and writes it to the database cache. + + Returns `{:ok, count}` on success or `{:error, reason}` on failure. + """ + @spec refresh(server :: GenServer.server()) :: + {:ok, term()} | {:error, term()} + def refresh(server \\ __MODULE__) do + GenServer.call(server, :refresh_sync, 60_000) + end + + @doc """ + Synchronous version of `refresh/1` that waits for the refresh to complete. + + Returns `{:ok, count}` with the number of adaptors fetched, or + `{:error, reason}` if the refresh failed. + """ + @spec refresh_sync(server :: GenServer.server()) :: + {:ok, term()} | {:error, term()} + def refresh_sync(server \\ __MODULE__) do + GenServer.call(server, :refresh_sync, 60_000) + end + @doc """ Get a list of versions for a given module. """ @@ -244,7 +275,14 @@ defmodule Lightning.AdaptorRegistry do max_concurrency: 10, timeout: @timeout ) - |> Stream.map(fn {:ok, detail} -> detail end) + |> Stream.flat_map(fn + {:ok, detail} -> + [detail] + + {:exit, reason} -> + Logger.warning("Failed to fetch adaptor details: #{inspect(reason)}") + [] + end) |> Enum.to_list() diff = DateTime.utc_now() |> DateTime.diff(start, :millisecond) @@ -286,6 +324,33 @@ defmodule Lightning.AdaptorRegistry do end) end + defp get_adaptors_from_cache do + case Lightning.AdaptorData.Cache.get("registry", "all") do + %{data: data} -> + Jason.decode!(data, keys: :atoms!) + + nil -> + [] + end + end + + defp do_refresh do + case fetch() do + [] -> + Logger.warning( + "Adaptor refresh returned empty results; keeping existing data" + ) + + {:error, :empty_results} + + adaptors -> + data = Jason.encode!(adaptors) + Lightning.AdaptorData.put("registry", "all", data) + Lightning.AdaptorData.Cache.broadcast_invalidation(["registry"]) + {:ok, length(adaptors)} + end + end + @doc """ Destructures an NPM style package name into module name and version. diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index b8eb925ab1b..9ccc1ae4bec 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -64,6 +64,8 @@ defmodule Lightning.Application do :ok = Oban.Telemetry.attach_default_logger(:debug) + Lightning.AdaptorData.Cache.init() + topologies = if System.get_env("K8S_HEADLESS_SERVICE") do [ @@ -135,6 +137,7 @@ defmodule Lightning.Application do LightningWeb.Endpoint, Lightning.Workflows.Presence, LightningWeb.WorkerPresence, + Lightning.AdaptorData.Listener, adaptor_registry_childspec, adaptor_service_childspec, {Lightning.TaskWorker, name: :cli_task_worker}, @@ -150,7 +153,21 @@ defmodule Lightning.Application do # See https://hexdocs.pm/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: Lightning.Supervisor] - Supervisor.start_link(children, opts) + + with {:ok, pid} <- Supervisor.start_link(children, opts) do + schedule_adaptor_refresh() + {:ok, pid} + end + end + + defp schedule_adaptor_refresh do + unless Lightning.AdaptorRegistry.local_adaptors_enabled?() or + Lightning.Config.env() == :test do + Task.start(fn -> + Lightning.AdaptorRefreshWorker.new(%{}, schedule_in: 0) + |> then(&Oban.insert(Lightning.Oban, &1)) + end) + end end # Tell Phoenix to update the endpoint configuration diff --git a/lib/lightning/config/bootstrap.ex b/lib/lightning/config/bootstrap.ex index 11df8cadbee..6e0c174d1c1 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -210,6 +210,8 @@ defmodule Lightning.Config.Bootstrap do local_adaptors_repo: use_local_adaptors_repo? && Path.expand(local_adaptors_repo) + # schemas_path is only used by build-time mix tasks (install_schemas). + # At runtime, the DB is the primary source via AdaptorData/ETS cache. config :lightning, schemas_path: env!( @@ -226,6 +228,10 @@ defmodule Lightning.Config.Bootstrap do Utils.get_env([:lightning, :purge_deleted_after_days], 7) ) + config :lightning, + :adaptor_refresh_interval_hours, + env!("ADAPTOR_REFRESH_INTERVAL_HOURS", :integer, 0) + config :lightning, :activity_cleanup_chunk_size, env!( @@ -261,7 +267,23 @@ defmodule Lightning.Config.Bootstrap do ], else: [] - all_cron = base_cron ++ cleanup_cron + adaptor_refresh_cron = + case Application.get_env( + :lightning, + :adaptor_refresh_interval_hours, + 0 + ) do + hours when is_integer(hours) and hours >= 24 -> + [{"0 4 * * *", Lightning.AdaptorRefreshWorker}] + + hours when is_integer(hours) and hours > 0 -> + [{"0 */#{hours} * * *", Lightning.AdaptorRefreshWorker}] + + _disabled -> + [] + end + + all_cron = base_cron ++ cleanup_cron ++ adaptor_refresh_cron config :lightning, Oban, name: Lightning.Oban, diff --git a/lib/lightning/credential_schemas.ex b/lib/lightning/credential_schemas.ex new file mode 100644 index 00000000000..13b5615eff6 --- /dev/null +++ b/lib/lightning/credential_schemas.ex @@ -0,0 +1,224 @@ +defmodule Lightning.CredentialSchemas do + @moduledoc """ + Downloads and installs credential configuration schemas at runtime. + + Fetches the list of OpenFn adaptor packages from npm, then downloads + each adaptor's configuration schema from jsDelivr CDN. + """ + + require Logger + + @default_excluded_adaptors [ + "language-common", + "language-devtools", + "language-divoc" + ] + + @doc """ + Fetches credential schemas from npm/jsDelivr and writes them to the + configured schemas directory. + + Wipes and recreates the directory to ensure a clean install, then + downloads all matching schemas. + + Returns `{:ok, count}` on success or `{:error, reason}` on failure. + """ + @spec refresh(excluded :: [String.t()]) :: + {:ok, non_neg_integer()} | {:error, term()} + def refresh(excluded \\ @default_excluded_adaptors) do + {:ok, schemas_path} = Application.fetch_env(:lightning, :schemas_path) + + excluded_full = Enum.map(excluded, &"@openfn/#{&1}") + + case fetch_package_list() do + {:ok, packages} -> + tmp_dir = + Path.join(schemas_path, ".tmp_#{System.unique_integer([:positive])}") + + File.mkdir_p!(tmp_dir) + + results = + packages + |> Enum.filter(&Regex.match?(~r/@openfn\/language-\w+/, &1)) + |> Enum.reject(&(&1 in excluded_full)) + |> Task.async_stream( + &persist_schema(tmp_dir, &1), + ordered: false, + max_concurrency: 5, + timeout: 30_000 + ) + |> Enum.to_list() + + count = + Enum.count(results, fn + {:ok, :ok} -> true + _ -> false + end) + + # Only replace the existing schemas if we got at least one + if count > 0 do + schemas_path + |> File.ls!() + |> Enum.reject(&String.starts_with?(&1, ".tmp_")) + |> Enum.each(&File.rm!(Path.join(schemas_path, &1))) + + tmp_dir + |> File.ls!() + |> Enum.each(fn file -> + File.rename!(Path.join(tmp_dir, file), Path.join(schemas_path, file)) + end) + end + + File.rm_rf(tmp_dir) + + {:ok, count} + + {:error, reason} -> + {:error, reason} + end + rescue + error -> + Logger.error("Failed to refresh credential schemas: #{inspect(error)}") + + {:error, error} + end + + @doc """ + Parses CLI args to build the excluded adaptors list. + + If `args` starts with `["--exclude" | names]`, those names are merged + with the default exclusions. Otherwise the defaults are returned. + """ + @spec parse_excluded([String.t()]) :: [String.t()] + def parse_excluded(args) do + case args do + ["--exclude" | adaptor_names] when adaptor_names != [] -> + (adaptor_names ++ @default_excluded_adaptors) |> Enum.uniq() + + _ -> + @default_excluded_adaptors + end + end + + @doc """ + Fetches credential schemas from npm/jsDelivr and stores them in the + database via `Lightning.AdaptorData`. + + Used by the `AdaptorRefreshWorker` for DB-backed storage. + + Returns `{:ok, count}` on success or `{:error, reason}` on failure. + """ + @spec fetch_and_store(excluded :: [String.t()]) :: + {:ok, non_neg_integer()} | {:error, term()} + def fetch_and_store(excluded \\ @default_excluded_adaptors) do + excluded_full = Enum.map(excluded, &"@openfn/#{&1}") + + case fetch_package_list() do + {:ok, packages} -> + results = + packages + |> Enum.filter(&Regex.match?(~r/@openfn\/language-\w+/, &1)) + |> Enum.reject(&(&1 in excluded_full)) + |> Task.async_stream( + &fetch_schema/1, + ordered: false, + max_concurrency: 5, + timeout: 30_000 + ) + |> Enum.to_list() + + entries = + results + |> Enum.flat_map(fn + {:ok, {:ok, name, data}} -> + [%{key: name, data: data, content_type: "application/json"}] + + _ -> + [] + end) + + if entries != [] do + Lightning.AdaptorData.put_many("schema", entries) + end + + {:ok, length(entries)} + + {:error, reason} -> + {:error, reason} + end + rescue + error -> + Logger.error( + "Failed to fetch and store credential schemas: #{inspect(error)}" + ) + + {:error, error} + end + + defp fetch_schema(package_name) do + url = + "https://cdn.jsdelivr.net/npm/#{package_name}/configuration-schema.json" + + case HTTPoison.get(url, [], + hackney: [pool: :default], + recv_timeout: 15_000 + ) do + {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> + name = String.replace(package_name, "@openfn/language-", "") + {:ok, name, body} + + {:ok, %HTTPoison.Response{}} -> + :skipped + + {:error, _reason} -> + :error + end + end + + defp fetch_package_list do + case HTTPoison.get( + "https://registry.npmjs.org/-/user/openfn/package", + [], + hackney: [pool: :default], + recv_timeout: 15_000 + ) do + {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> + packages = body |> Jason.decode!() |> Map.keys() + {:ok, packages} + + {:ok, %HTTPoison.Response{status_code: status}} -> + {:error, "NPM returned #{status}"} + + {:error, %HTTPoison.Error{reason: reason}} -> + {:error, reason} + end + end + + defp persist_schema(dir, package_name) do + url = + "https://cdn.jsdelivr.net/npm/#{package_name}/configuration-schema.json" + + case HTTPoison.get(url, [], + hackney: [pool: :default], + recv_timeout: 15_000 + ) do + {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> + write_schema(dir, package_name, body) + :ok + + {:ok, %HTTPoison.Response{status_code: _status}} -> + :skipped + + {:error, _reason} -> + :error + end + end + + defp write_schema(dir, package_name, data) do + filename = + String.replace(package_name, "@openfn/language-", "") <> ".json" + + path = Path.join(dir, filename) + File.write!(path, data) + end +end diff --git a/lib/lightning/credentials.ex b/lib/lightning/credentials.ex index ed237b5d75c..9cbeaed4bf4 100644 --- a/lib/lightning/credentials.ex +++ b/lib/lightning/credentials.ex @@ -575,15 +575,21 @@ defmodule Lightning.Credentials do """ @spec get_schema(String.t()) :: Credentials.Schema.t() def get_schema(schema_name) do - {:ok, schemas_path} = Application.fetch_env(:lightning, :schemas_path) + case Lightning.AdaptorData.Cache.get("schema", schema_name) do + %{data: data} -> + Credentials.Schema.new(data, schema_name) - File.read("#{schemas_path}/#{schema_name}.json") - |> case do - {:ok, raw_json} -> - Credentials.Schema.new(raw_json, schema_name) + nil -> + # Fall back to filesystem for backwards compatibility + {:ok, schemas_path} = Application.fetch_env(:lightning, :schemas_path) - {:error, reason} -> - raise "Error reading credential schema. Got: #{reason |> inspect()}" + case File.read("#{schemas_path}/#{schema_name}.json") do + {:ok, raw_json} -> + Credentials.Schema.new(raw_json, schema_name) + + {:error, reason} -> + raise "Schema '#{schema_name}' not found in DB or filesystem: #{inspect(reason)}" + end end end diff --git a/lib/lightning/maintenance.ex b/lib/lightning/maintenance.ex new file mode 100644 index 00000000000..cc5adb4e923 --- /dev/null +++ b/lib/lightning/maintenance.ex @@ -0,0 +1,217 @@ +defmodule Lightning.Maintenance do + @moduledoc """ + Maintenance operations for the Lightning platform. + + Provides functions to install adaptor icons and credential schemas. + These are called by both mix tasks and the admin LiveView, so they + must work without Mix being available (e.g. in production releases). + """ + + require Logger + + @adaptors_tar_url "https://github.com/OpenFn/adaptors/archive/refs/heads/main.tar.gz" + + @default_excluded_adaptors [ + "@openfn/language-common", + "@openfn/language-devtools", + "@openfn/language-divoc" + ] + + # --------------------------------------------------------------------------- + # Adaptor Icons + # --------------------------------------------------------------------------- + + @doc """ + Downloads and installs adaptor icons from the OpenFn adaptors repository. + + Fetches a tar.gz archive, extracts it, copies PNG icons to the configured + `adaptor_icons_path`, and writes a JSON manifest file. + + Returns `{:ok, message}` on success or `{:error, reason}` on failure. + """ + @spec install_adaptor_icons() :: {:ok, String.t()} | {:error, String.t()} + def install_adaptor_icons do + target_dir = Application.get_env(:lightning, :adaptor_icons_path) + + try do + case File.mkdir_p(target_dir) do + :ok -> + :ok + + {:error, reason} -> + raise "Couldn't create the adaptors images directory: #{target_dir}, got :#{reason}." + end + + working_dir = tmp_dir!() + tar = fetch_body!(@adaptors_tar_url) + :ok = extract_tar!(tar, working_dir) + + adaptor_icons = save_icons(working_dir, target_dir) + manifest_path = Path.join(target_dir, "adaptor_icons.json") + File.write!(manifest_path, Jason.encode!(adaptor_icons)) + + {:ok, + "Adaptor icons installed successfully. #{map_size(adaptor_icons)} adaptors updated."} + rescue + e -> {:error, Exception.message(e)} + end + end + + defp adapter, do: Application.get_env(:tesla, __MODULE__, [])[:adapter] + + defp fetch_body!(url) do + client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter()) + Tesla.get!(client, url).body + end + + defp tmp_dir! do + dir = + Path.join([ + System.tmp_dir!(), + "lightning-adaptor", + "#{System.unique_integer([:positive])}" + ]) + + {:ok, _} = File.rm_rf(dir) + :ok = File.mkdir_p(dir) + dir + end + + defp extract_tar!(tar, working_dir) do + case :erl_tar.extract({:binary, tar}, [ + :compressed, + cwd: to_charlist(working_dir) + ]) do + :ok -> :ok + other -> raise "Couldn't unpack archive: #{inspect(other)}" + end + end + + defp save_icons(working_dir, target_dir) do + [working_dir, "**", "packages", "*", "assets", "{rectangle,square}.png"] + |> Path.join() + |> Path.wildcard() + |> Enum.map(fn icon_path -> + [icon_name, "assets", adapter_name | _rest] = + icon_path |> Path.split() |> Enum.reverse() + + dest_name = adapter_name <> "-" <> icon_name + File.cp!(icon_path, Path.join(target_dir, dest_name)) + + %{ + adaptor: adapter_name, + shape: Path.rootname(icon_name), + src: "/images/adaptors/#{dest_name}" + } + end) + |> Enum.group_by(& &1.adaptor) + |> Map.new(fn {adaptor, srcs} -> + {adaptor, Map.new(srcs, &{&1.shape, &1.src})} + end) + end + + # --------------------------------------------------------------------------- + # Credential Schemas + # --------------------------------------------------------------------------- + + @doc """ + Downloads and installs JSON credential schemas for OpenFn language packages. + + Queries the NPM registry for all `@openfn/language-*` packages, then + fetches each configuration schema from jsDelivr and saves them to the + configured `schemas_path`. Runs up to 5 concurrent downloads. + + `extra_excluded` is an optional list of bare adaptor names (e.g. + `["language-foo"]`) to exclude in addition to the defaults. + + Returns `{:ok, message}` on success or `{:error, reason}` on failure. + """ + @spec install_schemas(list(String.t())) :: + {:ok, String.t()} | {:error, String.t()} + def install_schemas(extra_excluded \\ []) do + dir = Application.get_env(:lightning, :schemas_path) + + excluded = + @default_excluded_adaptors ++ + Enum.map(extra_excluded, &"@openfn/#{&1}") + + try do + :ok = init_schema_dir!(dir) + + packages = fetch_openfn_packages!() + + result = + packages + |> Enum.reject(&(&1 in excluded)) + |> Task.async_stream(&persist_schema!(dir, &1), + ordered: false, + max_concurrency: 5, + timeout: 30_000 + ) + |> Enum.to_list() + + {:ok, "Schemas installation has finished. #{length(result)} installed."} + rescue + e -> {:error, Exception.message(e)} + end + end + + defp init_schema_dir!(nil), do: raise("Schema directory not provided.") + + defp init_schema_dir!(dir) do + File.rm_rf(dir) + + case File.mkdir_p(dir) do + :ok -> + :ok + + {:error, reason} -> + raise "Couldn't create the schemas directory: #{dir}, got :#{reason}." + end + end + + defp fetch_openfn_packages! do + case HTTPoison.get( + "https://registry.npmjs.org/-/user/openfn/package", + [], + hackney: [pool: :default], + recv_timeout: 15_000 + ) do + {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> + body + |> Jason.decode!() + |> Enum.map(fn {name, _} -> name end) + |> Enum.filter(&Regex.match?(~r/@openfn\/language-\w+/, &1)) + + {:ok, %HTTPoison.Response{status_code: status_code}} -> + raise "Unable to access openfn user packages. status=#{status_code}" + + {:error, _} -> + raise "Unable to connect to NPM; no adaptors fetched." + end + end + + defp persist_schema!(dir, package_name) do + url = + "https://cdn.jsdelivr.net/npm/#{package_name}/configuration-schema.json" + + case HTTPoison.get(url, [], hackney: [pool: :default], recv_timeout: 15_000) do + {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> + path = + Path.join( + dir, + String.replace(package_name, "@openfn/language-", "") <> ".json" + ) + + File.write!(path, body) + + {:ok, %HTTPoison.Response{status_code: status_code}} -> + Logger.warning( + "Unable to fetch #{package_name} configuration schema. status=#{status_code}" + ) + + {:error, _} -> + raise "Unable to access #{package_name}" + end + end +end diff --git a/lib/lightning_web/components/layouts/settings.html.heex b/lib/lightning_web/components/layouts/settings.html.heex index 93c629d184c..200269bd102 100644 --- a/lib/lightning_web/components/layouts/settings.html.heex +++ b/lib/lightning_web/components/layouts/settings.html.heex @@ -73,6 +73,13 @@ <.icon name="hero-circle-stack" class="h-5 w-5 shrink-0" /> Collections + + <.icon name="hero-wrench-screwdriver" class="h-5 w-5 shrink-0" /> + Maintenance +
<.icon name="hero-arrow-left" class="h-5 w-5 shrink-0" /> diff --git a/lib/lightning_web/controllers/adaptor_icon_controller.ex b/lib/lightning_web/controllers/adaptor_icon_controller.ex new file mode 100644 index 00000000000..c4c861a0947 --- /dev/null +++ b/lib/lightning_web/controllers/adaptor_icon_controller.ex @@ -0,0 +1,124 @@ +defmodule LightningWeb.AdaptorIconController do + @moduledoc """ + Serves adaptor icons and the icon manifest from the DB/ETS cache. + + Icons are fetched from GitHub on first request, then cached in DB + ETS. + Subsequent requests are served directly from ETS with appropriate + cache-control headers. + """ + use LightningWeb, :controller + + require Logger + + @github_base "https://raw.githubusercontent.com/OpenFn/adaptors/main/packages" + @icon_max_age 604_800 + @manifest_max_age 300 + + @doc """ + Serves an individual adaptor icon PNG. + + The filename is expected in the format `{adaptor}-{shape}.png` where + shape is "square" or "rectangle". The adaptor name may itself contain + hyphens (e.g., "google-sheets-square.png"). + """ + def show(conn, %{"icon" => filename}) do + case parse_icon_filename(filename) do + {:ok, adaptor, shape} -> + cache_key = "#{adaptor}-#{shape}" + + case Lightning.AdaptorData.Cache.get("icon", cache_key) do + %{data: data, content_type: content_type} -> + serve_icon(conn, data, content_type) + + nil -> + fetch_and_serve_icon(conn, adaptor, shape, cache_key) + end + + :error -> + send_resp(conn, 400, "Invalid icon filename") + end + end + + @doc """ + Serves the adaptor icon manifest JSON. + """ + def manifest(conn, _params) do + data = + case Lightning.AdaptorData.Cache.get("icon_manifest", "all") do + %{data: data} -> data + nil -> "{}" + end + + conn + |> put_resp_content_type("application/json") + |> put_resp_header("cache-control", "public, max-age=#{@manifest_max_age}") + |> send_resp(200, data) + end + + defp parse_icon_filename(filename) do + basename = Path.rootname(filename) + parts = String.split(basename, "-") + + case Enum.reverse(parts) do + [shape | rest] when shape in ["square", "rectangle"] and rest != [] -> + adaptor = rest |> Enum.reverse() |> Enum.join("-") + {:ok, adaptor, shape} + + _ -> + :error + end + end + + defp serve_icon(conn, data, content_type) do + conn + |> put_resp_content_type(content_type) + |> put_resp_header( + "cache-control", + "public, max-age=#{@icon_max_age}" + ) + |> send_resp(200, data) + end + + defp fetch_and_serve_icon(conn, adaptor, shape, cache_key) do + url = "#{@github_base}/#{adaptor}/assets/#{shape}.png" + + case Tesla.get(build_client(), url) do + {:ok, %{status: 200, body: body}} -> + Lightning.AdaptorData.put( + "icon", + cache_key, + body, + "image/png" + ) + + :ets.insert( + Lightning.AdaptorData.Cache, + {{"icon", cache_key}, %{data: body, content_type: "image/png"}} + ) + + serve_icon(conn, body, "image/png") + + {:ok, %{status: 404}} -> + send_resp(conn, 404, "Not Found") + + {:ok, %{status: status}} -> + Logger.warning( + "GitHub returned #{status} fetching icon for #{adaptor}/#{shape}" + ) + + send_resp(conn, 502, "Bad Gateway") + + {:error, reason} -> + Logger.warning( + "Failed to fetch icon for #{adaptor}/#{shape}: " <> + "#{inspect(reason)}" + ) + + send_resp(conn, 502, "Bad Gateway") + end + end + + defp build_client do + Tesla.client([Tesla.Middleware.FollowRedirects]) + end +end diff --git a/lib/lightning_web/endpoint.ex b/lib/lightning_web/endpoint.ex index 40add7babd5..863d51535ac 100644 --- a/lib/lightning_web/endpoint.ex +++ b/lib/lightning_web/endpoint.ex @@ -36,6 +36,10 @@ defmodule LightningWeb.Endpoint do ], longpoll: false + # Intercept adaptor icon requests before Plug.Static so that icons + # are always served from the DB/ETS cache rather than stale files. + plug Plugs.AdaptorIcons + # Serve at "/" the static files from "priv/static" directory. # # You should set gzip to true if you are running phx.digest diff --git a/lib/lightning_web/live/credential_live/credential_form_component.ex b/lib/lightning_web/live/credential_live/credential_form_component.ex index efa46406a98..9a8236a0b56 100644 --- a/lib/lightning_web/live/credential_live/credential_form_component.ex +++ b/lib/lightning_web/live/credential_live/credential_form_component.ex @@ -1161,20 +1161,23 @@ defmodule LightningWeb.CredentialLive.CredentialFormComponent do "Environment names organize credential configurations by deployment stage. When workflows run in sandbox projects (e.g., env: 'staging'), they automatically use the matching credential environment. Choose names that align with your project environments: 'production' for live systems, 'staging' for testing, 'development' for local work. Consistent naming ensures the right secrets are used in each environment." end - defp get_type_options(schemas_path) do - schemas_options = - Path.wildcard("#{schemas_path}/*.json") - |> Enum.map(fn p -> - name = p |> Path.basename() |> String.replace(".json", "") - - image_path = - Routes.static_path( - LightningWeb.Endpoint, - "/images/adaptors/#{name}-square.png" - ) + defp get_type_options do + schemas = Lightning.AdaptorData.Cache.get_all("schema") - {name, name, image_path, nil} - end) + schemas_options = + if schemas != [] do + Enum.map(schemas, fn %{key: name} -> + image_path = + Routes.static_path( + LightningWeb.Endpoint, + "/images/adaptors/#{name}-square.png" + ) + + {name, name, image_path, nil} + end) + else + get_type_options_from_filesystem() + end schemas_options |> Enum.reject(fn {_, name, _, _} -> @@ -1190,6 +1193,23 @@ defmodule LightningWeb.CredentialLive.CredentialFormComponent do |> Enum.sort_by(&String.downcase(elem(&1, 0)), :asc) end + defp get_type_options_from_filesystem do + {:ok, schemas_path} = Application.fetch_env(:lightning, :schemas_path) + + Path.wildcard("#{schemas_path}/*.json") + |> Enum.map(fn p -> + name = p |> Path.basename() |> String.replace(".json", "") + + image_path = + Routes.static_path( + LightningWeb.Endpoint, + "/images/adaptors/#{name}-square.png" + ) + + {name, name, image_path, nil} + end) + end + defp list_users do Lightning.Accounts.list_users() |> Enum.map(fn user -> @@ -1318,9 +1338,6 @@ defmodule LightningWeb.CredentialLive.CredentialFormComponent do type_options = if action == :new do - {:ok, schemas_path} = - Application.fetch_env(:lightning, :schemas_path) - keychain_option = if socket.assigns[:from_collab_editor] do [ @@ -1334,7 +1351,7 @@ defmodule LightningWeb.CredentialLive.CredentialFormComponent do [] end - get_type_options(schemas_path) + get_type_options() |> Enum.concat( Enum.map(oauth_clients, fn client -> {client.name, client.id, "/images/oauth-2.png", "oauth"} diff --git a/lib/lightning_web/live/maintenance_live/index.ex b/lib/lightning_web/live/maintenance_live/index.ex new file mode 100644 index 00000000000..0b007eef7d3 --- /dev/null +++ b/lib/lightning_web/live/maintenance_live/index.ex @@ -0,0 +1,150 @@ +defmodule LightningWeb.MaintenanceLive.Index do + @moduledoc """ + LiveView for the Settings > Maintenance page. + + Provides action buttons to refresh the adaptor registry, install adaptor + icons, and install credential schemas at runtime without restarting the app. + """ + use LightningWeb, :live_view + + alias Lightning.Policies.Permissions + alias Lightning.Policies.Users + + @actions [ + "refresh_adaptor_registry", + "install_adaptor_icons", + "install_schemas" + ] + + @impl true + def mount(_params, _session, socket) do + can_access_admin_space = + Users + |> Permissions.can?( + :access_admin_space, + socket.assigns.current_user, + {} + ) + + if can_access_admin_space do + {:ok, + socket + |> assign( + active_menu_item: :maintenance, + page_title: "Maintenance", + refresh_status: %{}, + running: MapSet.new() + ), layout: {LightningWeb.Layouts, :settings}} + else + {:ok, + put_flash(socket, :nav, :no_access) + |> push_navigate(to: "/projects")} + end + end + + @impl true + def handle_event("run_" <> action, _params, socket) + when action in @actions do + if superuser?(socket) do + pid = self() + + Task.start(fn -> + result = + try do + run_action(action) + rescue + error -> {:error, Exception.message(error)} + end + + send(pid, {:action_complete, action, result}) + end) + + {:noreply, + socket + |> update(:running, &MapSet.put(&1, action)) + |> put_in_status(action, :running)} + else + {:noreply, put_flash(socket, :error, "Unauthorized")} + end + end + + @impl true + def handle_info({:action_complete, action, result}, socket) do + status = + case result do + {:ok, _} -> :success + {:error, _} -> :error + end + + {:noreply, + socket + |> update(:running, &MapSet.delete(&1, action)) + |> put_in_status(action, status)} + end + + attr :action, :string, required: true + attr :title, :string, required: true + attr :description, :string, required: true + attr :running, :boolean, required: true + attr :status, :atom, default: nil + + defp maintenance_action(assigns) do + ~H""" +
+
+

{@title}

+

{@description}

+
+
+ + <.icon name="hero-check-circle-solid" class="h-5 w-5" /> Done + + + <.icon name="hero-x-circle-solid" class="h-5 w-5" /> Failed + + +
+
+ """ + end + + defp run_action("refresh_adaptor_registry") do + Lightning.AdaptorRegistry.refresh_sync() + end + + defp run_action("install_adaptor_icons") do + Lightning.AdaptorIcons.refresh() + end + + defp run_action("install_schemas") do + case Lightning.CredentialSchemas.fetch_and_store() do + {:ok, count} -> + Lightning.AdaptorData.Cache.broadcast_invalidation(["schema"]) + {:ok, count} + + error -> + error + end + end + + defp put_in_status(socket, action, status) do + update(socket, :refresh_status, &Map.put(&1, action, status)) + end + + defp superuser?(socket) do + Users + |> Permissions.can?( + :access_admin_space, + socket.assigns.current_user, + {} + ) + end +end diff --git a/lib/lightning_web/live/maintenance_live/index.html.heex b/lib/lightning_web/live/maintenance_live/index.html.heex new file mode 100644 index 00000000000..d8bdeddc8b6 --- /dev/null +++ b/lib/lightning_web/live/maintenance_live/index.html.heex @@ -0,0 +1,36 @@ + + <:header> + + <:title>Maintenance + + + +
+
+ <.maintenance_action + action="refresh_adaptor_registry" + title="Refresh Adaptor Registry" + description="Re-fetch the list of available adaptors and their versions from npm." + running={MapSet.member?(@running, "refresh_adaptor_registry")} + status={Map.get(@refresh_status, "refresh_adaptor_registry")} + /> + + <.maintenance_action + action="install_adaptor_icons" + title="Install Adaptor Icons" + description="Download adaptor icons from the OpenFn adaptors repository on GitHub." + running={MapSet.member?(@running, "install_adaptor_icons")} + status={Map.get(@refresh_status, "install_adaptor_icons")} + /> + + <.maintenance_action + action="install_schemas" + title="Install Credential Schemas" + description="Download credential configuration schemas from jsDelivr CDN." + running={MapSet.member?(@running, "install_schemas")} + status={Map.get(@refresh_status, "install_schemas")} + /> +
+
+
+
diff --git a/lib/lightning_web/plugs/adaptor_icons.ex b/lib/lightning_web/plugs/adaptor_icons.ex new file mode 100644 index 00000000000..c5fb9773fa4 --- /dev/null +++ b/lib/lightning_web/plugs/adaptor_icons.ex @@ -0,0 +1,45 @@ +defmodule LightningWeb.Plugs.AdaptorIcons do + @moduledoc """ + Intercepts requests for adaptor icons and the icon manifest before + Plug.Static can serve stale filesystem copies. + + Sits before Plug.Static in the endpoint plug pipeline. Matches + `GET /images/adaptors/*` and delegates to + `LightningWeb.AdaptorIconController`. + """ + @behaviour Plug + + import Plug.Conn + + @impl true + def init(opts), do: opts + + @impl true + def call( + %Plug.Conn{ + method: "GET", + path_info: ["images", "adaptors", "adaptor_icons.json"] + } = conn, + _opts + ) do + conn + |> put_private(:plug_skip_csrf_protection, true) + |> LightningWeb.AdaptorIconController.manifest(%{}) + |> halt() + end + + def call( + %Plug.Conn{ + method: "GET", + path_info: ["images", "adaptors", icon] + } = conn, + _opts + ) do + conn + |> put_private(:plug_skip_csrf_protection, true) + |> LightningWeb.AdaptorIconController.show(%{"icon" => icon}) + |> halt() + end + + def call(conn, _opts), do: conn +end diff --git a/lib/lightning_web/router.ex b/lib/lightning_web/router.ex index 1b3ac330598..5fc34485375 100644 --- a/lib/lightning_web/router.ex +++ b/lib/lightning_web/router.ex @@ -223,6 +223,8 @@ defmodule LightningWeb.Router do live "/settings/authentication/new", AuthProvidersLive.Index, :new live "/settings/collections", CollectionLive.Index, :index + + live "/settings/maintenance", MaintenanceLive.Index, :index end live_session :default, on_mount: LightningWeb.InitAssigns do diff --git a/lib/mix/tasks/install_adaptor_icons.ex b/lib/mix/tasks/install_adaptor_icons.ex index 6ac0f795bdb..c5613394322 100644 --- a/lib/mix/tasks/install_adaptor_icons.ex +++ b/lib/mix/tasks/install_adaptor_icons.ex @@ -1,101 +1,29 @@ defmodule Mix.Tasks.Lightning.InstallAdaptorIcons do @moduledoc """ - Installs the adaptor icons - """ - use Mix.Task + Installs the adaptor icons. - @adaptors_tar_url "https://github.com/OpenFn/adaptors/archive/refs/heads/main.tar.gz" + Refreshes the icon manifest from the adaptor registry and optionally + prefetches icon PNGs from GitHub into the database cache. - defp adapter do - Application.get_env(:tesla, __MODULE__, [])[:adapter] - end + All core logic lives in `Lightning.AdaptorIcons`; this task only + handles application startup and CLI output. + """ - @target_dir Application.compile_env(:lightning, :adaptor_icons_path) + use Mix.Task @impl true def run(_) do - Application.ensure_all_started(:telemetry) - Finch.start_link(name: Lightning.Finch) + Mix.Task.run("app.start") - File.mkdir_p(@target_dir) - |> case do - {:error, reason} -> - raise "Couldn't create the adaptors images directory: #{@target_dir}, got :#{reason}." + case Lightning.AdaptorIcons.refresh() do + {:ok, manifest} -> + Mix.shell().info( + "Adaptor icons refreshed successfully. " <> + "#{map_size(manifest)} adaptors in manifest." + ) - :ok -> - :ok - end - - working_dir = tmp_dir!() - tar = fetch_body!(@adaptors_tar_url) - - case :erl_tar.extract({:binary, tar}, [ - :compressed, - cwd: to_charlist(working_dir) - ]) do - :ok -> :ok - other -> raise "couldn't unpack archive: #{inspect(other)}" + {:error, reason} -> + Mix.raise("Adaptor icons refresh failed: #{inspect(reason)}") end - - adaptor_icons = save_icons(working_dir) - manifest_path = Path.join(@target_dir, "adaptor_icons.json") - :ok = File.write(manifest_path, Jason.encode!(adaptor_icons)) - - Mix.shell().info( - "Adaptor icons installed successfully. Manifest saved at: #{manifest_path}" - ) - end - - defp fetch_body!(url) do - response = Tesla.get!(build_client(), url) - response.body - end - - defp build_client do - Tesla.client([Tesla.Middleware.FollowRedirects], adapter()) - end - - defp tmp_dir! do - tmp_dir = - Path.join([ - System.tmp_dir!(), - "lightning-adaptor", - "#{System.unique_integer([:positive])}" - ]) - - {:ok, _} = File.rm_rf(tmp_dir) - :ok = File.mkdir_p(tmp_dir) - - tmp_dir - end - - defp list_icons(working_dir) do - [working_dir, "**", "packages", "*", "assets", "{rectangle,square}.png"] - |> Path.join() - |> Path.wildcard() - end - - defp save_icons(working_dir) do - working_dir - |> list_icons() - |> Enum.map(fn icon_path -> - [icon_name, "assets", adapter_name | _rest] = - Path.split(icon_path) |> Enum.reverse() - - destination_name = adapter_name <> "-" <> icon_name - destination_path = Path.join(@target_dir, destination_name) - File.cp!(icon_path, destination_path) - - %{ - adaptor: adapter_name, - shape: Path.rootname(icon_name), - src: "/images/adaptors" <> "/#{destination_name}" - } - end) - |> Enum.group_by(fn entry -> entry.adaptor end) - |> Enum.into(%{}, fn {adaptor, sources} -> - sources = Map.new(sources, fn entry -> {entry.shape, entry.src} end) - {adaptor, sources} - end) end end diff --git a/lib/mix/tasks/install_schemas.ex b/lib/mix/tasks/install_schemas.ex index c37dae736fb..cc4c757abe9 100644 --- a/lib/mix/tasks/install_schemas.ex +++ b/lib/mix/tasks/install_schemas.ex @@ -2,132 +2,50 @@ defmodule Mix.Tasks.Lightning.InstallSchemas do @shortdoc "Install the credential json schemas" @moduledoc """ - Install the credential json schemas - Use --exclude language-package1, language-package2 to exclude specific packages - """ - - use Mix.Task - use HTTPoison.Base - require Logger - - @default_excluded_adaptors [ - "language-common", - "language-devtools", - "language-divoc" - ] - - @spec run(any) :: any - def run(args) do - HTTPoison.start() + Install the credential json schemas. - dir = schemas_path() + ## Modes - init_schema_dir(dir) + - Default (filesystem): writes schemas to `schemas_path` for baking into + Docker images at build time. + - `--db`: starts the app and writes schemas directly to the database via + `Lightning.AdaptorData`. Useful for seeding a fresh database. - result = - args - |> parse_excluded() - |> fetch_schemas(&persist_schema(dir, &1)) - |> Enum.to_list() + Use `--exclude language-package1 language-package2` to exclude specific + packages. - Mix.shell().info( - "Schemas installation has finished. #{length(result)} installed" - ) - end + All core logic lives in `Lightning.CredentialSchemas`; this task only + handles HTTP startup and CLI output. + """ - def parse_excluded(args) do - args - |> case do - ["--exclude" | adaptor_names] when adaptor_names != [] -> - (adaptor_names ++ @default_excluded_adaptors) |> Enum.uniq() + use Mix.Task - _ -> - @default_excluded_adaptors - end - end + @impl true + def run(["--db" | rest]) do + Mix.Task.run("app.start") - defp schemas_path do - Application.get_env(:lightning, :schemas_path) - end + excluded = Lightning.CredentialSchemas.parse_excluded(rest) - defp init_schema_dir(dir) do - if is_nil(dir), do: raise("Schema directory not provided.") - File.rm_rf(dir) + case Lightning.CredentialSchemas.fetch_and_store(excluded) do + {:ok, count} -> + Mix.shell().info("Schemas stored in database. #{count} installed") - File.mkdir_p(dir) - |> case do {:error, reason} -> - raise "Couldn't create the schemas directory: #{dir}, got :#{reason}." - - _ -> - nil + Mix.raise("Schema installation (DB) failed: #{inspect(reason)}") end end - def write_schema(dir, package_name, data) when is_binary(package_name) do - path = - Path.join([ - dir, - String.replace(package_name, "@openfn/language-", "") <> ".json" - ]) - - file = File.open!(path, [:write]) - - IO.binwrite(file, data) - File.close(file) - end - - def persist_schema(dir, package_name) do - get( - "https://cdn.jsdelivr.net/npm/#{package_name}/configuration-schema.json", - [], - hackney: [pool: :default], - recv_timeout: 15_000 - ) - |> case do - {:error, _} -> - raise "Unable to access #{package_name}" - - {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> - write_schema(dir, package_name, body) - - {:ok, %HTTPoison.Response{status_code: status_code}} -> - Logger.warning( - "Unable to fetch #{package_name} configuration schema. status=#{status_code}" - ) - end - end - - def fetch_schemas(excluded \\ [], fun) do - get("https://registry.npmjs.org/-/user/openfn/package", [], - hackney: [pool: :default], - recv_timeout: 15_000 - ) - |> case do - {:error, %HTTPoison.Error{}} -> - raise "Unable to connect to NPM; no adaptors fetched." + def run(args) do + HTTPoison.start() - {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> - excluded = excluded |> Enum.map(&"@openfn/#{&1}") + excluded = Lightning.CredentialSchemas.parse_excluded(args) - body - |> Jason.decode!() - |> Enum.map(fn {name, _} -> name end) - |> Enum.filter(fn name -> - Regex.match?(~r/@openfn\/language-\w+/, name) - end) - |> Enum.reject(fn name -> - name in excluded - end) - |> Task.async_stream(fun, - ordered: false, - max_concurrency: 5, - timeout: 30_000 - ) - |> Stream.map(fn {:ok, detail} -> detail end) + case Lightning.CredentialSchemas.refresh(excluded) do + {:ok, count} -> + Mix.shell().info("Schemas installation has finished. #{count} installed") - {:ok, %HTTPoison.Response{status_code: status_code}} -> - raise "Unable to access openfn user packages. status=#{status_code}" + {:error, reason} -> + Mix.raise("Schema installation failed: #{inspect(reason)}") end end end diff --git a/priv/repo/migrations/20260308204728_create_adaptor_cache_entries.exs b/priv/repo/migrations/20260308204728_create_adaptor_cache_entries.exs new file mode 100644 index 00000000000..84e069ae24e --- /dev/null +++ b/priv/repo/migrations/20260308204728_create_adaptor_cache_entries.exs @@ -0,0 +1,18 @@ +defmodule Lightning.Repo.Migrations.CreateAdaptorCacheEntries do + use Ecto.Migration + + def change do + create table(:adaptor_cache_entries, primary_key: false) do + add :id, :binary_id, primary_key: true + add :kind, :string, null: false + add :key, :string, null: false + add :data, :binary, null: false + add :content_type, :string, default: "application/json" + + timestamps(type: :utc_datetime_usec) + end + + create unique_index(:adaptor_cache_entries, [:kind, :key]) + create index(:adaptor_cache_entries, [:kind]) + end +end diff --git a/test/integration/web_and_worker_test.exs b/test/integration/web_and_worker_test.exs index f6af34b6df9..570dea3ee24 100644 --- a/test/integration/web_and_worker_test.exs +++ b/test/integration/web_and_worker_test.exs @@ -266,7 +266,7 @@ defmodule Lightning.WebAndWorkerTest do end version_logs = pick_out_version_logs(run) - assert version_logs["@openfn/language-http"] =~ "3.1.12" + assert version_logs["@openfn/language-http"] =~ "7.2.0" assert version_logs["worker"] =~ "1.17" assert version_logs["node.js"] =~ "22.12" assert version_logs["@openfn/language-common"] == "3.0.2" diff --git a/test/lightning/adaptor_data/cache_test.exs b/test/lightning/adaptor_data/cache_test.exs new file mode 100644 index 00000000000..5f666af862f --- /dev/null +++ b/test/lightning/adaptor_data/cache_test.exs @@ -0,0 +1,157 @@ +defmodule Lightning.AdaptorData.CacheTest do + use Lightning.DataCase, async: true + + alias Lightning.AdaptorData + alias Lightning.AdaptorData.Cache + + # Each test gets its own kind to avoid cross-test ETS collisions + defp unique_kind, do: "test_kind_#{System.unique_integer([:positive])}" + + setup do + # Ensure ETS is clean for our test kinds + :ok + end + + describe "get/2" do + test "returns nil when entry does not exist in DB or ETS" do + kind = unique_kind() + assert Cache.get(kind, "missing") == nil + end + + test "falls back to DB on cache miss and populates ETS" do + kind = unique_kind() + {:ok, _entry} = AdaptorData.put(kind, "key1", "some data", "text/plain") + + # First call: ETS miss, DB hit, populates ETS + result = Cache.get(kind, "key1") + assert %{data: "some data", content_type: "text/plain"} = result + + # Second call: ETS hit (verify by checking ETS directly) + assert [{_key, ^result}] = + :ets.lookup(Cache, {kind, "key1"}) + end + + test "returns cached value from ETS on subsequent calls" do + kind = unique_kind() + {:ok, _entry} = AdaptorData.put(kind, "key2", ~s({"a":1})) + + # Populate ETS + first = Cache.get(kind, "key2") + assert %{data: ~s({"a":1}), content_type: "application/json"} = first + + # Update DB directly (bypassing cache) + {:ok, _entry} = AdaptorData.put(kind, "key2", ~s({"a":2})) + + # ETS still returns stale value (proving it reads from ETS) + assert Cache.get(kind, "key2") == first + end + end + + describe "get_all/1" do + test "returns empty list when no entries exist" do + kind = unique_kind() + assert Cache.get_all(kind) == [] + end + + test "falls back to DB and populates ETS with mapped entries" do + kind = unique_kind() + {:ok, _} = AdaptorData.put(kind, "a", "data_a", "text/plain") + {:ok, _} = AdaptorData.put(kind, "b", "data_b", "application/json") + + result = Cache.get_all(kind) + + assert [ + %{key: "a", data: "data_a", content_type: "text/plain"}, + %{key: "b", data: "data_b", content_type: "application/json"} + ] = result + + # Verify ETS was populated with the :__all__ key + assert [{_key, ^result}] = :ets.lookup(Cache, {kind, :__all__}) + end + + test "returns cached list on subsequent calls" do + kind = unique_kind() + {:ok, _} = AdaptorData.put(kind, "x", "data_x") + + first = Cache.get_all(kind) + assert length(first) == 1 + + # Add another entry to DB (bypassing cache) + {:ok, _} = AdaptorData.put(kind, "y", "data_y") + + # ETS still returns the original list + assert Cache.get_all(kind) == first + end + end + + describe "invalidate/1" do + test "clears ETS entries for a kind so next read goes to DB" do + kind = unique_kind() + {:ok, _} = AdaptorData.put(kind, "k1", "original") + + # Populate ETS via read + assert %{data: "original"} = Cache.get(kind, "k1") + assert [%{key: "k1"}] = Cache.get_all(kind) + + # Update DB + {:ok, _} = AdaptorData.put(kind, "k1", "updated") + + # Invalidate + assert :ok = Cache.invalidate(kind) + + # Next read goes to DB and gets updated value + assert %{data: "updated"} = Cache.get(kind, "k1") + end + + test "does not affect entries of other kinds" do + kind1 = unique_kind() + kind2 = unique_kind() + + {:ok, _} = AdaptorData.put(kind1, "k", "data1") + {:ok, _} = AdaptorData.put(kind2, "k", "data2") + + # Populate both in ETS + Cache.get(kind1, "k") + Cache.get(kind2, "k") + + # Invalidate only kind1 + Cache.invalidate(kind1) + + # kind2 still cached (stale check: update DB, ETS should still have old) + {:ok, _} = AdaptorData.put(kind2, "k", "data2_updated") + assert %{data: "data2"} = Cache.get(kind2, "k") + end + end + + describe "invalidate_all/0" do + test "clears all ETS entries" do + kind1 = unique_kind() + kind2 = unique_kind() + + {:ok, _} = AdaptorData.put(kind1, "k", "d1") + {:ok, _} = AdaptorData.put(kind2, "k", "d2") + + Cache.get(kind1, "k") + Cache.get(kind2, "k") + + assert :ok = Cache.invalidate_all() + + # Update DB so we can verify reads go to DB + {:ok, _} = AdaptorData.put(kind1, "k", "d1_new") + {:ok, _} = AdaptorData.put(kind2, "k", "d2_new") + + assert %{data: "d1_new"} = Cache.get(kind1, "k") + assert %{data: "d2_new"} = Cache.get(kind2, "k") + end + end + + describe "broadcast_invalidation/1" do + test "broadcasts invalidation message via PubSub" do + Lightning.API.subscribe("adaptor:data") + + Cache.broadcast_invalidation(["registry", "schema"]) + + assert_receive {:invalidate_cache, ["registry", "schema"], _node} + end + end +end diff --git a/test/lightning/adaptor_data/listener_test.exs b/test/lightning/adaptor_data/listener_test.exs new file mode 100644 index 00000000000..6875450a087 --- /dev/null +++ b/test/lightning/adaptor_data/listener_test.exs @@ -0,0 +1,57 @@ +defmodule Lightning.AdaptorData.ListenerTest do + use Lightning.DataCase, async: true + + alias Lightning.AdaptorData + alias Lightning.AdaptorData.Cache + alias Lightning.AdaptorData.Listener + + defp unique_kind, do: "listener_kind_#{System.unique_integer([:positive])}" + + describe "handle_info/2 {:invalidate_cache, ...}" do + test "invalidates ETS cache for each kind when receiving PubSub message" do + kind1 = unique_kind() + kind2 = unique_kind() + + # Put data in DB and populate ETS via Cache.get + {:ok, _} = AdaptorData.put(kind1, "k", "original1") + {:ok, _} = AdaptorData.put(kind2, "k", "original2") + Cache.get(kind1, "k") + Cache.get(kind2, "k") + + # Simulate the PubSub message the Listener would receive + send(Listener, {:invalidate_cache, [kind1, kind2], node()}) + + # Give the GenServer time to process + # We verify by checking ETS is empty for those keys + :sys.get_state(Listener) + + # Update DB so we can verify reads go to DB + {:ok, _} = AdaptorData.put(kind1, "k", "updated1") + {:ok, _} = AdaptorData.put(kind2, "k", "updated2") + + assert %{data: "updated1"} = Cache.get(kind1, "k") + assert %{data: "updated2"} = Cache.get(kind2, "k") + end + end + + describe "integration with PubSub broadcast" do + test "end-to-end: broadcast triggers listener to invalidate cache" do + kind = unique_kind() + + {:ok, _} = AdaptorData.put(kind, "key", "before") + Cache.get(kind, "key") + + # Update DB + {:ok, _} = AdaptorData.put(kind, "key", "after") + + # Broadcast invalidation (Listener is subscribed) + Cache.broadcast_invalidation([kind]) + + # Wait for Listener to process + :sys.get_state(Listener) + + # Cache should now read from DB + assert %{data: "after"} = Cache.get(kind, "key") + end + end +end diff --git a/test/lightning/adaptor_data_test.exs b/test/lightning/adaptor_data_test.exs new file mode 100644 index 00000000000..4099e3321d2 --- /dev/null +++ b/test/lightning/adaptor_data_test.exs @@ -0,0 +1,109 @@ +defmodule Lightning.AdaptorDataTest do + use Lightning.DataCase, async: true + + alias Lightning.AdaptorData + alias Lightning.AdaptorData.CacheEntry + + describe "put/4 and get/2" do + test "inserts and retrieves a cache entry" do + assert {:ok, entry} = + AdaptorData.put("registry", "adaptors", ~s({"list":[]})) + + assert %CacheEntry{ + kind: "registry", + key: "adaptors", + data: ~s({"list":[]}), + content_type: "application/json" + } = entry + + assert {:ok, fetched} = AdaptorData.get("registry", "adaptors") + assert fetched.id == entry.id + end + + test "upserts on conflict, replacing data and content_type" do + assert {:ok, original} = + AdaptorData.put("schema", "http", "v1", "application/json") + + assert {:ok, updated} = + AdaptorData.put("schema", "http", "v2", "text/plain") + + assert updated.id == original.id + assert updated.data == "v2" + assert updated.content_type == "text/plain" + end + + test "returns error for missing entry" do + assert {:error, :not_found} = AdaptorData.get("nope", "nope") + end + end + + describe "put_many/2" do + test "bulk inserts and upserts entries" do + entries = [ + %{key: "a", data: "data_a"}, + %{key: "b", data: "data_b", content_type: "image/png"} + ] + + assert {2, _} = AdaptorData.put_many("icons", entries) + + all = AdaptorData.get_all("icons") + assert length(all) == 2 + + assert %CacheEntry{key: "a", data: "data_a"} = + Enum.find(all, &(&1.key == "a")) + + assert %CacheEntry{key: "b", content_type: "image/png"} = + Enum.find(all, &(&1.key == "b")) + + # Upsert overwrites existing entries + assert {1, _} = + AdaptorData.put_many("icons", [ + %{key: "a", data: "data_a_v2"} + ]) + + assert {:ok, %CacheEntry{data: "data_a_v2"}} = + AdaptorData.get("icons", "a") + end + end + + describe "get_all/1" do + test "returns entries ordered by key and scoped to kind" do + AdaptorData.put("reg", "z-adaptor", "z") + AdaptorData.put("reg", "a-adaptor", "a") + AdaptorData.put("other", "should-not-appear", "x") + + entries = AdaptorData.get_all("reg") + assert length(entries) == 2 + assert [%{key: "a-adaptor"}, %{key: "z-adaptor"}] = entries + end + end + + describe "delete_kind/1" do + test "removes all entries for a kind" do + AdaptorData.put("temp", "one", "1") + AdaptorData.put("temp", "two", "2") + AdaptorData.put("keep", "three", "3") + + assert {2, _} = AdaptorData.delete_kind("temp") + assert AdaptorData.get_all("temp") == [] + assert length(AdaptorData.get_all("keep")) == 1 + end + end + + describe "delete/2" do + test "deletes a specific entry and returns it" do + AdaptorData.put("kind", "target", "data") + AdaptorData.put("kind", "keep", "data") + + assert {:ok, %CacheEntry{key: "target"}} = + AdaptorData.delete("kind", "target") + + assert {:error, :not_found} = AdaptorData.get("kind", "target") + assert {:ok, _} = AdaptorData.get("kind", "keep") + end + + test "returns error when entry does not exist" do + assert {:error, :not_found} = AdaptorData.delete("nope", "nope") + end + end +end diff --git a/test/lightning/adaptor_icons_test.exs b/test/lightning/adaptor_icons_test.exs new file mode 100644 index 00000000000..554b58401ee --- /dev/null +++ b/test/lightning/adaptor_icons_test.exs @@ -0,0 +1,93 @@ +defmodule Lightning.AdaptorIconsTest do + use Lightning.DataCase, async: false + + import Mox + + setup :set_mox_from_context + setup :verify_on_exit! + + setup do + Lightning.AdaptorData.Cache.invalidate("icon") + Lightning.AdaptorData.Cache.invalidate("icon_manifest") + :ok + end + + describe "refresh_manifest/0" do + test "builds manifest from adaptor registry and stores in DB" do + assert {:ok, manifest} = Lightning.AdaptorIcons.refresh_manifest() + + assert is_map(manifest) + + # The manifest should contain entries based on whatever adaptors + # are in the registry cache + if map_size(manifest) > 0 do + {_name, sources} = Enum.at(manifest, 0) + assert Map.has_key?(sources, "square") + assert Map.has_key?(sources, "rectangle") + end + + # Verify it was stored in DB + assert {:ok, entry} = + Lightning.AdaptorData.get("icon_manifest", "all") + + assert entry.content_type == "application/json" + assert Jason.decode!(entry.data) == manifest + end + end + + describe "refresh/0" do + test "returns manifest and spawns background prefetch" do + assert {:ok, manifest} = Lightning.AdaptorIcons.refresh() + assert is_map(manifest) + end + end + + describe "prefetch_icons/1" do + test "skips icons already in DB" do + Lightning.AdaptorData.put( + "icon", + "http-square", + <<1, 2, 3>>, + "image/png" + ) + + # No Tesla call should be made for http-square + Lightning.AdaptorIcons.prefetch_icons(%{ + "http" => %{ + "square" => "/images/adaptors/http-square.png", + "rectangle" => "/images/adaptors/http-rectangle.png" + } + }) + + # The rectangle one would have attempted a fetch (via Hackney stub) + # but the square one was skipped + assert {:ok, _} = Lightning.AdaptorData.get("icon", "http-square") + end + + test "fetches and stores missing icons from GitHub" do + png_data = <<137, 80, 78, 71, 13, 10, 26, 10>> + + Mox.expect(Lightning.Tesla.Mock, :call, 2, fn env, _opts -> + assert env.url =~ "raw.githubusercontent.com/OpenFn/adaptors" + {:ok, %Tesla.Env{status: 200, body: png_data}} + end) + + Lightning.AdaptorIcons.prefetch_icons(%{ + "testadaptor" => %{ + "square" => "/images/adaptors/testadaptor-square.png", + "rectangle" => "/images/adaptors/testadaptor-rectangle.png" + } + }) + + assert {:ok, sq} = + Lightning.AdaptorData.get("icon", "testadaptor-square") + + assert sq.data == png_data + + assert {:ok, rect} = + Lightning.AdaptorData.get("icon", "testadaptor-rectangle") + + assert rect.data == png_data + end + end +end diff --git a/test/lightning/adaptor_refresh_worker_test.exs b/test/lightning/adaptor_refresh_worker_test.exs new file mode 100644 index 00000000000..73eaec9018c --- /dev/null +++ b/test/lightning/adaptor_refresh_worker_test.exs @@ -0,0 +1,159 @@ +defmodule Lightning.AdaptorRefreshWorkerTest do + use Lightning.DataCase, async: false + + import Mock + import Mox + + setup :verify_on_exit! + + alias Lightning.AdaptorRefreshWorker + + describe "perform/1" do + test "skips refresh when local adaptors mode is enabled" do + stub(Lightning.MockConfig, :adaptor_registry, fn -> + [local_adaptors_repo: "/tmp/fake-adaptors"] + end) + + assert :ok = AdaptorRefreshWorker.perform(%Oban.Job{}) + end + + test "writes registry and schema data to DB on success" do + stub(Lightning.MockConfig, :adaptor_registry, fn -> [] end) + + # Tesla mock for AdaptorRegistry.fetch() — NPM user packages + details + stub(Lightning.Tesla.Mock, :call, fn env, _opts -> + cond do + String.contains?(env.url, "/-/user/openfn/package") -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{"@openfn/language-http" => "read"} + }} + + String.contains?(env.url, "language-http") -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "name" => "@openfn/language-http", + "repository" => %{"url" => "https://github.com/openfn/adaptors"}, + "dist-tags" => %{"latest" => "1.0.0"}, + "versions" => %{"1.0.0" => %{}} + } + }} + + true -> + {:ok, %Tesla.Env{status: 200, body: %{}}} + end + end) + + # HTTPoison mock for CredentialSchemas.fetch_and_store() + with_mock HTTPoison, + get: fn url, _headers, _opts -> + cond do + String.contains?(url, "registry.npmjs.org") -> + body = + Jason.encode!(%{ + "@openfn/language-http" => "read", + "@openfn/language-common" => "read" + }) + + {:ok, %HTTPoison.Response{status_code: 200, body: body}} + + String.contains?(url, "cdn.jsdelivr.net") -> + {:ok, + %HTTPoison.Response{ + status_code: 200, + body: ~s({"fields": []}) + }} + + true -> + {:ok, %HTTPoison.Response{status_code: 404, body: ""}} + end + end do + assert :ok = AdaptorRefreshWorker.perform(%Oban.Job{}) + + # Verify registry was written to DB + assert {:ok, entry} = Lightning.AdaptorData.get("registry", "all") + assert entry.content_type == "application/json" + assert is_binary(entry.data) + + # Verify at least one schema was written to DB + schemas = Lightning.AdaptorData.get_all("schema") + assert length(schemas) > 0 + end + end + + test "returns :ok even when all HTTP calls fail" do + stub(Lightning.MockConfig, :adaptor_registry, fn -> [] end) + + stub(Lightning.Tesla.Mock, :call, fn _env, _opts -> + {:error, :econnrefused} + end) + + with_mock HTTPoison, + get: fn _url, _headers, _opts -> + {:error, %HTTPoison.Error{reason: :econnrefused}} + end do + assert :ok = AdaptorRefreshWorker.perform(%Oban.Job{}) + end + end + + test "handles partial failure — broadcasts only successful kinds" do + stub(Lightning.MockConfig, :adaptor_registry, fn -> [] end) + + # Registry fetch succeeds via Tesla + stub(Lightning.Tesla.Mock, :call, fn env, _opts -> + cond do + String.contains?(env.url, "/-/user/openfn/package") -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{"@openfn/language-http" => "read"} + }} + + String.contains?(env.url, "language-http") -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "name" => "@openfn/language-http", + "repository" => %{"url" => "https://github.com/openfn/adaptors"}, + "dist-tags" => %{"latest" => "1.0.0"}, + "versions" => %{"1.0.0" => %{}} + } + }} + + true -> + {:ok, %Tesla.Env{status: 200, body: %{}}} + end + end) + + # Schema fetch fails via HTTPoison + with_mock HTTPoison, + get: fn _url, _headers, _opts -> + {:error, %HTTPoison.Error{reason: :econnrefused}} + end do + assert :ok = AdaptorRefreshWorker.perform(%Oban.Job{}) + + # Registry should still be written + assert {:ok, _entry} = Lightning.AdaptorData.get("registry", "all") + end + end + + test "safe_call rescues exceptions and returns :ok" do + stub(Lightning.MockConfig, :adaptor_registry, fn -> [] end) + + stub(Lightning.Tesla.Mock, :call, fn _env, _opts -> + raise "unexpected failure" + end) + + with_mock HTTPoison, + get: fn _url, _headers, _opts -> + raise "unexpected failure" + end do + assert :ok = AdaptorRefreshWorker.perform(%Oban.Job{}) + end + end + end +end diff --git a/test/lightning/adaptor_registry_test.exs b/test/lightning/adaptor_registry_test.exs index b590f010c22..c170c0759c1 100644 --- a/test/lightning/adaptor_registry_test.exs +++ b/test/lightning/adaptor_registry_test.exs @@ -9,129 +9,105 @@ defmodule Lightning.AdaptorRegistryTest do alias Lightning.AdaptorRegistry - describe "start_link/1" do - test "uses cache from a specific location" do - file_path = - Briefly.create!(extname: ".json") - |> tap(fn path -> - File.write!(path, ~S""" - [{ - "latest": "3.0.5", - "name": "@openfn/language-dhis2", - "repo": "git+https://github.com/openfn/language-dhis2.git", - "versions": [] - }] - """) - end) + describe "start_link/1 in non-local mode" do + test "reads from DB/ETS cache when data is seeded" do + # Seed DB with adaptor data + adaptors = [ + %{ + name: "@openfn/language-dhis2", + repo: "git+https://github.com/openfn/language-dhis2.git", + latest: "3.0.5", + versions: [] + } + ] + + Lightning.AdaptorData.put("registry", "all", Jason.encode!(adaptors)) + Lightning.AdaptorData.Cache.invalidate("registry") start_supervised!( - {AdaptorRegistry, [name: :test_adaptor_registry, use_cache: file_path]} + {AdaptorRegistry, [name: :test_adaptor_registry, use_cache: false]} ) results = AdaptorRegistry.all(:test_adaptor_registry) assert length(results) == 1 - end - - test "retrieves a list of adaptors when caching is disabled" do - default_npm_response = - File.read!("test/fixtures/language-common-npm.json") |> Jason.decode!() - - expect_tesla_call( - times: 7, - returns: fn env, [] -> - case env.url do - "https://registry.npmjs.org/-/user/openfn/package" -> - {:ok, - json( - %Tesla.Env{status: 200}, - File.read!("test/fixtures/openfn-packages-npm.json") - |> Jason.decode!() - )} - "https://registry.npmjs.org/@openfn/" <> _adaptor -> - {:ok, json(%Tesla.Env{status: 200}, default_npm_response)} - end - end - ) + assert %{name: "@openfn/language-dhis2", latest: "3.0.5"} = + hd(results) + end - expected_adaptors = [ - "@openfn/language-asana", - "@openfn/language-common", - "@openfn/language-commcare", - "@openfn/language-dhis2", - "@openfn/language-http", - "@openfn/language-salesforce" - ] + test "returns empty list when DB has no data" do + Lightning.AdaptorData.Cache.invalidate("registry") start_supervised!( - {AdaptorRegistry, [name: :test_adaptor_registry, use_cache: false]} + {AdaptorRegistry, [name: :test_empty_registry, use_cache: false]} ) - results = AdaptorRegistry.all(:test_adaptor_registry) + results = AdaptorRegistry.all(:test_empty_registry) + assert results == [] + end - assert_received_tesla_call(env, []) + test "versions_for reads from cache" do + adaptors = [ + %{ + name: "@openfn/language-common", + repo: "git+https://github.com/OpenFn/language-common.git", + latest: "1.6.2", + versions: [%{version: "1.5.0"}, %{version: "1.6.2"}] + } + ] - assert_tesla_env(env, %Tesla.Env{ - method: :get, - url: "https://registry.npmjs.org/-/user/openfn/package" - }) + Lightning.AdaptorData.put("registry", "all", Jason.encode!(adaptors)) + Lightning.AdaptorData.Cache.invalidate("registry") - 1..length(expected_adaptors) - |> Enum.each(fn _ -> - assert_received_tesla_call(env, []) + start_supervised!( + {AdaptorRegistry, [name: :test_versions_registry, use_cache: false]} + ) - assert %Tesla.Env{ - method: :get, - url: "https://registry.npmjs.org/" <> adaptor - } = env + assert [%{version: "1.5.0"}, %{version: "1.6.2"}] = + AdaptorRegistry.versions_for( + :test_versions_registry, + "@openfn/language-common" + ) - assert adaptor in expected_adaptors - end) + assert AdaptorRegistry.versions_for( + :test_versions_registry, + "@openfn/language-foobar" + ) == nil + end - assert length(results) == 6 - - versions = [ - %{version: "1.1.0"}, - %{version: "1.1.1"}, - %{version: "1.2.0"}, - %{version: "1.2.1"}, - %{version: "1.2.2"}, - %{version: "1.2.4"}, - %{version: "1.2.5"}, - %{version: "1.2.6"}, - %{version: "1.2.7"}, - %{version: "1.2.8"}, - %{version: "1.4.0"}, - %{version: "1.4.1"}, - %{version: "1.4.2"}, - %{version: "1.5.0"}, - %{version: "1.6.0"}, - %{version: "1.6.1"}, - %{version: "1.6.2"} + test "latest_for reads from cache" do + adaptors = [ + %{ + name: "@openfn/language-common", + repo: "git+https://github.com/OpenFn/language-common.git", + latest: "1.6.2", + versions: [] + } ] - assert %{ - name: "@openfn/language-common", - repo: "git+https://github.com/OpenFn/language-common.git", - latest: "1.6.2", - versions: versions - } in results + Lightning.AdaptorData.put("registry", "all", Jason.encode!(adaptors)) + Lightning.AdaptorData.Cache.invalidate("registry") - assert AdaptorRegistry.versions_for( - :test_adaptor_registry, - "@openfn/language-common" - ) == - versions + start_supervised!( + {AdaptorRegistry, [name: :test_latest_registry, use_cache: false]} + ) - assert AdaptorRegistry.versions_for( - :test_adaptor_registry, + assert "1.6.2" = + AdaptorRegistry.latest_for( + :test_latest_registry, + "@openfn/language-common" + ) + + assert AdaptorRegistry.latest_for( + :test_latest_registry, "@openfn/language-foobar" - ) == - nil + ) == nil end + end + describe "start_link/1 in local mode" do @tag :tmp_dir - test "lists directory names of the when local_adaptors_repo is set", %{ + test "lists directory names when local_adaptors_repo is set", %{ tmp_dir: tmp_dir, test: test } do @@ -160,6 +136,71 @@ defmodule Lightning.AdaptorRegistryTest do end end + describe "refresh_sync/1" do + test "fetches from NPM and writes to DB cache" do + default_npm_response = + File.read!("test/fixtures/language-common-npm.json") |> Jason.decode!() + + expect_tesla_call( + times: 7, + returns: fn env, [] -> + case env.url do + "https://registry.npmjs.org/-/user/openfn/package" -> + {:ok, + json( + %Tesla.Env{status: 200}, + File.read!("test/fixtures/openfn-packages-npm.json") + |> Jason.decode!() + )} + + "https://registry.npmjs.org/@openfn/" <> _adaptor -> + {:ok, json(%Tesla.Env{status: 200}, default_npm_response)} + end + end + ) + + Lightning.AdaptorData.Cache.invalidate("registry") + + start_supervised!( + {AdaptorRegistry, [name: :test_refresh_registry, use_cache: false]} + ) + + assert {:ok, 6} = AdaptorRegistry.refresh_sync(:test_refresh_registry) + + # After refresh, data should be readable from cache + Lightning.AdaptorData.Cache.invalidate("registry") + assert length(AdaptorRegistry.all(:test_refresh_registry)) == 6 + end + + test "returns error when NPM returns empty results" do + # Mock npm to return empty package list (simulates offline) + expect_tesla_call( + times: 1, + returns: fn _env, [] -> + {:ok, json(%Tesla.Env{status: 200}, %{})} + end + ) + + start_supervised!( + {AdaptorRegistry, [name: :test_empty_refresh, use_cache: false]} + ) + + assert {:error, :empty_results} = + AdaptorRegistry.refresh_sync(:test_empty_refresh) + end + + @tag :tmp_dir + test "is a no-op in local mode", %{tmp_dir: tmp_dir, test: test} do + [tmp_dir, "packages", "foo"] |> Path.join() |> File.mkdir_p!() + + start_supervised!( + {AdaptorRegistry, [name: test, local_adaptors_repo: tmp_dir]} + ) + + assert {:ok, :local_mode} = AdaptorRegistry.refresh_sync(test) + end + end + describe "resolve_package_name/1" do test "it can split an NPM style package name" do assert AdaptorRegistry.resolve_package_name("@openfn/language-foo@1.2.3") == diff --git a/test/lightning/credential_schemas_test.exs b/test/lightning/credential_schemas_test.exs new file mode 100644 index 00000000000..fe2e41947ee --- /dev/null +++ b/test/lightning/credential_schemas_test.exs @@ -0,0 +1,153 @@ +defmodule Lightning.CredentialSchemasTest do + use Lightning.DataCase, async: false + + import Mock + + setup do + tmp_dir = + Path.join( + System.tmp_dir!(), + "credential_schemas_test_#{System.unique_integer([:positive])}" + ) + + File.mkdir_p!(tmp_dir) + + previous = Application.get_env(:lightning, :schemas_path) + Application.put_env(:lightning, :schemas_path, tmp_dir) + + on_exit(fn -> + if previous, + do: Application.put_env(:lightning, :schemas_path, previous), + else: Application.delete_env(:lightning, :schemas_path) + + File.rm_rf(tmp_dir) + end) + + %{schemas_path: tmp_dir} + end + + describe "refresh/0" do + test "returns error when npm fetch fails" do + with_mock HTTPoison, + get: fn _url, _headers, _opts -> + {:error, %HTTPoison.Error{reason: :econnrefused}} + end do + assert {:error, :econnrefused} = Lightning.CredentialSchemas.refresh() + end + end + + test "returns error on non-200 npm response" do + with_mock HTTPoison, + get: fn _url, _headers, _opts -> + {:ok, %HTTPoison.Response{status_code: 503, body: ""}} + end do + assert {:error, "NPM returned 503"} = + Lightning.CredentialSchemas.refresh() + end + end + + test "downloads schemas and returns count", %{schemas_path: schemas_path} do + packages = + Jason.encode!(%{ + "@openfn/language-http" => "read", + "@openfn/language-salesforce" => "read", + "@openfn/language-common" => "read", + "@openfn/language-devtools" => "read" + }) + + schema_body = Jason.encode!(%{"type" => "object", "properties" => %{}}) + + with_mock HTTPoison, + get: fn url, _headers, _opts -> + cond do + String.contains?(url, "registry.npmjs.org") -> + {:ok, %HTTPoison.Response{status_code: 200, body: packages}} + + String.contains?(url, "cdn.jsdelivr.net") -> + {:ok, %HTTPoison.Response{status_code: 200, body: schema_body}} + + true -> + {:ok, %HTTPoison.Response{status_code: 404, body: ""}} + end + end do + assert {:ok, count} = Lightning.CredentialSchemas.refresh() + + # language-common and language-devtools are excluded by default + # So only language-http and language-salesforce should be fetched + assert count == 2 + + assert File.exists?(Path.join(schemas_path, "http.json")) + assert File.exists?(Path.join(schemas_path, "salesforce.json")) + refute File.exists?(Path.join(schemas_path, "common.json")) + end + end + + test "wipes existing schemas before downloading new ones", %{ + schemas_path: schemas_path + } do + # Create a stale schema file that should be removed + stale_path = Path.join(schemas_path, "stale-adaptor.json") + File.write!(stale_path, ~s({"old": true})) + + packages = + Jason.encode!(%{ + "@openfn/language-http" => "read" + }) + + schema_body = Jason.encode!(%{"type" => "object"}) + + with_mock HTTPoison, + get: fn url, _headers, _opts -> + cond do + String.contains?(url, "registry.npmjs.org") -> + {:ok, %HTTPoison.Response{status_code: 200, body: packages}} + + String.contains?(url, "cdn.jsdelivr.net") -> + {:ok, %HTTPoison.Response{status_code: 200, body: schema_body}} + + true -> + {:ok, %HTTPoison.Response{status_code: 404, body: ""}} + end + end do + assert {:ok, 1} = Lightning.CredentialSchemas.refresh() + assert File.exists?(Path.join(schemas_path, "http.json")) + refute File.exists?(stale_path), "stale schema file should be removed" + end + end + + test "handles individual schema download failures gracefully", %{ + schemas_path: schemas_path + } do + packages = + Jason.encode!(%{ + "@openfn/language-http" => "read", + "@openfn/language-salesforce" => "read" + }) + + with_mock HTTPoison, + get: fn url, _headers, _opts -> + cond do + String.contains?(url, "registry.npmjs.org") -> + {:ok, %HTTPoison.Response{status_code: 200, body: packages}} + + String.contains?(url, "language-http") -> + {:ok, + %HTTPoison.Response{ + status_code: 200, + body: ~s({"type": "object"}) + }} + + String.contains?(url, "language-salesforce") -> + {:ok, %HTTPoison.Response{status_code: 404, body: ""}} + + true -> + {:ok, %HTTPoison.Response{status_code: 404, body: ""}} + end + end do + assert {:ok, 1} = Lightning.CredentialSchemas.refresh() + assert File.exists?(Path.join(schemas_path, "http.json")) + refute File.exists?(Path.join(schemas_path, "salesforce.json")) + end + end + end +end diff --git a/test/lightning/install_adaptor_icons_test.exs b/test/lightning/install_adaptor_icons_test.exs index fe6b9f673a0..11aa809afc7 100644 --- a/test/lightning/install_adaptor_icons_test.exs +++ b/test/lightning/install_adaptor_icons_test.exs @@ -1,123 +1,30 @@ defmodule Lightning.InstallAdaptorIconsTest do - use ExUnit.Case, async: false + use Lightning.DataCase, async: false - import Tesla.Mock - - alias LightningWeb.Router.Helpers, as: Routes alias Mix.Tasks.Lightning.InstallAdaptorIcons - @icons_path Application.compile_env(:lightning, :adaptor_icons_path) - |> Path.expand() - @adaptors_tar_url "https://github.com/OpenFn/adaptors/archive/refs/heads/main.tar.gz" - - @http_tar_path Path.expand("../fixtures/adaptors/http.tar.gz", __DIR__) - @dhis2_tar_path Path.expand("../fixtures/adaptors/dhis2.tar.gz", __DIR__) - @http_dhis2_tar_path Path.expand( - "../fixtures/adaptors/http_dhis2.tar.gz", - __DIR__ - ) setup do - File.mkdir_p(@icons_path) Mix.shell(Mix.Shell.Process) - on_exit(fn -> File.rm_rf!(@icons_path) end) - end - - @tag :capture_log - test "generates http adaptor icons correctly" do - mock(fn - %{method: :get, url: @adaptors_tar_url} -> - %Tesla.Env{status: 200, body: File.read!(@http_tar_path)} - end) - - assert File.ls!(@icons_path) == [] - InstallAdaptorIcons.run([]) - - assert_receive {:mix_shell, :info, [msg]} - assert msg =~ "Adaptor icons installed successfully. Manifest saved at: " - - icons = File.ls!(@icons_path) - assert length(icons) == 2 - assert "http-square.png" in icons - assert "adaptor_icons.json" in icons - assert File.read!(Path.join(@icons_path, "adaptor_icons.json")) == - Jason.encode!(%{ - http: %{ - square: - Routes.static_path( - LightningWeb.Endpoint, - "/images/adaptors/http-square.png" - ) - } - }) - end - - test "generates dhis2 adaptor icons correctly" do - mock(fn - %{method: :get, url: @adaptors_tar_url} -> - %Tesla.Env{status: 200, body: File.read!(@dhis2_tar_path)} - end) - - assert File.ls!(@icons_path) == [] - InstallAdaptorIcons.run([]) - - assert_receive {:mix_shell, :info, [msg]} - assert msg =~ "Adaptor icons installed successfully. Manifest saved at: " - - icons = File.ls!(@icons_path) - assert length(icons) == 2 - assert "dhis2-square.png" in icons - assert "adaptor_icons.json" in icons - - assert File.read!(Path.join(@icons_path, "adaptor_icons.json")) == - Jason.encode!(%{ - dhis2: %{ - square: - Routes.static_path( - LightningWeb.Endpoint, - "/images/adaptors/dhis2-square.png" - ) - } - }) + Lightning.AdaptorData.Cache.invalidate("icon_manifest") + Lightning.AdaptorData.Cache.invalidate("icon") + :ok end @tag :capture_log - test "generates both dhis2 and http adaptor icons correctly" do - mock(fn - %{method: :get, url: @adaptors_tar_url} -> - %Tesla.Env{status: 200, body: File.read!(@http_dhis2_tar_path)} - end) - - assert File.ls!(@icons_path) == [] + test "refreshes icon manifest and reports count" do InstallAdaptorIcons.run([]) assert_receive {:mix_shell, :info, [msg]} - assert msg =~ "Adaptor icons installed successfully. Manifest saved at: " - - icons = File.ls!(@icons_path) - assert length(icons) == 3 - assert "dhis2-square.png" in icons - assert "http-square.png" in icons - assert "adaptor_icons.json" in icons + assert msg =~ "Adaptor icons refreshed successfully." + assert msg =~ "adaptors in manifest." - expected_content = %{ - dhis2: %{ - square: - Routes.static_path( - LightningWeb.Endpoint, - "/images/adaptors/dhis2-square.png" - ) - }, - http: %{ - square: - Routes.static_path( - LightningWeb.Endpoint, - "/images/adaptors/http-square.png" - ) - } - } + # Verify manifest was stored in DB + assert {:ok, entry} = + Lightning.AdaptorData.get("icon_manifest", "all") - assert File.read!(Path.join(@icons_path, "adaptor_icons.json")) - |> Jason.decode!(keys: :atoms) == expected_content + assert entry.content_type == "application/json" + manifest = Jason.decode!(entry.data) + assert is_map(manifest) end end diff --git a/test/lightning/install_schemas_test.exs b/test/lightning/install_schemas_test.exs index 5257f672b25..fe44a8633ff 100644 --- a/test/lightning/install_schemas_test.exs +++ b/test/lightning/install_schemas_test.exs @@ -1,183 +1,116 @@ defmodule Lightning.InstallSchemasTest do use ExUnit.Case, async: false - use Mimic - import ExUnit.CaptureIO - import ExUnit.CaptureLog - require Logger + import Mock alias Mix.Tasks.Lightning.InstallSchemas - @request_options [recv_timeout: 15_000, pool: :default] - @ok_200 {:ok, 200, "headers", :client} - @ok_400 {:ok, 400, "headers", :client} + setup do + tmp_dir = + Path.join( + System.tmp_dir!(), + "install_schemas_test_#{System.unique_integer([:positive])}" + ) - @schemas_path Application.compile_env(:lightning, :schemas_path) + File.mkdir_p!(tmp_dir) - describe "install_schemas mix task" do - setup do - stub(:hackney) + previous = Application.get_env(:lightning, :schemas_path) + Application.put_env(:lightning, :schemas_path, tmp_dir) - :ok - end + on_exit(fn -> + if previous, + do: Application.put_env(:lightning, :schemas_path, previous), + else: Application.delete_env(:lightning, :schemas_path) - test "run success" do - expect(:hackney, :request, fn - :get, - "https://registry.npmjs.org/-/user/openfn/package", - [], - "", - @request_options -> - @ok_200 - end) - - expect(:hackney, :body, fn :client, _timeout -> - {:ok, - ~s({"@openfn/language-primero": "write","@openfn/language-asana": "write", "@openfn/language-common": "write"})} - end) - - expect(:hackney, :request, fn - :get, - "https://cdn.jsdelivr.net/npm/@openfn/language-asana/configuration-schema.json", - [], - "", - @request_options -> - @ok_200 - end) - - expect(:hackney, :body, fn :client, _timeout -> - {:ok, ~s({"name": "language-asana"})} - end) - - expect(:hackney, :request, fn - :get, - "https://cdn.jsdelivr.net/npm/@openfn/language-primero/configuration-schema.json", - [], - "", - @request_options -> - @ok_200 - end) - - expect(:hackney, :body, fn :client, _timeout -> - {:ok, ~s({"name": "language-primero"})} - end) - - File - |> expect(:rm_rf, fn _ -> nil end) - |> expect(:mkdir_p, fn _ -> nil end) - |> expect(:open!, fn - "test/fixtures/schemas/primero.json", [:write] -> nil - "test/fixtures/schemas/asana.json", [:write] -> nil - end) - |> expect(:close, 2, fn _ -> nil end) - - IO - |> expect(:binwrite, fn _, ~s({"name": "language-asana"}) -> nil end) - |> expect(:binwrite, fn _, ~s({"name": "language-primero"}) -> nil end) - - # |> expect(:binwrite, fn _, ~s({"name": "language-common"}) -> nil end) - - capture_io(fn -> - InstallSchemas.run([]) - end) - end + File.rm_rf(tmp_dir) + end) - test "run fail" do - expect(File, :rm_rf, fn _ -> {:error, "error occured"} end) - expect(File, :mkdir_p, fn _ -> {:error, "error occured"} end) + Mix.shell(Mix.Shell.Process) - assert_raise RuntimeError, - "Couldn't create the schemas directory: test/fixtures/schemas, got :error occured.", - fn -> - InstallSchemas.run([]) - end - end + %{schemas_path: tmp_dir} + end - test "persist_schema fail 1" do - expect(:hackney, :request, fn - :get, - "https://cdn.jsdelivr.net/npm/@openfn/language-asana/configuration-schema.json", - [], - "", - @request_options -> - @ok_200 - end) - - expect(:hackney, :body, fn :client, _timeout -> - {:error, %HTTPoison.Error{}} - end) - - assert_raise RuntimeError, "Unable to access @openfn/language-asana", fn -> - InstallSchemas.persist_schema(@schemas_path, "@openfn/language-asana") - end - end + describe "run/1" do + test "installs schemas and prints count", %{schemas_path: schemas_path} do + packages = + Jason.encode!(%{ + "@openfn/language-http" => "read", + "@openfn/language-salesforce" => "read", + "@openfn/language-common" => "read" + }) + + schema_body = Jason.encode!(%{"type" => "object"}) + + with_mock HTTPoison, + start: fn -> {:ok, self()} end, + get: fn url, _headers, _opts -> + cond do + String.contains?(url, "registry.npmjs.org") -> + {:ok, %HTTPoison.Response{status_code: 200, body: packages}} + + String.contains?(url, "cdn.jsdelivr.net") -> + {:ok, %HTTPoison.Response{status_code: 200, body: schema_body}} + + true -> + {:ok, %HTTPoison.Response{status_code: 404, body: ""}} + end + end do + InstallSchemas.run([]) + + assert_receive {:mix_shell, :info, [msg]} + assert msg =~ "Schemas installation has finished. 2 installed" - test "persist_schema fail 2" do - expect(:hackney, :request, fn - :get, - "https://cdn.jsdelivr.net/npm/@openfn/language-asana/configuration-schema.json", - [], - "", - @request_options -> - @ok_400 - end) - - expect(:hackney, :body, fn :client, _timeout -> - {:ok, %HTTPoison.Response{status_code: 400}} - end) - - {_result, log} = - with_log(fn -> - InstallSchemas.persist_schema(@schemas_path, "@openfn/language-asana") - end) - - assert log =~ - "Unable to fetch @openfn/language-asana configuration schema. status=400" + assert File.exists?(Path.join(schemas_path, "http.json")) + assert File.exists?(Path.join(schemas_path, "salesforce.json")) + refute File.exists?(Path.join(schemas_path, "common.json")) + end end - test "fetch_schemas fail 1" do - expect(:hackney, :request, fn - :get, - "https://registry.npmjs.org/-/user/openfn/package", - [], - "", - @request_options -> - @ok_200 - end) - - expect(:hackney, :body, fn :client, _timeout -> - {:error, %HTTPoison.Error{}} - end) - - assert_raise RuntimeError, - "Unable to connect to NPM; no adaptors fetched.", - fn -> - InstallSchemas.fetch_schemas([]) - end + test "raises on failure" do + with_mock HTTPoison, + start: fn -> {:ok, self()} end, + get: fn _url, _headers, _opts -> + {:error, %HTTPoison.Error{reason: :econnrefused}} + end do + assert_raise Mix.Error, ~r/Schema installation failed/, fn -> + InstallSchemas.run([]) + end + end end - test "fetch_schemas fail 2" do - expect(:hackney, :request, fn - :get, - "https://registry.npmjs.org/-/user/openfn/package", - [], - "", - @request_options -> - @ok_400 - end) - - expect(:hackney, :body, fn :client, _timeout -> - {:ok, %HTTPoison.Response{status_code: 400}} - end) - - assert_raise RuntimeError, - "Unable to access openfn user packages. status=400", - fn -> - InstallSchemas.fetch_schemas([]) - end + test "passes --exclude args through" do + packages = + Jason.encode!(%{ + "@openfn/language-http" => "read", + "@openfn/language-salesforce" => "read" + }) + + schema_body = Jason.encode!(%{"type" => "object"}) + + with_mock HTTPoison, + start: fn -> {:ok, self()} end, + get: fn url, _headers, _opts -> + cond do + String.contains?(url, "registry.npmjs.org") -> + {:ok, %HTTPoison.Response{status_code: 200, body: packages}} + + String.contains?(url, "cdn.jsdelivr.net") -> + {:ok, %HTTPoison.Response{status_code: 200, body: schema_body}} + + true -> + {:ok, %HTTPoison.Response{status_code: 404, body: ""}} + end + end do + InstallSchemas.run(["--exclude", "language-http"]) + + assert_receive {:mix_shell, :info, [msg]} + assert msg =~ "1 installed" + end end + end - test "parse_excluded" do + describe "parse_excluded/1 delegates to CredentialSchemas" do + test "with --exclude args" do assert [ "pack1", "pack2", @@ -185,10 +118,16 @@ defmodule Lightning.InstallSchemasTest do "language-devtools", "language-divoc" ] == - InstallSchemas.parse_excluded(["--exclude", "pack1", "pack2"]) + Lightning.CredentialSchemas.parse_excluded([ + "--exclude", + "pack1", + "pack2" + ]) + end + test "without args returns defaults" do assert ["language-common", "language-devtools", "language-divoc"] == - InstallSchemas.parse_excluded([]) + Lightning.CredentialSchemas.parse_excluded([]) end end end diff --git a/test/lightning/workflow_versions_test.exs b/test/lightning/workflow_versions_test.exs index 251ca02d12b..e0bb6e4f779 100644 --- a/test/lightning/workflow_versions_test.exs +++ b/test/lightning/workflow_versions_test.exs @@ -6,7 +6,7 @@ defmodule Lightning.WorkflowVersionsTest do alias Lightning.Repo alias Lightning.WorkflowVersions - alias Lightning.Workflows.{Workflow, WorkflowVersion} + alias Lightning.Workflows.WorkflowVersion @a "aaaaaaaaaaaa" @b "bbbbbbbbbbbb" diff --git a/test/lightning_web/controllers/adaptor_icon_controller_test.exs b/test/lightning_web/controllers/adaptor_icon_controller_test.exs new file mode 100644 index 00000000000..b3b553ca4dd --- /dev/null +++ b/test/lightning_web/controllers/adaptor_icon_controller_test.exs @@ -0,0 +1,152 @@ +defmodule LightningWeb.AdaptorIconControllerTest do + use LightningWeb.ConnCase, async: false + + import Mox + + setup :set_mox_from_context + setup :verify_on_exit! + + # Clean ETS cache entries that our tests rely on, so Plug.Static + # fallthrough and prior test runs don't interfere. + setup do + Lightning.AdaptorData.Cache.invalidate("icon") + Lightning.AdaptorData.Cache.invalidate("icon_manifest") + :ok + end + + describe "show/2" do + test "serves a cached icon from DB/ETS", %{conn: conn} do + # Use a fake adaptor name that has no file on disk so Plug.Static + # passes through to the router. + png_data = <<137, 80, 78, 71, 13, 10, 26, 10>> + + Lightning.AdaptorData.put( + "icon", + "fakexyz-square", + png_data, + "image/png" + ) + + Lightning.AdaptorData.Cache.invalidate("icon") + + conn = get(conn, "/images/adaptors/fakexyz-square.png") + + assert response(conn, 200) == png_data + + assert get_resp_header(conn, "content-type") == [ + "image/png; charset=utf-8" + ] + + assert ["public, max-age=604800"] = + get_resp_header(conn, "cache-control") + end + + test "fetches icon from GitHub on cache miss and caches it", + %{conn: conn} do + png_data = <<137, 80, 78, 71, 13, 10, 26, 10, 0, 0>> + + Mox.expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + assert env.url =~ + "raw.githubusercontent.com/OpenFn/adaptors/main/packages/fakexyz/assets/square.png" + + {:ok, %Tesla.Env{status: 200, body: png_data}} + end) + + conn = get(conn, "/images/adaptors/fakexyz-square.png") + + assert response(conn, 200) == png_data + + # Verify it was stored in DB + assert {:ok, entry} = + Lightning.AdaptorData.get("icon", "fakexyz-square") + + assert entry.data == png_data + assert entry.content_type == "image/png" + end + + test "handles adaptor names with hyphens (e.g. fake-multi-word)", + %{conn: conn} do + png_data = <<1, 2, 3, 4>> + + Mox.expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + assert env.url =~ + "/packages/fake-multi-word/assets/rectangle.png" + + {:ok, %Tesla.Env{status: 200, body: png_data}} + end) + + conn = get(conn, "/images/adaptors/fake-multi-word-rectangle.png") + + assert response(conn, 200) == png_data + end + + test "returns 404 when GitHub returns 404", %{conn: conn} do + Mox.expect(Lightning.Tesla.Mock, :call, fn _env, _opts -> + {:ok, %Tesla.Env{status: 404, body: ""}} + end) + + conn = get(conn, "/images/adaptors/nonexistent99-square.png") + assert response(conn, 404) + end + + test "returns 502 on GitHub error", %{conn: conn} do + Mox.expect(Lightning.Tesla.Mock, :call, fn _env, _opts -> + {:error, :timeout} + end) + + conn = get(conn, "/images/adaptors/fakexyz-square.png") + assert response(conn, 502) + end + + test "returns 400 for invalid filename format", %{conn: conn} do + conn = get(conn, "/images/adaptors/invalid.png") + assert response(conn, 400) + end + + test "returns 400 for filename with unknown shape", %{conn: conn} do + conn = get(conn, "/images/adaptors/fakexyz-circle.png") + assert response(conn, 400) + end + end + + describe "manifest/2" do + test "serves cached manifest from DB", %{conn: conn} do + manifest = + Jason.encode!(%{ + "fakexyz" => %{ + "square" => "/images/adaptors/fakexyz-square.png" + } + }) + + Lightning.AdaptorData.put( + "icon_manifest", + "all", + manifest, + "application/json" + ) + + Lightning.AdaptorData.Cache.invalidate("icon_manifest") + + conn = get(conn, "/images/adaptors/adaptor_icons.json") + + assert json_response(conn, 200) == %{ + "fakexyz" => %{ + "square" => "/images/adaptors/fakexyz-square.png" + } + } + + assert ["public, max-age=300"] = + get_resp_header(conn, "cache-control") + end + + test "returns empty JSON when no manifest is cached", %{conn: conn} do + # Delete any existing manifest from DB + Lightning.AdaptorData.delete("icon_manifest", "all") + Lightning.AdaptorData.Cache.invalidate("icon_manifest") + + conn = get(conn, "/images/adaptors/adaptor_icons.json") + + assert json_response(conn, 200) == %{} + end + end +end diff --git a/test/lightning_web/live/maintenance_live/index_test.exs b/test/lightning_web/live/maintenance_live/index_test.exs new file mode 100644 index 00000000000..0550c9b8853 --- /dev/null +++ b/test/lightning_web/live/maintenance_live/index_test.exs @@ -0,0 +1,62 @@ +defmodule LightningWeb.MaintenanceLive.IndexTest do + use LightningWeb.ConnCase, async: true + + import Phoenix.LiveViewTest + + describe "as a regular user" do + setup :register_and_log_in_user + + test "cannot access the maintenance page", %{conn: conn} do + {:ok, _live, html} = + live(conn, ~p"/settings/maintenance") + |> follow_redirect(conn, "/projects") + + assert html =~ "Sorry, you don't have access to that." + end + end + + describe "as a superuser" do + setup :register_and_log_in_superuser + + test "can access the maintenance page", %{conn: conn} do + {:ok, _live, html} = live(conn, ~p"/settings/maintenance") + + assert html =~ "Maintenance" + assert html =~ "Refresh Adaptor Registry" + assert html =~ "Install Adaptor Icons" + assert html =~ "Install Credential Schemas" + end + + test "clicking run button shows running state", %{conn: conn} do + {:ok, view, _html} = live(conn, ~p"/settings/maintenance") + + # We can't easily test the full async flow without mocking HTTP, + # but we can test the handle_info path by sending messages directly + send(view.pid, {:action_complete, "refresh_adaptor_registry", {:ok, 5}}) + + html = render(view) + assert html =~ "Done" + end + + test "shows error status on failure", %{conn: conn} do + {:ok, view, _html} = live(conn, ~p"/settings/maintenance") + + send( + view.pid, + {:action_complete, "install_adaptor_icons", {:error, "HTTP 500"}} + ) + + html = render(view) + assert html =~ "Failed" + end + + test "shows success status for schema install", %{conn: conn} do + {:ok, view, _html} = live(conn, ~p"/settings/maintenance") + + send(view.pid, {:action_complete, "install_schemas", {:ok, 42}}) + + html = render(view) + assert html =~ "Done" + end + end +end diff --git a/test/support/channel_case.ex b/test/support/channel_case.ex index 333231cb18a..2fdb1740eaf 100644 --- a/test/support/channel_case.ex +++ b/test/support/channel_case.ex @@ -55,6 +55,50 @@ defmodule LightningWeb.ChannelCase do shared: not tags[:async] ) + # Seed ETS cache with minimal adaptor registry so AdaptorRegistry.all() + # never falls through to DB from the GenServer (sandbox ownership issue). + registry_json = + Jason.encode!([ + %{ + name: "@openfn/language-common", + repo: "", + latest: "1.6.2", + versions: [ + %{version: "1.6.2"}, + %{version: "1.5.0"}, + %{version: "1.0.0"} + ] + }, + %{ + name: "@openfn/language-http", + repo: "", + latest: "7.2.0", + versions: [ + %{version: "7.2.0"}, + %{version: "2.0.0"}, + %{version: "1.0.0"} + ] + }, + %{ + name: "@openfn/language-dhis2", + repo: "", + latest: "3.0.4", + versions: [%{version: "3.0.4"}, %{version: "3.0.0"}] + }, + %{ + name: "@openfn/language-salesforce", + repo: "", + latest: "4.0.0", + versions: [%{version: "4.0.0"}] + } + ]) + + :ets.insert( + Lightning.AdaptorData.Cache, + {{"registry", "all"}, + %{data: registry_json, content_type: "application/json"}} + ) + on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(pid) end) :ok end diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index 88cc525887b..a92db881307 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -74,6 +74,51 @@ defmodule LightningWeb.ConnCase do shared: not tags[:async] ) + # Seed the ETS cache directly with minimal adaptor registry data so that + # AdaptorRegistry.all() never falls through to a DB query from the + # GenServer process (which would fail sandbox ownership checks in async tests). + registry_json = + Jason.encode!([ + %{ + name: "@openfn/language-common", + repo: "", + latest: "1.6.2", + versions: [ + %{version: "1.6.2"}, + %{version: "1.5.0"}, + %{version: "1.0.0"} + ] + }, + %{ + name: "@openfn/language-http", + repo: "", + latest: "7.2.0", + versions: [ + %{version: "7.2.0"}, + %{version: "2.0.0"}, + %{version: "1.0.0"} + ] + }, + %{ + name: "@openfn/language-dhis2", + repo: "", + latest: "3.0.4", + versions: [%{version: "3.0.4"}, %{version: "3.0.0"}] + }, + %{ + name: "@openfn/language-salesforce", + repo: "", + latest: "4.0.0", + versions: [%{version: "4.0.0"}] + } + ]) + + :ets.insert( + Lightning.AdaptorData.Cache, + {{"registry", "all"}, + %{data: registry_json, content_type: "application/json"}} + ) + on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(pid) end) Map.get(tags, :create_initial_user, true) diff --git a/test/support/data_case.ex b/test/support/data_case.ex index 6618fc69bf6..8d0ba5ade89 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -67,6 +67,51 @@ defmodule Lightning.DataCase do shared: not tags[:async] ) + # Seed the ETS cache directly with minimal adaptor registry data so that + # AdaptorRegistry.all() never falls through to a DB query from the + # GenServer process (which would fail sandbox ownership checks in async tests). + registry_json = + Jason.encode!([ + %{ + name: "@openfn/language-common", + repo: "", + latest: "1.6.2", + versions: [ + %{version: "1.6.2"}, + %{version: "1.5.0"}, + %{version: "1.0.0"} + ] + }, + %{ + name: "@openfn/language-http", + repo: "", + latest: "7.2.0", + versions: [ + %{version: "7.2.0"}, + %{version: "2.0.0"}, + %{version: "1.0.0"} + ] + }, + %{ + name: "@openfn/language-dhis2", + repo: "", + latest: "3.0.4", + versions: [%{version: "3.0.4"}, %{version: "3.0.0"}] + }, + %{ + name: "@openfn/language-salesforce", + repo: "", + latest: "4.0.0", + versions: [%{version: "4.0.0"}] + } + ]) + + :ets.insert( + Lightning.AdaptorData.Cache, + {{"registry", "all"}, + %{data: registry_json, content_type: "application/json"}} + ) + on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(pid) end) end end