+SELECT 500, 500 thousand, and 500 million rows (original)
+
+
+$ MIX_ENV=bench mix run bench/stream.exs
+
+This benchmark is based on https://github.com/ClickHouse/ch-bench
+
+Operating System: macOS
+CPU Information: Apple M1
+Number of Available Cores: 8
+Available memory: 8 GB
+Elixir 1.14.4
+Erlang 25.3
+
+Benchmark suite executing with the following configuration:
+warmup: 2 s
+time: 5 s
+memory time: 0 ns
+reduction time: 0 ns
+parallel: 1
+inputs: 500 rows, 500_000 rows, 500_000_000 rows
+Estimated total run time: 1.05 min
+
+Benchmarking stream with decode with input 500 rows ...
+Benchmarking stream with decode with input 500_000 rows ...
+Benchmarking stream with decode with input 500_000_000 rows ...
+Benchmarking stream with manual decode with input 500 rows ...
+Benchmarking stream with manual decode with input 500_000 rows ...
+Benchmarking stream with manual decode with input 500_000_000 rows ...
+Benchmarking stream without decode with input 500 rows ...
+Benchmarking stream without decode with input 500_000 rows ...
+Benchmarking stream without decode with input 500_000_000 rows ...
+
+##### With input 500 rows #####
+Name ips average deviation median 99th %
+stream with decode 4.69 K 213.34 μs ±12.49% 211.38 μs 290.94 μs
+stream with manual decode 4.69 K 213.43 μs ±17.40% 210.96 μs 298.75 μs
+stream without decode 4.65 K 215.08 μs ±10.79% 213.79 μs 284.66 μs
+
+Comparison:
+stream with decode 4.69 K
+stream with manual decode 4.69 K - 1.00x slower +0.0838 μs
+stream without decode 4.65 K - 1.01x slower +1.74 μs
+
+##### With input 500_000 rows #####
+Name ips average deviation median 99th %
+stream without decode 234.58 4.26 ms ±13.99% 4.04 ms 5.95 ms
+stream with manual decode 64.26 15.56 ms ±8.36% 15.86 ms 17.97 ms
+stream with decode 41.03 24.37 ms ±6.27% 24.39 ms 26.60 ms
+
+Comparison:
+stream without decode 234.58
+stream with manual decode 64.26 - 3.65x slower +11.30 ms
+stream with decode 41.03 - 5.72x slower +20.11 ms
+
+##### With input 500_000_000 rows #####
+Name ips average deviation median 99th %
+stream without decode 0.32 3.17 s ±0.20% 3.17 s 3.17 s
+stream with manual decode 0.0891 11.23 s ±0.00% 11.23 s 11.23 s
+stream with decode 0.0462 21.66 s ±0.00% 21.66 s 21.66 s
+
+Comparison:
+stream without decode 0.32
+stream with manual decode 0.0891 - 3.55x slower +8.06 s
+stream with decode 0.0462 - 6.84x slower +18.50 s
+
+
+
+
+[CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (click the latest workflow run and scroll down to "Artifacts")
diff --git a/ch/hex_metadata.config b/ch/hex_metadata.config
new file mode 100644
index 000000000000..b545328fc1d7
--- /dev/null
+++ b/ch/hex_metadata.config
@@ -0,0 +1,40 @@
+{<<"links">>,[{<<"GitHub">>,<<"https://github.com/plausible/ch">>}]}.
+{<<"name">>,<<"ch">>}.
+{<<"version">>,<<"0.2.6">>}.
+{<<"description">>,<<"HTTP ClickHouse driver for Elixir">>}.
+{<<"elixir">>,<<"~> 1.14">>}.
+{<<"app">>,<<"ch">>}.
+{<<"licenses">>,[<<"MIT">>]}.
+{<<"requirements">>,
+ [[{<<"name">>,<<"mint">>},
+ {<<"app">>,<<"mint">>},
+ {<<"optional">>,false},
+ {<<"requirement">>,<<"~> 1.0">>},
+ {<<"repository">>,<<"hexpm">>}],
+ [{<<"name">>,<<"db_connection">>},
+ {<<"app">>,<<"db_connection">>},
+ {<<"optional">>,false},
+ {<<"requirement">>,<<"~> 2.0">>},
+ {<<"repository">>,<<"hexpm">>}],
+ [{<<"name">>,<<"jason">>},
+ {<<"app">>,<<"jason">>},
+ {<<"optional">>,false},
+ {<<"requirement">>,<<"~> 1.0">>},
+ {<<"repository">>,<<"hexpm">>}],
+ [{<<"name">>,<<"decimal">>},
+ {<<"app">>,<<"decimal">>},
+ {<<"optional">>,false},
+ {<<"requirement">>,<<"~> 2.0">>},
+ {<<"repository">>,<<"hexpm">>}],
+ [{<<"name">>,<<"ecto">>},
+ {<<"app">>,<<"ecto">>},
+ {<<"optional">>,true},
+ {<<"requirement">>,<<"~> 3.5">>},
+ {<<"repository">>,<<"hexpm">>}]]}.
+{<<"files">>,
+ [<<"lib">>,<<"lib/ch.ex">>,<<"lib/ch">>,<<"lib/ch/stream.ex">>,
+ <<"lib/ch/types.ex">>,<<"lib/ch/error.ex">>,<<"lib/ch/row_binary.ex">>,
+ <<"lib/ch/query.ex">>,<<"lib/ch/result.ex">>,<<"lib/ch/connection.ex">>,
+ <<".formatter.exs">>,<<"mix.exs">>,<<"README.md">>,<<"LICENSE">>,
+ <<"CHANGELOG.md">>]}.
+{<<"build_tools">>,[<<"mix">>]}.
diff --git a/ch/lib/ch.ex b/ch/lib/ch.ex
new file mode 100644
index 000000000000..2e71c59ba0b2
--- /dev/null
+++ b/ch/lib/ch.ex
@@ -0,0 +1,287 @@
+defmodule Ch do
+ @moduledoc "Minimal HTTP ClickHouse client."
+ alias Ch.{Connection, Query, Result}
+
+ @type common_option ::
+ {:database, String.t()}
+ | {:username, String.t()}
+ | {:password, String.t()}
+ | {:settings, Keyword.t()}
+ | {:timeout, timeout}
+
+ @type start_option ::
+           common_option
+           | {:scheme, String.t()}
+           | {:hostname, String.t()}
+           | {:port, :inet.port_number()}
+           | {:transport_opts, [:gen_tcp.connect_option() | :ssl.tls_client_option()]} # a list
+           | DBConnection.start_option()
+
+ @doc """
+ Start the connection process and connect to ClickHouse.
+
+ ## Options
+
+ * `:scheme` - HTTP scheme, defaults to `"http"`
+ * `:hostname` - server hostname, defaults to `"localhost"`
+ * `:port` - HTTP port, defaults to `8123`
+ * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
+ * `:database` - Database, defaults to `"default"`
+ * `:username` - Username
+ * `:password` - User password
+ * `:settings` - Keyword list of ClickHouse settings
+ * `:timeout` - HTTP receive timeout in milliseconds,
+ defaults to `15_000` (15 seconds)
+ * [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)
+
+ """
+ @spec start_link([start_option]) :: GenServer.on_start()
+ def start_link(opts \\ []) do
+ DBConnection.start_link(Connection, opts)
+ end
+
+ @doc """
+ Returns a supervisor child specification for a DBConnection pool.
+
+ See `start_link/1` for supported options.
+ """
+ @spec child_spec([start_option]) :: :supervisor.child_spec()
+ def child_spec(opts) do
+ DBConnection.child_spec(Connection, opts)
+ end
+
+ @type query_option ::
+ common_option
+ | {:command, Ch.Query.command()}
+ | {:headers, [{String.t(), String.t()}]}
+ | {:format, String.t()}
+ # TODO remove
+ | {:encode, boolean}
+ | {:decode, boolean}
+ | DBConnection.connection_option()
+
+ @doc """
+ Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
+ `{:error, Exception.t()}` if there was a database error.
+
+ ## Options
+
+ * `:database` - Database
+ * `:username` - Username
+ * `:password` - User password
+ * `:settings` - Keyword list of settings
+ * `:timeout` - Query request timeout
+ * `:command` - Command tag for the query
+ * `:headers` - Custom HTTP headers for the request
+ * `:format` - Custom response format for the request
+ * `:decode` - Whether to automatically decode the response
+ * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)
+
+ """
+ @spec query(DBConnection.conn(), iodata, params, [query_option]) ::
+ {:ok, Result.t()} | {:error, Exception.t()}
+ when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
+ def query(conn, statement, params \\ [], opts \\ []) do
+   # drop the query struct from the success tuple; anything else is returned as is
+   case DBConnection.execute(conn, Query.build(statement, opts), params, opts) do
+     {:ok, _query, result} -> {:ok, result}
+     other -> other
+   end
+ end
+
+ @doc """
+ Runs a query and returns the result or raises `Ch.Error` if
+ there was an error. See `query/4`.
+ """
+ @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t()
+ when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
+ def query!(conn, statement, params \\ [], opts \\ []) do
+   # same flow as query/4, but delegates to the raising DBConnection.execute!/4
+   DBConnection.execute!(conn, Query.build(statement, opts), params, opts)
+ end
+
+ @doc false
+ @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t()
+ def stream(conn, statement, params \\ [], opts \\ []) do
+ query = Query.build(statement, opts)
+ %Ch.Stream{conn: conn, query: query, params: params, opts: opts}
+ end
+
+ # TODO drop
+ @doc false
+ @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
+ def run(conn, f, opts \\ []) when is_function(f, 1) do
+ DBConnection.run(conn, f, opts)
+ end
+
+ if Code.ensure_loaded?(Ecto.ParameterizedType) do
+ @behaviour Ecto.ParameterizedType
+
+ @impl true
+ def type(params), do: {:parameterized, Ch, params}
+
+ @impl true
+ def init(opts) do
+ clickhouse_type =
+ opts[:raw] || opts[:type] ||
+ raise ArgumentError, "keys :raw or :type not found in: #{inspect(opts)}"
+
+ Ch.Types.decode(clickhouse_type)
+ end
+
+ @impl true
+ def load(value, _loader, _params), do: {:ok, value}
+
+ @impl true
+ def dump(value, _dumper, _params), do: {:ok, value}
+
+ @impl true
+ def cast(value, :string = type), do: Ecto.Type.cast(type, value)
+ def cast(value, :boolean = type), do: Ecto.Type.cast(type, value)
+ def cast(value, :uuid), do: Ecto.Type.cast(Ecto.UUID, value)
+ def cast(value, :date = type), do: Ecto.Type.cast(type, value)
+ def cast(value, :date32), do: Ecto.Type.cast(:date, value)
+ def cast(value, :datetime), do: Ecto.Type.cast(:naive_datetime, value)
+ def cast(value, {:datetime, "UTC"}), do: Ecto.Type.cast(:utc_datetime, value)
+ def cast(value, {:datetime64, _p}), do: Ecto.Type.cast(:naive_datetime_usec, value)
+ def cast(value, {:datetime64, _p, "UTC"}), do: Ecto.Type.cast(:utc_datetime_usec, value)
+ def cast(value, {:fixed_string, _s}), do: Ecto.Type.cast(:string, value)
+
+ for size <- [8, 16, 32, 64, 128, 256] do
+ def cast(value, unquote(:"i#{size}")), do: Ecto.Type.cast(:integer, value)
+ def cast(value, unquote(:"u#{size}")), do: Ecto.Type.cast(:integer, value)
+ end
+
+ for size <- [32, 64] do
+ def cast(value, unquote(:"f#{size}")), do: Ecto.Type.cast(:float, value)
+ end
+
+ def cast(value, {:decimal = type, _p, _s}), do: Ecto.Type.cast(type, value)
+
+ for size <- [32, 64, 128, 256] do
+ def cast(value, {unquote(:"decimal#{size}"), _s}) do
+ Ecto.Type.cast(:decimal, value)
+ end
+ end
+
+ def cast(value, {:array, type}), do: Ecto.Type.cast({:array, type(type)}, value)
+ def cast(value, {:nullable, type}), do: cast(value, type)
+ def cast(value, {:low_cardinality, type}), do: cast(value, type)
+ def cast(value, {:simple_aggregate_function, _name, type}), do: cast(value, type)
+
+ def cast(value, :ring), do: Ecto.Type.cast({:array, type(:point)}, value)
+ def cast(value, :polygon), do: Ecto.Type.cast({:array, type(:ring)}, value)
+ def cast(value, :multipolygon), do: Ecto.Type.cast({:array, type(:polygon)}, value)
+
+ def cast(nil, _params), do: {:ok, nil}
+
+ def cast(value, {enum, mappings}) when enum in [:enum8, :enum16] do
+ result =
+ case value do
+ _ when is_integer(value) -> List.keyfind(mappings, value, 1, :error)
+ _ when is_binary(value) -> List.keyfind(mappings, value, 0, :error)
+ _ -> :error
+ end
+
+ case result do
+ {_, _} -> {:ok, value}
+ :error = e -> e
+ end
+ end
+
+ def cast(value, :ipv4) do
+ case value do
+ {a, b, c, d} when is_number(a) and is_number(b) and is_number(c) and is_number(d) ->
+ {:ok, value}
+
+ _ when is_binary(value) ->
+ with {:error = e, _reason} <- :inet.parse_ipv4_address(to_charlist(value)), do: e
+
+ _ when is_list(value) ->
+ with {:error = e, _reason} <- :inet.parse_ipv4_address(value), do: e
+
+ _ ->
+ :error
+ end
+ end
+
+ def cast(value, :ipv6) do
+ case value do
+ {a, s, d, f, g, h, j, k}
+ when is_number(a) and is_number(s) and is_number(d) and is_number(f) and
+ is_number(g) and is_number(h) and is_number(j) and is_number(k) ->
+ {:ok, value}
+
+ _ when is_binary(value) ->
+ with {:error = e, _reason} <- :inet.parse_ipv6_address(to_charlist(value)), do: e
+
+ _ when is_list(value) ->
+ with {:error = e, _reason} <- :inet.parse_ipv6_address(value), do: e
+
+ _ ->
+ :error
+ end
+ end
+
+ def cast(value, :point) do
+ case value do
+ {x, y} when is_number(x) and is_number(y) -> {:ok, value}
+ _ -> :error
+ end
+ end
+
+ def cast(value, {:tuple, types}), do: cast_tuple(types, value)
+ def cast(value, {:map, key_type, value_type}), do: cast_map(value, key_type, value_type)
+
+ # Casts a tuple (or a list standing in for one) element by element against `types`.
+ defp cast_tuple(types, values) when is_tuple(values) do
+   cast_tuple(types, Tuple.to_list(values), [])
+ end
+
+ defp cast_tuple(types, values) when is_list(values), do: cast_tuple(types, values, [])
+ defp cast_tuple(_types, _values), do: :error
+
+ # `with` forwards any non-`{:ok, _}` result (`:error` or `{:error, keyword}`) unchanged
+ defp cast_tuple([type | types], [value | values], acc) do
+   with {:ok, value} <- cast(value, type) do
+     cast_tuple(types, values, [value | acc])
+   end
+ end
+
+ # types and values ran out together: rebuild the tuple in its original order
+ defp cast_tuple([], [], acc), do: {:ok, List.to_tuple(:lists.reverse(acc))}
+ # any other shape, e.g. a different number of types and values
+ defp cast_tuple(_types, _values, _acc), do: :error
+
+ defp cast_map(value, key_type, value_type) when is_map(value) do
+ cast_map(Map.to_list(value), key_type, value_type)
+ end
+
+ defp cast_map(value, key_type, value_type) when is_list(value) do
+ cast_map(value, key_type, value_type, [])
+ end
+
+ defp cast_map(_value, _key_type, _value_type), do: :error
+
+ defp cast_map([{key, value} | kvs], key_type, value_type, acc) do
+ with {:ok, key} <- cast(key, key_type),
+ {:ok, value} <- cast(value, value_type) do
+ cast_map(kvs, key_type, value_type, [{key, value} | acc])
+ end
+ end
+
+ defp cast_map([], _key_type, _value_type, acc), do: {:ok, Map.new(acc)}
+ defp cast_map(_kvs, _key_type, _value_type, _acc), do: :error
+
+ @impl true
+ def embed_as(_, _), do: :self
+
+ @impl true
+ def equal?(a, b, _), do: a == b
+
+ @impl true
+ def format(params) do
+ "#Ch<#{Ch.Types.encode(params)}>"
+ end
+ end
+end
diff --git a/ch/lib/ch/connection.ex b/ch/lib/ch/connection.ex
new file mode 100644
index 000000000000..3742f4747862
--- /dev/null
+++ b/ch/lib/ch/connection.ex
@@ -0,0 +1,427 @@
+defmodule Ch.Connection do
+ @moduledoc false
+ use DBConnection
+ require Logger
+ alias Ch.{Error, Query, Result}
+ alias Mint.HTTP1, as: HTTP
+
+ @user_agent "ch/" <> Mix.Project.config()[:version]
+
+ @typep conn :: HTTP.t()
+
+ @impl true
+ @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()}
+ def connect(opts) do
+ scheme = String.to_existing_atom(opts[:scheme] || "http")
+ address = opts[:hostname] || "localhost"
+ port = opts[:port] || 8123
+ mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts])
+
+ with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
+ conn =
+ conn
+ |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
+ |> maybe_put_private(:database, opts[:database])
+ |> maybe_put_private(:username, opts[:username])
+ |> maybe_put_private(:password, opts[:password])
+ |> maybe_put_private(:settings, opts[:settings])
+
+ handshake = Query.build("select 1")
+ params = DBConnection.Query.encode(handshake, _params = [], _opts = [])
+
+ case handle_execute(handshake, params, _opts = [], conn) do
+ {:ok, handshake, responses, conn} ->
+ case DBConnection.Query.decode(handshake, responses, _opts = []) do
+ %Result{rows: [[1]]} ->
+ {:ok, conn}
+
+ result ->
+ {:ok, _conn} = HTTP.close(conn)
+ reason = Error.exception("unexpected result for '#{handshake}': #{inspect(result)}")
+ {:error, reason}
+ end
+
+ {:error, reason, conn} ->
+ {:ok, _conn} = HTTP.close(conn)
+ {:error, reason}
+
+ {:disconnect, reason, conn} ->
+ {:ok, _conn} = HTTP.close(conn)
+ {:error, reason}
+ end
+ end
+ end
+
+ @impl true
+ @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn}
+ def ping(conn) do
+ headers = [{"user-agent", @user_agent}]
+
+ case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do
+ {:ok, conn, _response} -> {:ok, conn}
+ {:error, error, conn} -> {:disconnect, error, conn}
+ {:disconnect, _error, _conn} = disconnect -> disconnect
+ end
+ end
+
+ @impl true
+ @spec checkout(conn) :: {:ok, conn}
+ def checkout(conn), do: {:ok, conn}
+
+ # These four transaction callbacks are implemented as no-ops so that Repo.checkout
+ # works, even though ClickHouse itself doesn't support transactions.
+
+ @impl true
+ def handle_begin(_opts, conn), do: {:ok, %{}, conn}
+ @impl true
+ def handle_commit(_opts, conn), do: {:ok, %{}, conn}
+ @impl true
+ def handle_rollback(_opts, conn), do: {:ok, %{}, conn}
+ @impl true
+ def handle_status(_opts, conn), do: {:idle, conn}
+
+ @impl true
+ def handle_prepare(_query, _opts, conn) do
+ {:error, Error.exception("prepared statements are not supported"), conn}
+ end
+
+ @impl true
+ def handle_close(_query, _opts, conn) do
+ {:error, Error.exception("prepared statements are not supported"), conn}
+ end
+
+ @impl true
+ def handle_declare(query, params, opts, conn) do
+ %Query{command: command} = query
+ {query_params, extra_headers, body} = params
+
+ path = path(conn, query_params, opts)
+ headers = headers(conn, extra_headers, opts)
+
+ with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body),
+ {:ok, conn} <- eat_ok_status_and_headers(conn, timeout(conn, opts)) do
+ {:ok, query, %Result{command: command}, conn}
+ end
+ end
+
+ @spec eat_ok_status_and_headers(conn, timeout) ::
+ {:ok, %{conn: conn, buffer: [Mint.Types.response()]}}
+ | {:error, Ch.Error.t(), conn}
+ | {:disconnect, Mint.Types.error(), conn}
+ defp eat_ok_status_and_headers(conn, timeout) do
+ case HTTP.recv(conn, 0, timeout) do
+ {:ok, conn, responses} ->
+ case eat_ok_status_and_headers(responses) do
+ {:ok, data} ->
+ {:ok, %{conn: conn, buffer: data}}
+
+ :more ->
+ eat_ok_status_and_headers(conn, timeout)
+
+ :error ->
+ all_responses_result =
+ case handle_all_responses(responses, []) do
+ {:ok, responses} -> {:ok, conn, responses}
+ {:more, acc} -> recv_all(conn, acc, timeout)
+ end
+
+ with {:ok, conn, responses} <- all_responses_result do
+ [_status, headers | data] = responses
+ message = IO.iodata_to_binary(data)
+
+ code =
+ if code = get_header(headers, "x-clickhouse-exception-code") do
+ String.to_integer(code)
+ end
+
+ {:error, Error.exception(code: code, message: message), conn}
+ end
+ end
+
+ {:error, conn, error, _responses} ->
+ {:disconnect, error, conn}
+ end
+ end
+
+ defp eat_ok_status_and_headers([{:status, _ref, 200} | rest]) do
+ eat_ok_status_and_headers(rest)
+ end
+
+ defp eat_ok_status_and_headers([{:status, _ref, _status} | _rest]), do: :error
+ defp eat_ok_status_and_headers([{:headers, _ref, _headers} | data]), do: {:ok, data}
+ defp eat_ok_status_and_headers([]), do: :more
+
+ @impl true
+ def handle_fetch(query, result, opts, %{conn: conn, buffer: buffer}) do
+ case buffer do
+ [] -> handle_fetch(query, result, opts, conn)
+ _not_empty -> {halt_or_cont(buffer), %Result{result | data: extract_data(buffer)}, conn}
+ end
+ end
+
+ def handle_fetch(_query, result, opts, conn) do
+ case HTTP.recv(conn, 0, timeout(conn, opts)) do
+ {:ok, conn, responses} ->
+ {halt_or_cont(responses), %Result{result | data: extract_data(responses)}, conn}
+
+ {:error, conn, reason, _responses} ->
+ {:disconnect, reason, conn}
+ end
+ end
+
+ defp halt_or_cont(responses) do # :halt only when the last response is {:done, ref}
+   if match?({:done, _ref}, List.last(responses)), do: :halt, else: :cont
+ end
+
+ defp extract_data([]), do: []
+ defp extract_data([{:done, _ref}]), do: [] # the trailing :done marker carries no payload
+ defp extract_data([{:data, _ref, chunk} | more]), do: [chunk | extract_data(more)]
+
+ @impl true
+ # right after handle_declare/4 the state is still `%{conn: _, buffer: _}`; unwrap it first
+ def handle_deallocate(query, result, opts, %{conn: conn, buffer: _unread}),
+   do: handle_deallocate(query, result, opts, conn)
+
+ def handle_deallocate(_query, result, _opts, conn) do
+   case HTTP.open_request_count(conn) do
+     0 -> {:ok, %Result{result | data: []}, conn} # TODO data: [], anything else?
+     1 -> {:disconnect, Error.exception("cannot stop stream before receiving full response"), conn}
+   end
+ end
+
+ @impl true
+ def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do
+ {query_params, extra_headers, body} = params
+
+ path = path(conn, query_params, opts)
+ headers = headers(conn, extra_headers, opts)
+
+ with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do
+ case HTTP.stream_request_body(conn, ref, body) do
+ {:ok, conn} -> {:ok, query, ref, conn}
+ {:error, conn, reason} -> {:disconnect, reason, conn}
+ end
+ end
+ end
+
+ def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do
+ case HTTP.stream_request_body(conn, ref, body) do
+ {:ok, conn} ->
+ case body do
+ :eof ->
+ with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do
+ {:ok, query, responses, conn}
+ end
+
+ _other ->
+ {:ok, query, ref, conn}
+ end
+
+ {:error, conn, reason} ->
+ {:disconnect, reason, conn}
+ end
+ end
+
+ def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
+ {query_params, extra_headers, body} = params
+
+ path = path(conn, query_params, opts)
+ headers = headers(conn, extra_headers, opts)
+
+ result =
+ if is_function(body, 2) do
+ request_chunked(conn, "POST", path, headers, body, opts)
+ else
+ request(conn, "POST", path, headers, body, opts)
+ end
+
+ with {:ok, conn, responses} <- result do
+ {:ok, query, responses, conn}
+ end
+ end
+
+ def handle_execute(query, params, opts, conn) do
+ {query_params, extra_headers, body} = params
+
+ path = path(conn, query_params, opts)
+ headers = headers(conn, extra_headers, opts)
+
+ with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do
+ {:ok, query, responses, conn}
+ end
+ end
+
+ @impl true
+ def disconnect(_error, conn) do
+   {:ok, _closed} = HTTP.close(conn) # assert that closing succeeded
+   :ok
+ end
+
+ @typep response :: Mint.Types.status() | Mint.Types.headers() | binary
+
+ @spec request(conn, binary, binary, Mint.Types.headers(), iodata, [Ch.query_option()]) ::
+ {:ok, conn, [response]}
+ | {:error, Error.t(), conn}
+ | {:disconnect, Mint.Types.error(), conn}
+ defp request(conn, method, path, headers, body, opts) do
+ with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do
+ receive_full_response(conn, timeout(conn, opts))
+ end
+ end
+
+ @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
+ {:ok, conn, [response]}
+ | {:error, Error.t(), conn}
+ | {:disconnect, Mint.Types.error(), conn}
+ def request_chunked(conn, method, path, headers, stream, opts) do
+ with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
+ {:ok, conn} <- stream_body(conn, ref, stream),
+ do: receive_full_response(conn, timeout(conn, opts))
+ end
+
+ @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
+ {:ok, conn} | {:disconnect, Mint.Types.error(), conn}
+ defp stream_body(conn, ref, stream) do
+ result =
+ stream
+ |> Stream.concat([:eof])
+ |> Enum.reduce_while({:ok, conn}, fn
+ chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)}
+ _chunk, {:error, _conn, _reason} = error -> {:halt, error}
+ end)
+
+ case result do
+ {:ok, _conn} = ok -> ok
+ {:error, conn, reason} -> {:disconnect, reason, conn}
+ end
+ end
+
+ # stacktrace is a bit cleaner with this function inlined
+ @compile inline: [send_request: 5]
+ defp send_request(conn, method, path, headers, body) do
+ case HTTP.request(conn, method, path, headers, body) do
+ {:ok, _conn, _ref} = ok -> ok
+ {:error, conn, reason} -> {:disconnect, reason, conn}
+ end
+ end
+
+ @spec receive_full_response(conn, timeout) ::
+ {:ok, conn, [response]}
+ | {:error, Error.t(), conn}
+ | {:disconnect, Mint.Types.error(), conn}
+ defp receive_full_response(conn, timeout) do
+ with {:ok, conn, responses} <- recv_all(conn, [], timeout) do
+ case responses do
+ [200, headers | _rest] ->
+ conn = ensure_same_server(conn, headers)
+ {:ok, conn, responses}
+
+ [_status, headers | data] ->
+ message = IO.iodata_to_binary(data)
+
+ code =
+ if code = get_header(headers, "x-clickhouse-exception-code") do
+ String.to_integer(code)
+ end
+
+ {:error, Error.exception(code: code, message: message), conn}
+ end
+ end
+ end
+
+ @spec recv_all(conn, [response], timeout()) ::
+ {:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn}
+ defp recv_all(conn, acc, timeout) do
+ case HTTP.recv(conn, 0, timeout) do
+ {:ok, conn, responses} ->
+ case handle_all_responses(responses, acc) do
+ {:ok, responses} -> {:ok, conn, responses}
+ {:more, acc} -> recv_all(conn, acc, timeout)
+ end
+
+ {:error, conn, reason, _responses} ->
+ {:disconnect, reason, conn}
+ end
+ end
+
+ for tag <- [:data, :status, :headers] do
+ defp handle_all_responses([{unquote(tag), _ref, data} | rest], acc) do
+ handle_all_responses(rest, [data | acc])
+ end
+ end
+
+ defp handle_all_responses([{:done, _ref}], acc), do: {:ok, :lists.reverse(acc)}
+ defp handle_all_responses([], acc), do: {:more, acc}
+
+ defp maybe_put_private(conn, _k, nil), do: conn
+ defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v)
+
+ defp timeout(conn), do: HTTP.get_private(conn, :timeout)
+ defp timeout(conn, opts), do: Keyword.get(opts, :timeout) || timeout(conn)
+
+ defp settings(conn, opts) do
+   per_query = Keyword.get(opts, :settings, [])
+   # per-query settings win over the connection-level ones on key clashes
+   Keyword.merge(HTTP.get_private(conn, :settings, []), per_query)
+ end
+
+ defp headers(conn, extra_headers, opts) do
+ extra_headers
+ |> maybe_put_new_header("x-clickhouse-user", get_opts_or_private(conn, opts, :username))
+ |> maybe_put_new_header("x-clickhouse-key", get_opts_or_private(conn, opts, :password))
+ |> maybe_put_new_header("x-clickhouse-database", get_opts_or_private(conn, opts, :database))
+ |> maybe_put_new_header("user-agent", @user_agent)
+ end
+
+ defp get_opts_or_private(conn, opts, key) do
+ Keyword.get(opts, key) || HTTP.get_private(conn, key)
+ end
+
+ defp maybe_put_new_header(headers, _name, nil), do: headers
+
+ defp maybe_put_new_header(headers, name, value) do
+   # a header that is already present is never overwritten
+   case List.keyfind(headers, name, 0) do
+     nil -> [{name, value} | headers]
+     _already_set -> headers
+   end
+ end
+
+ defp get_header(headers, name) do
+   case List.keyfind(headers, name, 0) do
+     nil -> nil # header absent
+     {_name, found} -> found
+   end
+ end
+
+ defp path(conn, query_params, opts) do
+   # merged settings come first in the query string, then the query parameters
+   "/?" <> URI.encode_query(settings(conn, opts) ++ query_params)
+ end
+
+ @server_display_name_key :server_display_name
+
+ @spec ensure_same_server(conn, Mint.Types.headers()) :: conn
+ defp ensure_same_server(conn, headers) do
+   reported = get_header(headers, "x-clickhouse-server-display-name")
+   known = HTTP.get_private(conn, @server_display_name_key)
+
+   cond do
+     !reported ->
+       conn
+     # no name remembered yet: store the one just reported
+     !known ->
+       HTTP.put_private(conn, @server_display_name_key, reported)
+     # both present: only warn on a mismatch, the remembered name is kept
+     true ->
+       if reported != known do
+         Logger.warning(
+           "Server mismatch detected. Expected #{inspect(known)} but got #{inspect(reported)}!" <>
+             " Connection pooling might be unstable."
+         )
+       end
+
+       conn
+   end
+ end
+end
diff --git a/ch/lib/ch/error.ex b/ch/lib/ch/error.ex
new file mode 100644
index 000000000000..9b427eefcbfd
--- /dev/null
+++ b/ch/lib/ch/error.ex
@@ -0,0 +1,5 @@
+defmodule Ch.Error do
+ @moduledoc "Error struct wrapping ClickHouse error responses."
+ defexception [:code, :message]
+ @type t :: %__MODULE__{code: pos_integer | nil, message: String.t()}
+end
diff --git a/ch/lib/ch/query.ex b/ch/lib/ch/query.ex
new file mode 100644
index 000000000000..c2b12ae4fdf1
--- /dev/null
+++ b/ch/lib/ch/query.ex
@@ -0,0 +1,307 @@
+defmodule Ch.Query do
+ @moduledoc "Query struct wrapping the SQL statement."
+ defstruct [:statement, :command, :encode, :decode]
+
+ @type t :: %__MODULE__{statement: iodata, command: command, encode: boolean, decode: boolean}
+
+ @doc false
+ @spec build(iodata, [Ch.query_option()]) :: t
+ def build(statement, opts \\ []) do
+ command = Keyword.get(opts, :command) || extract_command(statement)
+ encode = Keyword.get(opts, :encode, true)
+ decode = Keyword.get(opts, :decode, true)
+ %__MODULE__{statement: statement, command: command, encode: encode, decode: decode}
+ end
+
+ statements = [
+ {"SELECT", :select},
+ {"INSERT", :insert},
+ {"CREATE", :create},
+ {"ALTER", :alter},
+ {"DELETE", :delete},
+ {"SYSTEM", :system},
+ {"SHOW", :show},
+ # as of clickhouse 22.8, WITH is only allowed in SELECT
+ # https://clickhouse.com/docs/en/sql-reference/statements/select/with/
+ {"WITH", :select},
+ {"GRANT", :grant},
+ {"EXPLAIN", :explain},
+ {"REVOKE", :revoke},
+ {"ATTACH", :attach},
+ {"CHECK", :check},
+ {"DESCRIBE", :describe},
+ {"DETACH", :detach},
+ {"DROP", :drop},
+ {"EXISTS", :exists},
+ {"KILL", :kill},
+ {"OPTIMIZE", :optimize},
+ {"RENAME", :rename},
+ {"EXCHANGE", :exchange},
+ {"SET", :set},
+ {"TRUNCATE", :truncate},
+ {"USE", :use},
+ {"WATCH", :watch}
+ ]
+
+ command_union =
+ statements
+ |> Enum.map(fn {_, command} -> command end)
+ |> Enum.reduce(&{:|, [], [&1, &2]})
+
+ @type command :: unquote(command_union)
+
+ defp extract_command(statement)
+
+ for {statement, command} <- statements do
+ defp extract_command(unquote(statement) <> _), do: unquote(command)
+ defp extract_command(unquote(String.downcase(statement)) <> _), do: unquote(command)
+ end
+
+ defp extract_command(<