Skip to content
Open
4 changes: 2 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@
# notification. To prevent flooding the recipients, it will wait for a period
# before it sends the next email (assuming the failure condition persists).
# Changing this setting will affect the frequency of sending.
# KAFKA_NOTIFICATTION_EMBARGO_SECONDS=3600
# KAFKA_NOTIFICATION_EMBARGO_SECONDS=3600
#
# If the Kafka pipelines failed to persist a message, the message can be
# persisted as JSON to the local file system. To enable this, set
Expand All @@ -282,7 +282,7 @@
# Lightning starts and it must be writable by the user that Lightning runs as.
# KAFKA_ALTERNATE_STORAGE_FILE_PATH=/path/to/alternate/storage
#
# This file to which the registry should be read from. In case the file doesnt
# This file to which the registry should be read from. In case the file doesn't
# exist, Lightning will attempt to fetch the file and write it to the same location.
# For this reason, you have to make sure that the directory exists and it is writable
# ADAPTORS_REGISTRY_JSON_PATH=/path/to/adaptor_registry_cache.json
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ and this project adheres to

### Changed

- Allow instance admins to install credential schemas and update the adaptor
registry on the fly [#3114](https://github.com/OpenFn/lightning/issues/3114),
[#2209](https://github.com/OpenFn/lightning/issues/2209),
[#325](https://github.com/OpenFn/lightning/issues/325),
[#1996](https://github.com/OpenFn/lightning/issues/1996)

### Fixed

- Auto-increment job name when adaptor display name is already used in workflow
Expand Down
3 changes: 3 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ config :lightning, LightningWeb.Endpoint,
storybook_tailwind: {Tailwind, :install_and_run, [:storybook, ~w(--watch)]}
]

# schemas_path and adaptor_icons_path are only used by build-time mix tasks
# (install_schemas, install_adaptor_icons) for baking data into Docker images.
# At runtime, the DB is the primary source via AdaptorData/ETS cache.
config :lightning,
schemas_path: "priv/schemas",
adaptor_icons_path: "priv/static/images/adaptors",
Expand Down
3 changes: 3 additions & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ config :phoenix, :filter_parameters, [
"token"
]

# schemas_path and adaptor_icons_path are only used by build-time mix tasks
# (install_schemas, install_adaptor_icons) for baking data into Docker images.
# At runtime, the DB is the primary source via AdaptorData/ETS cache.
config :lightning,
schemas_path: "priv/schemas",
adaptor_icons_path: "priv/static/images/adaptors"
Expand Down
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ config :lightning, Lightning.FailureAlerter,
time_scale: 60_000,
rate_limit: 3

# schemas_path and adaptor_icons_path point to test fixtures for build-time
# mix task tests. At runtime, tests use the DB via AdaptorData/ETS cache.
config :lightning,
schemas_path: "test/fixtures/schemas",
adaptor_icons_path: "test/fixtures/adaptors/icons",
Expand Down
123 changes: 123 additions & 0 deletions lib/lightning/adaptor_data.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
defmodule Lightning.AdaptorData do
  @moduledoc """
  Context for managing adaptor cache entries in the database.

  Provides CRUD operations for storing adaptor registry data, credential
  schemas, adaptor icons, and other cacheable adaptor metadata. Each entry
  is keyed by a `kind` (category) and a unique `key` within that kind.
  """
  import Ecto.Query

  alias Lightning.AdaptorData.CacheEntry
  alias Lightning.Repo

  @doc """
  Upserts a single cache entry.

  If an entry with the same `kind` and `key` already exists, its `data`,
  `content_type`, and `updated_at` fields are replaced.

  Returns `{:ok, %CacheEntry{}}` or `{:error, %Ecto.Changeset{}}`.
  """
  @spec put(String.t(), String.t(), binary(), String.t()) ::
          {:ok, CacheEntry.t()} | {:error, Ecto.Changeset.t()}
  def put(kind, key, data, content_type \\ "application/json") do
    %CacheEntry{}
    |> CacheEntry.changeset(%{
      kind: kind,
      key: key,
      data: data,
      content_type: content_type
    })
    |> Repo.insert(
      # The unique index on (kind, key) turns the insert into an upsert;
      # `returning: true` refreshes the struct with the DB-side values.
      conflict_target: [:kind, :key],
      on_conflict: {:replace, [:data, :content_type, :updated_at]},
      returning: true
    )
  end

  @doc """
  Bulk upserts a list of entries for the given `kind`.

  Each entry in `entries` must be a map with `:key`, `:data`, and optionally
  `:content_type` keys (defaulting to `"application/json"`).

  Returns `{count, nil}` where `count` is the number of rows affected.
  The upserted structs are not returned, since `Repo.insert_all/3` is
  called without the `:returning` option.
  """
  @spec put_many(String.t(), [map()]) :: {non_neg_integer(), nil}
  def put_many(kind, entries) when is_list(entries) do
    # Truncate to microseconds so the value matches the schema's
    # :utc_datetime_usec timestamp columns.
    now = DateTime.utc_now() |> DateTime.truncate(:microsecond)

    rows =
      Enum.map(entries, fn entry ->
        %{
          # insert_all bypasses autogeneration, so ids and timestamps
          # must be provided explicitly here.
          id: Ecto.UUID.generate(),
          kind: kind,
          key: Map.fetch!(entry, :key),
          data: Map.fetch!(entry, :data),
          content_type: Map.get(entry, :content_type, "application/json"),
          inserted_at: now,
          updated_at: now
        }
      end)

    Repo.insert_all(CacheEntry, rows,
      conflict_target: [:kind, :key],
      on_conflict: {:replace, [:data, :content_type, :updated_at]}
    )
  end

  @doc """
  Gets a single cache entry by `kind` and `key`.

  Returns `{:ok, %CacheEntry{}}` or `{:error, :not_found}`.
  """
  @spec get(String.t(), String.t()) ::
          {:ok, CacheEntry.t()} | {:error, :not_found}
  def get(kind, key) do
    case Repo.get_by(CacheEntry, kind: kind, key: key) do
      nil -> {:error, :not_found}
      entry -> {:ok, entry}
    end
  end

  @doc """
  Gets all cache entries for the given `kind`.

  Returns a list of `%CacheEntry{}` structs ordered by key.
  """
  @spec get_all(String.t()) :: [CacheEntry.t()]
  def get_all(kind) do
    CacheEntry
    |> where([e], e.kind == ^kind)
    |> order_by([e], asc: e.key)
    |> Repo.all()
  end

  @doc """
  Deletes all cache entries for the given `kind`.

  Returns `{count, nil}` where `count` is the number of deleted rows.
  """
  @spec delete_kind(String.t()) :: {non_neg_integer(), nil}
  def delete_kind(kind) do
    CacheEntry
    |> where([e], e.kind == ^kind)
    |> Repo.delete_all()
  end

  @doc """
  Deletes a specific cache entry by `kind` and `key`.

  Returns `{:ok, %CacheEntry{}}` with the deleted struct, or
  `{:error, :not_found}` if no such entry exists. Raises if the delete
  itself fails (e.g. a stale struct), via `Repo.delete!/1`.
  """
  @spec delete(String.t(), String.t()) ::
          {:ok, CacheEntry.t()} | {:error, :not_found}
  def delete(kind, key) do
    case Repo.get_by(CacheEntry, kind: kind, key: key) do
      nil -> {:error, :not_found}
      entry -> {:ok, Repo.delete!(entry)}
    end
  end
end
98 changes: 98 additions & 0 deletions lib/lightning/adaptor_data/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
defmodule Lightning.AdaptorData.Cache do
  @moduledoc """
  ETS-backed read-through cache for adaptor data.

  Read path: ETS -> DB -> nil
  Write path: DB -> broadcast invalidate -> all nodes clear ETS
  Next read on any node: ETS miss -> DB hit -> ETS populated
  """

  @table __MODULE__

  @doc "Create the ETS table. Called from Application.start/2."
  def init do
    case :ets.whereis(@table) do
      :undefined ->
        :ets.new(@table, [:set, :public, :named_table, read_concurrency: true])

      _tid ->
        # Table already exists (e.g. init/0 called twice); nothing to do.
        @table
    end

    :ok
  end

  @doc "Get a cached value. Falls back to DB on miss, populates ETS."
  def get(kind, key) do
    lookup_key = {kind, key}

    with [] <- :ets.lookup(@table, lookup_key),
         {:ok, entry} <- Lightning.AdaptorData.get(kind, key) do
      cached = %{data: entry.data, content_type: entry.content_type}
      :ets.insert(@table, {lookup_key, cached})
      cached
    else
      # ETS hit: return the cached value directly.
      [{^lookup_key, cached}] -> cached
      # DB miss: negative results are not cached.
      {:error, :not_found} -> nil
    end
  end

  @doc "Get all entries of a kind. Falls back to DB on miss."
  def get_all(kind) do
    lookup_key = {kind, :__all__}

    case :ets.lookup(@table, lookup_key) do
      [{^lookup_key, cached}] ->
        cached

      [] ->
        case Lightning.AdaptorData.get_all(kind) do
          # Empty result sets are not cached, so a later write is picked
          # up by the next read without an invalidation round-trip.
          [] ->
            []

          entries ->
            cached =
              for e <- entries do
                %{key: e.key, data: e.data, content_type: e.content_type}
              end

            :ets.insert(@table, {lookup_key, cached})
            cached
        end
    end
  end

  @doc "Invalidate all cached entries for a kind."
  def invalidate(kind) do
    # Drop the aggregate :__all__ entry for this kind.
    :ets.delete(@table, {kind, :__all__})

    # Drop every per-key entry whose key tuple starts with this kind.
    match_spec = [{{{kind, :_}, :_}, [], [true]}]
    :ets.select_delete(@table, match_spec)

    :ok
  end

  @doc "Invalidate all cached entries."
  def invalidate_all do
    true = :ets.delete_all_objects(@table)
    :ok
  end

  @doc "Broadcast cache invalidation to all nodes."
  def broadcast_invalidation(kinds) when is_list(kinds) do
    Lightning.API.broadcast(
      "adaptor:data",
      {:invalidate_cache, kinds, node()}
    )
  end
end
37 changes: 37 additions & 0 deletions lib/lightning/adaptor_data/cache_entry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Lightning.AdaptorData.CacheEntry do
  @moduledoc """
  Schema for adaptor cache entries stored in the database.

  Each entry is identified by a `kind` (e.g., "registry", "schema", "icon")
  and a `key` (e.g., adaptor name or path). The `data` field holds the raw
  binary content and `content_type` describes its format.
  """
  use Lightning.Schema

  @type t :: %__MODULE__{
          id: Ecto.UUID.t(),
          kind: String.t(),
          key: String.t(),
          data: binary(),
          content_type: String.t(),
          inserted_at: DateTime.t(),
          updated_at: DateTime.t()
        }

  schema "adaptor_cache_entries" do
    # Category of the cached artifact; `key` is unique within a kind.
    field :kind, :string
    # Identifier within the kind (e.g. adaptor name or file path).
    field :key, :string
    # Raw binary payload; how it is interpreted depends on `content_type`.
    field :data, :binary
    field :content_type, :string, default: "application/json"

    # Microsecond precision so rows can carry explicitly generated
    # timestamps from bulk inserts without losing precision.
    timestamps(type: :utc_datetime_usec)
  end

  @doc false
  # Requires kind/key/data and surfaces the DB unique index on
  # (kind, key) as a changeset error instead of a raised constraint.
  def changeset(entry, attrs) do
    entry
    |> cast(attrs, [:kind, :key, :data, :content_type])
    |> validate_required([:kind, :key, :data])
    |> unique_constraint([:kind, :key])
  end
end
34 changes: 34 additions & 0 deletions lib/lightning/adaptor_data/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Lightning.AdaptorData.Listener do
  @moduledoc """
  GenServer that subscribes to PubSub for cache invalidation messages.

  When a node writes new data to the DB, it broadcasts
  `{:invalidate_cache, kinds, node()}`. All nodes (including the sender)
  clear those kinds from their ETS cache. The next read on any node will
  go to DB and repopulate ETS.
  """

  use GenServer
  require Logger

  # Client API

  def start_link(opts \\ []),
    do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Server callbacks

  @impl GenServer
  def init(_opts) do
    # Subscribe so this node receives invalidation broadcasts from
    # every node, including itself.
    Lightning.API.subscribe("adaptor:data")
    {:ok, %{}}
  end

  @impl GenServer
  def handle_info({:invalidate_cache, kinds_to_clear, _origin_node}, state) do
    Logger.info("Invalidating adaptor cache for: #{inspect(kinds_to_clear)}")

    for kind <- kinds_to_clear do
      Lightning.AdaptorData.Cache.invalidate(kind)
    end

    {:noreply, state}
  end

  # Ignore any other messages so stray PubSub traffic cannot crash us.
  def handle_info(_msg, state), do: {:noreply, state}
end
Loading