Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,17 @@ config :mime, :types, %{
# use legacy artifacts for users on older CPUs or virtualized environments without advanced CPU features
config :explorer, use_legacy_artifacts: true

# Oban background-job processing: Basic (non-partitioned) engine, two queues
# (default: 10 workers, fetch: 5), jobs pruned after 24h, and a per-minute
# cron tick that enqueues the fetch query scheduler worker.
config :logflare, Oban,
  engine: Oban.Engines.Basic,
  queues: [default: 10, fetch: 5],
  repo: Logflare.Repo,
  plugins: [
    {Oban.Plugins.Pruner, max_age: 86_400},
    {Oban.Plugins.Cron,
     crontab: [
       {"* * * * *", Logflare.FetchQueries.FetchQuerySchedulerWorker}
     ]}
  ]

import_config "#{Mix.env()}.exs"
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,5 @@ config :logger,
config :tesla, Logflare.Backends.Adaptor.WebhookAdaptor.Client, adapter: Tesla.Mock

config :phoenix_test, otp_app: :logflare, endpoint: LogflareWeb.Endpoint

# Manual testing mode: Oban never runs jobs automatically in tests; tests
# must drain or assert on enqueued jobs explicitly.
config :logflare, Oban, testing: :manual
9 changes: 8 additions & 1 deletion lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ defmodule Logflare.Application do
child_spec: {Logflare.Scheduler, name: Logflare.Scheduler.scheduler_name()}},
# active users tracking for UserMetricsPoller
{Logflare.ActiveUserTracker,
[name: Logflare.ActiveUserTracker, pubsub_server: Logflare.PubSub]}
[name: Logflare.ActiveUserTracker, pubsub_server: Logflare.PubSub]},
{Oban, Application.fetch_env!(:logflare, Oban)}
]
end

Expand Down Expand Up @@ -187,5 +188,11 @@ defmodule Logflare.Application do
SingleTenant.update_supabase_source_schemas()
end
end

Logger.info("Running fetch query scheduler on startup")

%{}
|> Logflare.FetchQueries.FetchQuerySchedulerWorker.new()
|> Oban.insert()
end
end
9 changes: 8 additions & 1 deletion lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,14 @@ defmodule Logflare.Backends do
defp dispatch_to_backends(source, nil, log_events) do
backends = __MODULE__.Cache.list_backends(source_id: source.id)

for backend <- [nil | backends] do
backends_with_default =
if source.disable_system_default_backend? do
backends
else
[nil | backends]
end

for backend <- backends_with_default do
{queue_key, backend_type} =
case backend do
nil ->
Expand Down
10 changes: 9 additions & 1 deletion lib/logflare/backends/source_sup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,16 @@ defmodule Logflare.Backends.SourceSup do

default_backend = Backends.get_default_backend(user)

default_backends =
if source.disable_system_default_backend? do
[]
else
[default_backend]
end

specs =
[default_backend | ingest_backends]
default_backends
|> Enum.concat(ingest_backends)
|> Enum.concat(rules_backends)
|> Enum.map(&Backend.child_spec(source, &1))
|> Enum.uniq()
Expand Down
219 changes: 219 additions & 0 deletions lib/logflare/fetch_queries.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
defmodule Logflare.FetchQueries do
  @moduledoc """
  The FetchQueries context for pull-based data ingestion.

  A `FetchQuery` describes a query run against a backend on a schedule, with
  execution tracked via Oban jobs whose `args` carry a `fetch_query_id`.
  """

  import Ecto.Query, warn: false

  alias Logflare.Backends
  alias Logflare.FetchQueries.FetchQuery
  alias Logflare.Repo
  alias Logflare.Teams
  alias Logflare.User

  # Oban job states that are terminal and therefore not worth cancelling.
  @finished_states ["completed", "discarded", "cancelled"]
  # Oban job states that have not yet run to completion.
  @pending_states ["available", "scheduled", "executing"]

  @doc """
  Returns the list of fetch queries a user has access to, including where the
  user is a team member.
  """
  @spec list_fetch_queries_by_user_access(User.t()) :: [FetchQuery.t()]
  def list_fetch_queries_by_user_access(%User{} = user) do
    FetchQuery
    |> Teams.filter_by_user_access(user)
    |> Repo.all()
    |> Enum.map(&preload_fetch_query/1)
  end

  @doc """
  Gets a single fetch query by primary key. Returns `nil` if not found.
  """
  @spec get_fetch_query(integer()) :: FetchQuery.t() | nil
  def get_fetch_query(id) when is_integer(id) do
    Repo.get(FetchQuery, id)
  end

  @doc """
  Gets a fetch query by `external_id`. Returns `nil` if not found.
  """
  @spec get_fetch_query_by_external_id(term()) :: FetchQuery.t() | nil
  def get_fetch_query_by_external_id(external_id) do
    Repo.get_by(FetchQuery, external_id: external_id)
  end

  @doc """
  Gets a fetch query by id that the user has access to.

  Returns the fetch query if the user owns it or is a team member, otherwise
  returns `nil`.
  """
  @spec get_fetch_query_by_user_access(User.t(), integer() | String.t()) ::
          FetchQuery.t() | nil
  def get_fetch_query_by_user_access(user_or_team_user, id)
      when is_integer(id) or is_binary(id) do
    FetchQuery
    |> Teams.filter_by_user_access(user_or_team_user)
    |> where([fq], fq.id == ^id)
    |> Repo.one()
  end

  @doc """
  Preloads a fetch query's `:user`, `:backend` and `:source` associations,
  typecasting the backend's string-keyed config to atom keys when a backend
  is present.
  """
  @spec preload_fetch_query(FetchQuery.t()) :: FetchQuery.t()
  def preload_fetch_query(fetch_query) do
    fetch_query
    |> Repo.preload([:user, :backend, :source])
    |> typecast_backend_config()
  end

  # No backend associated: nothing to typecast.
  defp typecast_backend_config(%FetchQuery{backend: nil} = fq), do: fq

  defp typecast_backend_config(%FetchQuery{backend: backend} = fq) do
    %{fq | backend: Backends.typecast_config_string_map_to_atom_map(backend)}
  end

  @doc """
  Lists all enabled fetch queries (used by the scheduler).
  """
  @spec list_enabled_fetch_queries() :: [FetchQuery.t()]
  def list_enabled_fetch_queries do
    FetchQuery
    |> where([fq], fq.enabled == true)
    |> Repo.all()
  end

  @doc """
  Creates a fetch query.

  When `attrs` carries no backend id, falls back to the owning user's default
  backend (see `resolve_backend_id/1`).
  """
  @spec create_fetch_query(map()) :: {:ok, FetchQuery.t()} | {:error, Ecto.Changeset.t()}
  def create_fetch_query(attrs \\ %{}) do
    %FetchQuery{}
    |> FetchQuery.changeset(resolve_backend_id(attrs))
    |> Repo.insert()
  end

  @doc """
  Updates a fetch query.

  When `attrs` carries no backend id, falls back to the owning user's default
  backend (see `resolve_backend_id/1`).
  """
  @spec update_fetch_query(FetchQuery.t(), map()) ::
          {:ok, FetchQuery.t()} | {:error, Ecto.Changeset.t()}
  def update_fetch_query(%FetchQuery{} = fetch_query, attrs) do
    fetch_query
    |> FetchQuery.changeset(resolve_backend_id(attrs))
    |> Repo.update()
  end

  @doc """
  Deletes a fetch query.
  """
  @spec delete_fetch_query(FetchQuery.t()) ::
          {:ok, FetchQuery.t()} | {:error, Ecto.Changeset.t()}
  def delete_fetch_query(%FetchQuery{} = fetch_query) do
    Repo.delete(fetch_query)
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking fetch_query changes.
  """
  @spec change_fetch_query(FetchQuery.t(), map()) :: Ecto.Changeset.t()
  def change_fetch_query(%FetchQuery{} = fetch_query, attrs \\ %{}) do
    FetchQuery.changeset(fetch_query, attrs)
  end

  @doc """
  Lists execution history from Oban jobs for a fetch query.

  Returns up to 50 most recent jobs, newest `scheduled_at` first.
  """
  @spec list_execution_history(integer()) :: [Oban.Job.t()]
  def list_execution_history(fetch_query_id) when is_integer(fetch_query_id) do
    fetch_query_id
    |> jobs_query()
    |> order_by([job], desc: job.scheduled_at)
    |> limit(50)
    |> Repo.all()
  end

  @doc """
  Cancels all pending Oban jobs for the given fetch query, so the scheduler
  can enqueue fresh jobs on its next run.
  """
  @spec sync_fetch_query_schedule(FetchQuery.t()) :: :ok
  def sync_fetch_query_schedule(%FetchQuery{id: id}) do
    id
    |> jobs_query()
    |> where([job], job.state not in ^@finished_states)
    |> Repo.all()
    |> Enum.each(&Oban.cancel_job/1)

    :ok
  end

  @doc """
  Triggers a fetch query to run immediately by enqueueing an Oban job with
  `schedule_in: 0`.
  """
  @spec trigger_fetch_query_now(integer()) :: {:ok, Oban.Job.t()} | {:error, term()}
  def trigger_fetch_query_now(fetch_query_id) when is_integer(fetch_query_id) do
    %{
      fetch_query_id: fetch_query_id,
      scheduled_at: DateTime.utc_now() |> DateTime.to_iso8601()
    }
    |> Logflare.FetchQueries.FetchQueryWorker.new(schedule_in: 0)
    |> Oban.insert()
  end

  @doc """
  Partitions jobs into `{pending, finished}`, where pending jobs are in the
  `available`, `scheduled` or `executing` states.
  """
  @spec partition_jobs_by_time([Oban.Job.t()]) :: {[Oban.Job.t()], [Oban.Job.t()]}
  def partition_jobs_by_time(jobs) do
    Enum.split_with(jobs, &(&1.state in @pending_states))
  end

  # Base query for Oban jobs whose args reference the given fetch query.
  # `args` is a JSONB column, so the id is compared as text via `->>`.
  defp jobs_query(fetch_query_id) do
    from(job in Oban.Job,
      where: fragment("?->>'fetch_query_id' = ?", job.args, ^to_string(fetch_query_id))
    )
  end

  # When no backend id is provided (nil or empty string), fall back to the
  # user's default backend; otherwise pass `attrs` through untouched.
  defp resolve_backend_id(attrs) do
    case Map.get(attrs, "backend_id") || Map.get(attrs, :backend_id) do
      empty when empty in [nil, ""] -> maybe_set_default_backend(attrs)
      _present -> attrs
    end
  end

  # Looks up the user's default backend and writes its id into `attrs`.
  # Leaves `attrs` unchanged when no user id is present or the user is
  # unknown. NOTE(review): assumes Backends.get_default_backend/1 always
  # returns a backend for a known user — confirm it cannot return nil.
  defp maybe_set_default_backend(attrs) do
    with user_id when not is_nil(user_id) <-
           Map.get(attrs, "user_id") || Map.get(attrs, :user_id),
         %User{} = user <- Repo.get(User, user_id) do
      default_backend = Backends.get_default_backend(user)
      Map.put(attrs, backend_id_key(attrs), default_backend.id)
    else
      _ -> attrs
    end
  end

  # Mirrors the key style already used in `attrs`: string-keyed maps (e.g.
  # form params) get "backend_id", atom-keyed maps get :backend_id.
  defp backend_id_key(attrs) do
    if Map.has_key?(attrs, "backend_id") or Enum.all?(Map.keys(attrs), &is_binary/1) do
      "backend_id"
    else
      :backend_id
    end
  end
end
Loading
Loading