Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions assets/js/source.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export function scrollBottom() {
}

async function logTemplate(e) {
const { via_rule, source_uuid, body } = e;
const { via_rule_id, source_uuid, body } = e;
const metadata = JSON.stringify(body, null, 2);
const formatter = await userSelectedFormatter();
const formattedDatetime = formatter(body.timestamp);
Expand Down Expand Up @@ -129,9 +129,9 @@ async function logTemplate(e) {
}">${formattedDatetime}</mark> ${logLevelTemplate}
${escape(body.event_message)}
${metadataElement}
${via_rule
${via_rule_id
? `<span
data-toggle="tooltip" data-placement="top" title="Matching ${via_rule.regex} routing from ${source_uuid}" style="color: ##5eeb8f;">
data-toggle="tooltip" data-placement="top" title="Routed from ${source_uuid}" style="color: #5eeb8f;">
<i class="fa fa-code-branch" style="font-size: 1em;"></i>
</span>`
: `<span></span>`
Expand Down
27 changes: 22 additions & 5 deletions lib/logflare/backends/source_sup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,15 @@ defmodule Logflare.Backends.SourceSup do
Must be a backend rule.
"""
@spec rule_child_started?(Rule.t()) :: boolean()
def rule_child_started?(%Rule{backend_id: backend_id, source_id: source_id}) do
def rule_child_started?(%Rule{backend_id: backend_id, source_id: source_id})
when backend_id != nil,
do: backend_child_started?(backend_id, source_id)

@doc """
Checks if a given backend associated with source is started.
"""
@spec backend_child_started?(non_neg_integer(), non_neg_integer()) :: boolean()
def backend_child_started?(backend_id, source_id) do
via = Backends.via_source(source_id, AdaptorSupervisor, backend_id)

if GenServer.whereis(via) do
Expand All @@ -86,15 +94,24 @@ defmodule Logflare.Backends.SourceSup do
end

@doc """
Starts a given backend child spec for the backend associated with a rule.
Wrapper that calls `start_backend_child_by_id/2` with the backend and source
ids associated with the given rule.
"""
@spec start_rule_child(Rule.t()) :: Supervisor.on_start_child() | :noop
def start_rule_child(%Rule{} = rule),
do: start_backend_child_by_id(rule.backend_id, rule.source_id)

@doc """
Starts a backend child spec for the given backend id and source id.
This backend will not be registered for ingest dispatching.

This allows for zero-downtime ingestion, as we don't restart the SourceSup supervision tree.
"""
@spec start_rule_child(Rule.t()) :: Supervisor.on_start_child() | :noop
def start_rule_child(%Rule{backend_id: backend_id} = rule) do
@spec start_backend_child_by_id(non_neg_integer(), non_neg_integer()) ::
Supervisor.on_start_child() | :noop
def start_backend_child_by_id(backend_id, source_id) do
backend = Backends.Cache.get_backend(backend_id) |> Map.put(:register_for_ingest, false)
source = Sources.Cache.get_by_id(rule.source_id)
source = Sources.Cache.get_by_id(source_id)
start_backend_child(source, backend)
end

Expand Down
31 changes: 20 additions & 11 deletions lib/logflare/context_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,7 @@ defmodule Logflare.ContextCache do
cache = cache_name(context)
cache_key = {fun, args}

case Cachex.fetch(cache, cache_key, fn {fun, args} ->
# Use a `:cached` tuple here otherwise when an fn returns nil Cachex will miss
# the cache because it thinks ETS returned nil
{:commit, {:cached, apply(context, fun, args)}}
end) do
{:commit, {:cached, value}} ->
value

{:ok, {:cached, value}} ->
value
end
fetch(cache, cache_key, fn -> apply(context, fun, args) end)
end

@doc """
Expand Down Expand Up @@ -135,6 +125,25 @@ defmodule Logflare.ContextCache do
Module.concat(context, Cache)
end

@doc """
Low level API for fetching from cache. Allows wrapping calls with
`Cachex.execute/2` and accessing arbitrary key or calling any getter function.
"""
@spec fetch(Cachex.t(), {atom(), list()}, fun()) :: term()
def fetch(cache, cache_key, getter_fn) do
case Cachex.fetch(cache, cache_key, fn _cache_key ->
# Use a `:cached` tuple here otherwise when an fn returns nil Cachex will miss
# the cache because it thinks ETS returned nil
{:commit, {:cached, getter_fn.()}}
end) do
{:commit, {:cached, value}} ->
value

{:ok, {:cached, value}} ->
value
end
end

defp delete_matching_entries(entries, context_cache, pkey) do
to_delete =
entries
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/logs/log_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Logflare.LogEvent do
field :ingested_at, :utc_datetime_usec
field :source_uuid, Ecto.UUID.Atom
field :source_name, :string
field :via_rule, :map
field :via_rule_id, :id
field :retries, :integer, default: 0
field :event_type, Ecto.Enum, values: [:log, :metric, :trace], default: :log

Expand Down
19 changes: 3 additions & 16 deletions lib/logflare/rules/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,12 @@ defmodule Logflare.Rules.Cache do

# Fetches multiple rules by id inside a single Cachex transaction; each id is
# resolved through the shared ContextCache so a miss hits the DB only once.
# Fix: removed the leftover `for id <- ids, do: get_rule_via_cache(cache, id)`
# comprehension whose result was discarded — it performed a redundant full
# pass over `ids` (extra cache/DB work) before the real loop ran.
def get_rules(ids) do
  Cachex.execute!(__MODULE__, fn cache ->
    for id <- ids do
      ContextCache.fetch(cache, {:get_rule, [id]}, fn -> Rules.get_rule(id) end)
    end
  end)
end

# Reads a single rule from the per-id cache, computing and storing it on a
# miss. Values are wrapped in a `:cached` tuple so that a `nil` rule is cached
# too — Cachex would otherwise treat a bare `nil` from ETS as a miss.
defp get_rule_via_cache(cache, id) do
  getter = fn _key -> {:commit, {:cached, Rules.get_rule(id)}} end

  case Cachex.fetch(cache, {:get_rule, [id]}, getter) do
    # :commit — freshly computed and written; :ok — served from cache.
    {:commit, {:cached, rule}} -> rule
    {:ok, {:cached, rule}} -> rule
  end
end

def list_by_source_id(id), do: apply_repo_fun(__ENV__.function, [id])
def list_by_backend_id(id), do: apply_repo_fun(__ENV__.function, [id])

Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/sources/source/channel_topics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Logflare.Sources.Source.ChannelTopics do

maybe_broadcast("source:#{source.token}", "source:#{source.token}:new", %{
body: body,
via_rule: le.via_rule && Map.take(le.via_rule, [:regex]),
via_rule_id: le.via_rule_id,
source_uuid: le.source_uuid
})
end
Expand Down
11 changes: 6 additions & 5 deletions lib/logflare/sources/source_router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ defmodule Logflare.Sources.SourceRouter do
"""
@callback matching_rules(LE.t(), Source.t()) :: [Rule.t()]

@spec route_to_sinks_and_ingest(LE.t() | [LE.t()], Source.t(), module()) :: LE.t()
@spec route_to_sinks_and_ingest(LE.t() | [LE.t()], Source.t(), module()) :: LE.t() | [LE.t()]
def route_to_sinks_and_ingest(events, source, router \\ @default_router)

def route_to_sinks_and_ingest(events, source, router) when is_list(events),
do: Enum.map(events, &route_to_sinks_and_ingest(&1, source, router))

def route_to_sinks_and_ingest(%LE{via_rule: %Rule{}} = le, _source, _router), do: le
def route_to_sinks_and_ingest(%LE{via_rule_id: id} = le, _source, _router) when id != nil,
do: le

def route_to_sinks_and_ingest(%LE{via_rule: nil} = le, source, router) do
def route_to_sinks_and_ingest(%LE{via_rule_id: nil} = le, source, router) do
for rule <- router.matching_rules(le, source) do
do_routing(rule, le, source)
end
Expand All @@ -33,7 +34,7 @@ defmodule Logflare.Sources.SourceRouter do
when backend_id != nil do
# route to a backend
backend = Backends.Cache.get_backend(backend_id)
le = %{le | via_rule: rule}
le = %{le | via_rule_id: rule.id}
if SourceSup.rule_child_started?(rule) == false, do: SourceSup.start_rule_child(rule)

# ingest to a specific backend
Expand All @@ -44,7 +45,7 @@ defmodule Logflare.Sources.SourceRouter do
sink_source =
Sources.Cache.get_by(token: rule.sink) |> Sources.refresh_source_metrics_for_ingest()

le = %{le | source_id: sink_source.id, via_rule: rule}
le = %{le | source_id: sink_source.id, via_rule_id: rule.id}

Backends.ensure_source_sup_started(sink_source)
Backends.ingest_logs([le], sink_source)
Expand Down
16 changes: 2 additions & 14 deletions lib/logflare_web/controllers/source_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -462,20 +462,8 @@ defmodule LogflareWeb.SourceController do
end

# Returns the source's recent log events trimmed to the fields the UI needs.
# Fix: the visible span interleaved the old and new diff bodies, leaving the
# old `for le <- log_events, le do ... |> case do ...` without its closing
# `end`s — syntactically invalid. Reconstructed as the coherent new version:
# one pass that projects each event to its serializable fields.
defp get_and_encode_logs(%Source{} = source) do
  for le <- Backends.list_recent_logs(source) do
    Map.take(le, [:body, :via_rule_id, :source_uuid])
  end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/logflare_web/templates/source/show.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
<div class="collapse metadata" id={"metadata-#{inx}"}>
<pre class="pre-metadata"><code><%= JSON.encode!(log.body, pretty: true) %></code></pre>
</div>
<%= if log.via_rule do %>
<span data-toggle="tooltip" data-placement="top" title={"Matching #{ log.via_rule.lql_string } routing from #{log.source_uuid }"} style="color: ##5eeb8f;">
<%= if log.via_rule_id do %>
<span data-toggle="tooltip" data-placement="top" title={"Routed from #{ log.source_uuid }"} style="color: #5eeb8f;">
<i class="fa fa-code-branch" style="font-size: 1em;"></i>
</span>
<% end %>
Expand Down
4 changes: 2 additions & 2 deletions test/logflare/log_event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Logflare.LogEventTest do
source_id: source_id,
valid: true,
pipeline_error: nil,
via_rule: nil
via_rule_id: nil
} = LogEvent.make(@vallog_event_ids, %{source: source})

assert id == body["id"]
Expand Down Expand Up @@ -280,7 +280,7 @@ defmodule Logflare.LogEventTest do
is_from_stale_query: nil,
valid: true,
pipeline_error: nil,
via_rule: nil
via_rule_id: nil
} = LogEvent.make(%{"metadata" => "some string"}, %{source: source})

assert id == body["id"]
Expand Down
30 changes: 28 additions & 2 deletions test/logflare/sources/source_router_test.exs
Original file line number Diff line number Diff line change
@@ -1,20 +1,46 @@
defmodule Logflare.Sources.SourceRouterTest do
use Logflare.DataCase

alias Logflare.Sources.SourceRouter
alias Logflare.Backends.SourceSup
alias Logflare.LogEvent
alias Logflare.Lql.Parser
alias Logflare.Lql.Rules.FilterRule
alias Logflare.Rules.Rule
alias Logflare.Sources.SourceRouter
alias Logflare.SystemMetrics.AllLogsLogged

@routers [SourceRouter.Sequential, SourceRouter.RulesTree]

setup do
[user: insert(:user)]
start_supervised!(AllLogsLogged)
insert(:plan)
user = insert(:user)
[user: user, backend: insert(:backend, user: user)]
end

for router <- @routers do
describe "#{inspect(router)} handles" do
test "SourceRouter LE handling", %{user: user, backend: backend} do
rule = build(:rule, backend: backend, lql_string: "testing")
source = insert(:source, user: user, rules: [rule])

start_supervised!({SourceSup, source})

%LogEvent{} = le = build(:log_event, source: source, message: "testing123")

assert SourceRouter.route_to_sinks_and_ingest(le, source, unquote(router)) == %LogEvent{
le
| via_rule_id: rule.id
}

assert SourceRouter.route_to_sinks_and_ingest([le], source, unquote(router)) == [
%LogEvent{
le
| via_rule_id: rule.id
}
]
end

test "list_includes operator", %{user: user} do
build_data = fn metadata_val, filter_val ->
rule = %Rule{
Expand Down
3 changes: 2 additions & 1 deletion test/logflare_web/controllers/source_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule LogflareWeb.SourceControllerTest do
alias Logflare.Repo
alias Logflare.Backends.SourceSup
alias Logflare.Backends
alias Logflare.LogEvent
alias Logflare.SystemMetrics.AllLogsLogged

setup do
Expand Down Expand Up @@ -141,7 +142,7 @@ defmodule LogflareWeb.SourceControllerTest do

test "show source's recent logs", %{conn: conn, source: source} do
start_supervised!({SourceSup, source})
le = build(:log_event, source: source, metadata: %{"level" => "debug"})
le = build(:log_event, level: "debug", source: source)
Backends.ingest_logs([le], source)

conn
Expand Down
52 changes: 16 additions & 36 deletions test/profiling/source_routing_bench.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ alias Logflare.Rules
alias Logflare.Rules.Rule
alias Logflare.Users
alias Logflare.Lql.Rules.FilterRule
alias Logflare.Lql.Parser

import Logflare.Factory

Expand Down Expand Up @@ -58,21 +59,12 @@ one_matching = fn rules_num ->
for i <- 1..rules_num do
backend = insert(:backend, user: user)

lql_string = "metadata.rule_id:rule-#{i} severity_number:>8"
{:ok, filters} = Parser.parse(lql_string)

%Rule{
lql_filters: [
%FilterRule{
path: "metadata.rule_id",
operator: :=,
value: "rule-#{i}",
modifiers: %{}
},
%FilterRule{
path: "severity_number",
operator: :>,
value: 8,
modifiers: %{}
}
],
lql_string: lql_string,
lql_filters: filters,
backend_id: backend.id
}
end
Expand All @@ -87,15 +79,12 @@ few_matching = fn rules_num ->
for i <- 1..rules_num do
backend = insert(:backend, user: user)

lql_string = "severity_number:>#{i}"
{:ok, filters} = Parser.parse(lql_string)

%Rule{
lql_filters: [
%FilterRule{
path: "severity_number",
operator: :>,
value: i,
modifiers: %{}
}
],
lql_string: lql_string,
lql_filters: filters,
backend_id: backend.id
}
end
Expand All @@ -110,21 +99,12 @@ all_matching = fn rules_num ->
for i <- 1..rules_num do
backend = insert(:backend, user: user)

lql_string = "m.type:otel_log severity_number:>8"
{:ok, filters} = Parser.parse(lql_string)

%Rule{
lql_filters: [
%FilterRule{
path: "metadata.type",
operator: :=,
value: "otel_log",
modifiers: %{}
},
%FilterRule{
path: "severity_number",
operator: :>,
value: 8,
modifiers: %{}
}
],
lql_string: lql_string,
lql_filters: filters,
backend_id: backend.id
}
end
Expand Down
Loading