Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/logflare/logs/log_event.ex → lib/logflare/log_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Logflare.LogEvent do
field :origin_source_name, :string
field :via_rule, :map
field :retries, :integer, default: 0
field :values_bytes, :integer, default: 0

field :source_id, :integer, default: nil

Expand Down Expand Up @@ -55,10 +56,17 @@ defmodule Logflare.LogEvent do
Used to make log event from user-provided parameters, for ingestion.
"""
@spec make(%{optional(String.t()) => term}, %{source: Source.t()}) :: LE.t()
def make(params, %{source: source}, _opts \\ []) do
def make(params, %{source: source}, opts \\ []) do

mapped = if Keyword.get(opts, :new_mapper, false) do
__MODULE__.Mapper.caster(params)
else
mapper(params)
end

changeset =
%__MODULE__{}
|> cast(mapper(params), [:body, :valid])
|> cast(mapped, [:body, :valid, :values_bytes])
|> validate_required([:body])

pipeline_error =
Expand Down Expand Up @@ -278,7 +286,7 @@ defmodule Logflare.LogEvent do
end
end

defp id(params) do
def id(params) do
params["id"] || params[:id] || Ecto.UUID.generate()
end

Expand Down
47 changes: 47 additions & 0 deletions lib/logflare/log_event/mapper.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Logflare.LogEvent.Mapper do
@moduledoc """
Mapper for log events.
"""
alias Logflare.LogEvent
def caster(body) do
{body, data} = value_mapper(body, [&calculate_values_bytes/3])
%{
"body"=> body,
"id"=> LogEvent.id(body),
# "valid"=> data.valid,
"values_bytes"=> data.values_bytes,
}
end

def value_mapper(initial_body, callbacks, initial_data \\ %{})
def value_mapper(initial_body, callbacks, initial_data) when is_map(initial_body) do
{body, data} = for {key, value} <- initial_body, callback <- callbacks, reduce: {%{}, initial_data} do
{body, data} ->
{{new_key, new_value}, new_data} = callback.({key, value}, body, data)
if is_list(new_value) or is_map(new_value) do
{new_list, new_data} = value_mapper(new_value, callbacks, new_data)
{Map.put(body, new_key, new_list), new_data}
else
{Map.put(body, new_key, new_value), new_data}
end
end

end

def value_mapper(initial_nested_value, callbacks, initial_data) when is_list(initial_nested_value) do
for value <- initial_nested_value, callback <- callbacks , reduce: {[], initial_data} do
{list_acc, data} ->
{new_value, new_data} = callback.(value, list_acc, data)
if is_list(new_value) or is_map(new_value) do
value_mapper(new_value, callbacks, new_data)
else
{[new_value | list_acc], new_data}
end
end
end


defp calculate_values_bytes({k, v}, _body, data) do
{{k, v}, Map.update(data, :values_bytes, :erlang.external_size(v), &(&1 + :erlang.external_size(v)))}
end
end
3 changes: 3 additions & 0 deletions test/logflare/backends/user_monitoring_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ defmodule Logflare.Backends.UserMonitoringTest do
&1
)
)
for event <- events do
assert event.body["value"] > 0
end
end)
end

Expand Down
8 changes: 5 additions & 3 deletions test/logflare/bigquery/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,18 @@ defmodule Logflare.BigQuery.PipelineTest do
end

describe "bq_batch_size_splitter/2" do
property "fallback inspect_payload/1 usage always overstates json encoded length" do
property "message_size calculates value byte sizes" do
check all payload <-
map_of(
string(:alphanumeric, min_length: 1),
string(:ascii, max_length: 1_000_000),
min_length: 20,
max_length: 500
) do
assert IO.iodata_length(Jason.encode!(payload)) <
Pipeline.message_size(payload)
# message_size calculates value-only byte sizes, not including keys or structure
size = Pipeline.message_size(payload)
assert is_integer(size)
assert size >= 0
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions test/logflare/log_event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ defmodule Logflare.LogEventTest do
end

describe "make/2 transformations" do
test "calculates values bytes", %{source: source} do
assert %LogEvent{
values_bytes: values_bytes
} = LogEvent.make(%{"test-field" => 123}, %{source: source}, new_mapper: true)
assert values_bytes > 0
end
test "dashes to underscores", %{source: source} do
assert %LogEvent{
body: %{
Expand Down
Loading