Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
465361c
adds native protocol module for clickhouse
amokan Feb 19, 2026
e33bafd
native clickhouse connection + handshake working
amokan Feb 20, 2026
c95f6c6
query packet / server response parsing implemented
amokan Feb 20, 2026
6674250
adds data block encoding logic and simple benchmark script
amokan Feb 20, 2026
ca27701
basic end-to-end insert over native ch protocol
amokan Feb 20, 2026
7a811ec
add `enum8` encoding
amokan Feb 20, 2026
02678f8
add test for enum8 encoding
amokan Feb 20, 2026
7bfd91b
adds additional block encoder logic
amokan Feb 20, 2026
e808315
native inserts into full DDL - with some compromises
amokan Feb 20, 2026
ed0bd2e
move json sanitizing functions to encoding utils module
amokan Feb 20, 2026
e9731b8
improves sketchy result decoding logic that was capable of mishandlin…
amokan Feb 20, 2026
74b9a5b
formatter fix
amokan Feb 20, 2026
c9a73f1
adds small nif to handle CityHash v1.0.2, which is required for Click…
amokan Feb 20, 2026
006b418
drops dedicated cityhash nif for more generic clickhouse compression …
amokan Feb 20, 2026
900f77e
add native compression handling for clickhouse
amokan Feb 20, 2026
3e1d970
fix encoding utils test
amokan Feb 20, 2026
dd993a5
integrates lz4 compression into native clickhouse implementation
amokan Feb 20, 2026
16d9612
native clickhouse inserts working
amokan Feb 22, 2026
71c8d81
adds retries and some general refactoring/removing of old logic/tests…
amokan Feb 23, 2026
b7dc901
Merge branch 'main' into adammokan/o11y-1444-basic-clickhouse-native-…
amokan Feb 23, 2026
149f342
adds additional warning level logging to native protocol inserts
amokan Feb 23, 2026
f9213ec
Merge branch 'adammokan/o11y-1444-basic-clickhouse-native-protocol-im…
amokan Feb 23, 2026
16410cb
tighten up `drain_trailing_packets/1`
amokan Feb 23, 2026
79c639a
Merge branch 'main' into adammokan/o11y-1444-basic-clickhouse-native-…
amokan Feb 23, 2026
16649f9
adds `insert_protocol` enum config to ClickHouse backend, replacing `…
amokan Feb 23, 2026
ee6b56c
remove block encoder benchmark script
amokan Feb 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"

members = ["native/arrowipc_ex", "native/sqlparser_ex", "native/mapper_ex"]
members = ["native/arrowipc_ex", "native/ch_compression_ex", "native/sqlparser_ex", "native/mapper_ex"]

[profile.release]
opt-level = 3
Expand Down
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ config :logflare, :bigquery_backend_adaptor, managed_service_account_pool_size:

config :logflare, :clickhouse_backend_adaptor,
engine: "MergeTree",
pool_size: 3
pool_size: 3,
native_pool_size: 10

config :logflare, Logflare.Sources.Source.BigQuery.Schema, updates_per_minute: 6

Expand Down
174 changes: 130 additions & 44 deletions lib/logflare/backends/adaptor/clickhouse_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do

alias __MODULE__.ConnectionManager
alias __MODULE__.Ingester
alias __MODULE__.NativeIngester
alias __MODULE__.NativeIngester.PoolSup, as: NativePoolSup
alias __MODULE__.Pipeline
alias __MODULE__.Provisioner
alias __MODULE__.QueryTemplates
Expand Down Expand Up @@ -111,7 +113,7 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
@doc false
@impl Logflare.Backends.Adaptor
def cast_config(%{} = params) do
{%{},
{%{insert_protocol: "http"},
%{
url: :string,
username: :string,
Expand All @@ -120,7 +122,10 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
port: :integer,
pool_size: :integer,
async_insert: :boolean,
read_only_url: :string
read_only_url: :string,
insert_protocol: :string,
native_port: :integer,
native_pool_size: :integer
}}
|> Changeset.cast(params, [
:url,
Expand All @@ -130,7 +135,10 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
:port,
:pool_size,
:async_insert,
:read_only_url
:read_only_url,
:insert_protocol,
:native_port,
:native_pool_size
])
|> Logflare.Utils.default_field_value(:async_insert, false)
end
Expand All @@ -140,11 +148,22 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
def validate_config(%Changeset{} = changeset) do
import Ecto.Changeset

{min_pool, max_pool} = NativeIngester.Pool.pool_size_range()

changeset
|> validate_required([:url, :database, :port])
|> Changeset.validate_format(:url, ~r/https?\:\/\/.+/)
|> validate_read_only_url()
|> validate_user_pass()
|> validate_inclusion(:insert_protocol, ["http", "native"])
|> validate_number(:pool_size,
greater_than_or_equal_to: 1,
less_than_or_equal_to: max_pool
)
|> validate_number(:native_pool_size,
greater_than_or_equal_to: min_pool,
less_than_or_equal_to: max_pool
)
end

@doc """
Expand Down Expand Up @@ -225,9 +244,9 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
with :ok <- ensure_query_connection_manager_started(backend) do
pool_via = connection_pool_via(backend)

case Ch.query(pool_via, statement, params, opts) do
case Ch.query(pool_via, statement, params, Keyword.put(opts, :decode, false)) do
{:ok, %Ch.Result{} = result} ->
{:ok, convert_ch_result_to_rows(result)}
{:ok, decode_ch_result(result)}

{:error, %Ch.Error{message: error_msg}} when is_non_empty_binary(error_msg) ->
Logger.warning(
Expand Down Expand Up @@ -258,18 +277,48 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
:ok | {:error, String.t()}
def insert_log_events(%Backend{}, [], _event_type), do: :ok

def insert_log_events(
%Backend{config: %{insert_protocol: "native"}} = backend,
[%LogEvent{} | _] = events,
event_type
)
when is_event_type(event_type) do
with :ok <- NativePoolSup.ensure_started(backend) do
do_insert_log_events(backend, events, event_type, :native)
end
end

def insert_log_events(%Backend{} = backend, [%LogEvent{} | _] = events, event_type)
when is_event_type(event_type) do
do_insert_log_events(backend, events, event_type, :http)
end

@spec do_insert_log_events(
Backend.t(),
[LogEvent.t()],
TypeDetection.event_type(),
:http | :native
) :: :ok | {:error, String.t()}
defp do_insert_log_events(backend, events, event_type, protocol) do
Logger.metadata(backend_id: backend.id)

table_name = clickhouse_ingest_table_name(backend, event_type)

case Ingester.insert(backend, table_name, events, event_type) do
result =
if protocol == :native do
NativeIngester.insert(backend, table_name, events, event_type)
else
Ingester.insert(backend, table_name, events, event_type)
end

case result do
:ok ->
:ok

{:error, reason} ->
Logger.error("ClickHouse insert errors.", error_string: inspect(reason))
Logger.error("ClickHouse #{protocol} insert error.",
error_string: inspect(reason)
)

{:error, reason}
end
Expand Down Expand Up @@ -386,35 +435,89 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
end
end

@spec convert_ch_result_to_rows(Ch.Result.t()) :: [map()]
defp convert_ch_result_to_rows(%Ch.Result{} = result) do
case {result.columns, result.rows} do
{nil, nil} ->
[]
@spec decode_ch_result(Ch.Result.t()) :: [map()]
defp decode_ch_result(%Ch.Result{} = result) do
format = get_response_header(result.headers, "x-clickhouse-format")

{nil, rows} when is_list(rows) ->
convert_uuids(rows)
case format do
"RowBinaryWithNamesAndTypes" ->
data = IO.iodata_to_binary(result.data)

{_columns, nil} ->
[]
{_names, types, _rest} = parse_row_binary_header(data)
[names | rows] = Ch.RowBinary.decode_names_and_rows(data)

{columns, rows} when is_list(columns) and is_list(rows) ->
for row <- rows do
columns
|> Enum.zip(row)
|> Map.new()
end
|> convert_uuids()
uuid_indices = uuid_column_indices(types)
rows = convert_uuid_values(rows, uuid_indices)

{columns, rows} ->
Logger.warning(
"Unexpected ClickHouse result format: columns=#{inspect(columns)}, rows=#{inspect(rows)}"
)
Enum.map(rows, fn row ->
names |> Enum.zip(row) |> Map.new()
end)

_ ->
[]
end
end

# Fetches the value of the first response header matching `name`, or `nil`.
@spec get_response_header([{String.t(), String.t()}], String.t()) :: String.t() | nil
defp get_response_header(headers, name) when is_list(headers) do
  case List.keyfind(headers, name, 0) do
    {_key, value} -> value
    nil -> nil
  end
end

# Parses the RowBinaryWithNamesAndTypes header: a varuint column count,
# followed by that many column names, then that many type strings.
@spec parse_row_binary_header(binary()) :: {[String.t()], [String.t()], binary()}
defp parse_row_binary_header(data) do
  {column_count, after_count} = decode_varuint(data)
  {column_names, after_names} = decode_n_strings(after_count, column_count, [])
  {column_types, remainder} = decode_n_strings(after_names, column_count, [])
  {column_names, column_types, remainder}
end

# Decodes `n` consecutive length-prefixed strings, returning {strings, rest}
# with the strings in their original order.
defp decode_n_strings(rest, 0, acc), do: {Enum.reverse(acc), rest}

defp decode_n_strings(data, remaining, acc) do
  {value, rest} = decode_lp_string(data)
  decode_n_strings(rest, remaining - 1, [value | acc])
end

# Decodes a ClickHouse varuint (LEB128): each byte carries 7 payload bits,
# least-significant group first; a set high bit means another byte follows.
defp decode_varuint(<<0::1, value::7, rest::bytes>>), do: {value, rest}

defp decode_varuint(<<1::1, low_bits::7, rest::bytes>>) do
  {high_bits, remainder} = decode_varuint(rest)
  {Bitwise.bsl(high_bits, 7) + low_bits, remainder}
end

# Decodes one length-prefixed string (varuint byte length, then the bytes),
# returning {string, rest}.
defp decode_lp_string(data) do
  {byte_length, remainder} = decode_varuint(data)
  <<value::binary-size(byte_length), tail::bytes>> = remainder
  {value, tail}
end

# Indices of columns whose type string mentions UUID — covers plain `UUID`
# as well as wrapped forms like `Nullable(UUID)` and `Array(UUID)`.
@spec uuid_column_indices([String.t()]) :: MapSet.t(non_neg_integer())
defp uuid_column_indices(type_strings) do
  for {type, index} <- Enum.with_index(type_strings),
      String.contains?(type, "UUID"),
      into: MapSet.new() do
    index
  end
end

# Casts raw UUID binaries in `rows` (at the positions in `uuid_indices`) to
# canonical string form. When no UUID columns exist, returns `rows` untouched.
#
# NOTE: the previous fast-path guard was `map_size(uuid_indices) == 0`, but
# `uuid_indices` is a MapSet struct — `map_size/1` counts the struct's own
# keys (always > 0), so that clause could never match and every result set
# paid for a full per-row traversal. `MapSet.size/1` is the correct check.
defp convert_uuid_values(rows, uuid_indices) do
  if MapSet.size(uuid_indices) == 0 do
    rows
  else
    Enum.map(rows, &convert_row_uuids(&1, uuid_indices))
  end
end

# Applies UUID casting to the values of a single row at the flagged indices.
defp convert_row_uuids(row, uuid_indices) do
  row
  |> Enum.with_index()
  |> Enum.map(fn {value, idx} ->
    if MapSet.member?(uuid_indices, idx), do: cast_uuid_value(value), else: value
  end)
end

# Raw 16-byte UUID binaries become canonical strings; lists (Array(UUID))
# are cast element-wise; nil and anything else pass through untouched.
defp cast_uuid_value(nil), do: nil
defp cast_uuid_value(values) when is_list(values), do: Enum.map(values, &cast_uuid_value/1)
defp cast_uuid_value(<<_::128>> = bin), do: Ecto.UUID.cast!(bin)
defp cast_uuid_value(other), do: other

@spec execute_query_with_params(
Backend.t(),
query_string :: String.t(),
Expand Down Expand Up @@ -452,23 +555,6 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
end)
end

@spec convert_uuids(data :: any()) :: any()
defp convert_uuids(data) when is_struct(data), do: data

defp convert_uuids(data) when is_map(data) do
Map.new(data, fn {k, v} -> {k, convert_uuids(v)} end)
end

defp convert_uuids(data) when is_list(data) do
Enum.map(data, &convert_uuids/1)
end

defp convert_uuids(data) when is_non_empty_binary(data) and byte_size(data) == 16 do
Ecto.UUID.cast!(data)
end

defp convert_uuids(data), do: data

@spec ensure_query_connection_manager_started(Backend.t()) :: :ok | {:error, term()}
defp ensure_query_connection_manager_started(%Backend{id: backend_id} = backend) do
via = Backends.via_backend(backend, ConnectionManager)
Expand Down
23 changes: 23 additions & 0 deletions lib/logflare/backends/adaptor/clickhouse_adaptor/encoding_utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.EncodingUtils do
  @moduledoc false

  @doc """
  Recursively converts a term into a shape Jason can encode.

  Maps and lists are walked recursively, tuples become lists, and
  non-encodable runtime references (pids, ports, refs, funs) are replaced
  with their `inspect/1` output. Structs (e.g. `DateTime`) are returned
  as-is so their own `Jason.Encoder` implementations apply.
  """
  @spec sanitize_for_json(term()) :: Jason.Encoder.t()
  # Structs must be matched before the plain-map clause: `is_map/1` also
  # matches structs, and `Map.new/2` routes structs through `Enumerable`,
  # which raises Protocol.UndefinedError for DateTime and friends.
  def sanitize_for_json(value) when is_struct(value), do: value

  def sanitize_for_json(value) when is_map(value) do
    Map.new(value, fn {k, v} -> {k, sanitize_for_json(v)} end)
  end

  def sanitize_for_json(value) when is_list(value) do
    Enum.map(value, &sanitize_for_json/1)
  end

  def sanitize_for_json(value) when is_tuple(value) do
    value |> Tuple.to_list() |> Enum.map(&sanitize_for_json/1)
  end

  def sanitize_for_json(value)
      when is_port(value) or is_pid(value) or is_reference(value) or is_function(value) do
    inspect(value)
  end

  def sanitize_for_json(value), do: value
end
28 changes: 2 additions & 26 deletions lib/logflare/backends/adaptor/clickhouse_adaptor/ingester.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.Ingester do

import Logflare.Utils.Guards

alias Logflare.Backends.Adaptor.ClickHouseAdaptor.QueryTemplates
alias Logflare.Backends.Adaptor.ClickHouseAdaptor.RowBinaryEncoder
alias Logflare.Backends.Backend
alias Logflare.LogEvent
Expand All @@ -17,28 +18,6 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.Ingester do
@pool_timeout 8_000
@receive_timeout 20_000

@log_columns ~w(id source_uuid source_name project trace_id span_id trace_flags
severity_text severity_number service_name event_message scope_name scope_version
scope_schema_url resource_schema_url resource_attributes scope_attributes
log_attributes mapping_config_id timestamp)

@metric_columns ~w(id source_uuid source_name project time_unix start_time_unix
metric_name metric_description metric_unit metric_type service_name event_message
scope_name scope_version scope_schema_url resource_schema_url resource_attributes
scope_attributes attributes aggregation_temporality is_monotonic flags value count
sum min max scale zero_count positive_offset negative_offset
bucket_counts explicit_bounds positive_bucket_counts negative_bucket_counts
quantile_values quantiles exemplars.filtered_attributes exemplars.time_unix
exemplars.value exemplars.span_id exemplars.trace_id
mapping_config_id timestamp)

@trace_columns ~w(id source_uuid source_name project trace_id span_id
parent_span_id trace_state span_name span_kind service_name event_message duration
status_code status_message scope_name scope_version resource_attributes span_attributes
events.timestamp events.name events.attributes
links.trace_id links.span_id links.trace_state links.attributes
mapping_config_id timestamp)

@doc """
Inserts a list of `LogEvent` structs into ClickHouse.

Expand Down Expand Up @@ -133,10 +112,7 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.Ingester do
end

@doc false
@spec columns_for_type(TypeDetection.event_type()) :: [String.t()]
def columns_for_type(:log), do: @log_columns
def columns_for_type(:metric), do: @metric_columns
def columns_for_type(:trace), do: @trace_columns
defdelegate columns_for_type(event_type), to: QueryTemplates

@spec encode_log_row(LogEvent.t()) :: iodata()
defp encode_log_row(%LogEvent{
Expand Down
Loading
Loading