Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,57 @@ defmodule Electric.Replication.Eval.Env.KnownFunctions do
defpostgres("like(text, text) -> bool", delegate: &Casting.like?/2)
defpostgres("ilike(text, text) -> bool", delegate: &Casting.ilike?/2)

# COALESCE is non-strict: it should return the first non-NULL argument.
# We register arities 2..10 to support subset filters compiled by clients.
defp first_non_nil([head | rest]) when is_nil(head), do: first_non_nil(rest)
defp first_non_nil([head | _rest]), do: head
defp first_non_nil([]), do: nil

defpostgres "coalesce(anycompatible, anycompatible) -> anycompatible", strict?: false do
def coalesce2(a, b), do: first_non_nil([a, b])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce3(a, b, c), do: first_non_nil([a, b, c])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce4(a, b, c, d), do: first_non_nil([a, b, c, d])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce5(a, b, c, d, e), do: first_non_nil([a, b, c, d, e])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce6(a, b, c, d, e, f), do: first_non_nil([a, b, c, d, e, f])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce7(a, b, c, d, e, f, g), do: first_non_nil([a, b, c, d, e, f, g])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce8(a, b, c, d, e, f, g, h), do: first_non_nil([a, b, c, d, e, f, g, h])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce9(a, b, c, d, e, f, g, h, i), do: first_non_nil([a, b, c, d, e, f, g, h, i])
end

defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible",
strict?: false do
def coalesce10(a, b, c, d, e, f, g, h, i, j),
do: first_non_nil([a, b, c, d, e, f, g, h, i, j])
end

## Date functions
defpostgres("date + int8 -> date", commutative?: true, delegate: &Date.add/2)
defpostgres("date - date -> int8", delegate: &Date.diff/2)
Expand Down
59 changes: 35 additions & 24 deletions packages/sync-service/lib/electric/replication/eval/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -667,26 +667,17 @@ defmodule Electric.Replication.Eval.Parser do
_,
%{env: env}
) do
with {:ok, choices} <- find_available_functions(call, env),
{:ok, concrete} <- Lookups.pick_concrete_function_overload(choices, args, env),
{:ok, args} <- cast_unknowns(args, concrete.args, env),
{:ok, args} <- cast_implicit(args, concrete.args, env) do
{:ok, from_concrete(concrete, args)}
else
{:error, {_loc, _msg}} = error ->
error

:error ->
arg_list =
Enum.map_join(args, ", ", fn
%UnknownConst{} -> "unknown"
%{type: type} -> to_string(type)
end)
resolve_function_call(identifier(call.funcname), args, call.location, env)
end

{:error,
{call.location,
"Could not select a function overload for #{identifier(call.funcname)}(#{arg_list})"}}
end
defp node_to_ast(
%PgQuery.CoalesceExpr{args: raw_args, location: location},
children,
_,
%{env: env}
) do
args = Map.get(children, :args, raw_args)
resolve_function_call("coalesce", args, location, env)
end

# Next block of overloads matches on `A_Expr`, which is any operator call, as well as special syntax calls (e.g. `BETWEEN` or `ANY`).
Expand Down Expand Up @@ -1079,13 +1070,33 @@ defmodule Electric.Replication.Eval.Parser do
end
end

defp find_available_functions(%PgQuery.FuncCall{} = call, %{funcs: funcs}) do
name = identifier(call.funcname)
arity = length(call.args)

defp find_available_functions(name, arity, funcs, location)
when is_binary(name) and is_integer(arity) and is_map(funcs) do
case Map.fetch(funcs, {name, arity}) do
{:ok, options} -> {:ok, options}
:error -> {:error, {call.location, "unknown or unsupported function #{name}/#{arity}"}}
:error -> {:error, {location, "unknown or unsupported function #{name}/#{arity}"}}
end
end

defp resolve_function_call(name, args, location, %{funcs: funcs} = env)
when is_binary(name) and is_list(args) do
with {:ok, choices} <- find_available_functions(name, length(args), funcs, location),
{:ok, concrete} <- Lookups.pick_concrete_function_overload(choices, args, env),
{:ok, args} <- cast_unknowns(args, concrete.args, env),
{:ok, args} <- cast_implicit(args, concrete.args, env) do
{:ok, from_concrete(concrete, args)}
else
{:error, {_loc, _msg}} = error ->
error

:error ->
arg_list =
Enum.map_join(args, ", ", fn
%UnknownConst{} -> "unknown"
%{type: type} -> to_string(type)
end)

{:error, {location, "Could not select a function overload for #{name}(#{arg_list})"}}
end
end

Expand Down
4 changes: 4 additions & 0 deletions packages/sync-service/lib/electric/replication/eval/walker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ defimpl Electric.Walkable, for: PgQuery.FuncCall do
def children(%PgQuery.FuncCall{args: args}), do: [args: args]
end

defimpl Electric.Walkable, for: PgQuery.CoalesceExpr do
def children(%PgQuery.CoalesceExpr{args: args}), do: [args: args]
end

defimpl Electric.Walkable, for: PgQuery.A_Expr do
def children(%PgQuery.A_Expr{lexpr: lexpr, rexpr: rexpr, name: name}),
do: [lexpr: lexpr, rexpr: rexpr, name: name]
Expand Down
72 changes: 72 additions & 0 deletions packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3127,6 +3127,78 @@ defmodule Electric.Plug.RouterTest do
)
end

@tag with_sql: [
"CREATE TABLE nullable_items (id uuid primary key, value text)",
"INSERT INTO nullable_items VALUES (gen_random_uuid(), null)",
"INSERT INTO nullable_items VALUES (gen_random_uuid(), 'test value')"
]
test "subsets can filter with coalesce", ctx do
req = make_shape_req("nullable_items", log: "changes_only")

assert {_, 200,
%{
"metadata" => _,
"data" => [
%{
"value" => %{"id" => _, "value" => nil}
}
]
}} =
shape_req(req, ctx.opts,
subset: %{where: "coalesce(value, 'missing') = $1", params: %{"1" => "missing"}}
)
end

@tag with_sql: [
"CREATE TABLE nullable_items_arity_10 (id uuid primary key, value text)",
"INSERT INTO nullable_items_arity_10 VALUES (gen_random_uuid(), null)",
"INSERT INTO nullable_items_arity_10 VALUES (gen_random_uuid(), 'present')"
]
test "subsets can filter with coalesce arity 10", ctx do
req = make_shape_req("nullable_items_arity_10", log: "changes_only")

assert {_, 200,
%{
"metadata" => _,
"data" => [
%{
"value" => %{"id" => _, "value" => nil}
}
]
}} =
shape_req(req, ctx.opts,
subset: %{
where:
"coalesce(value, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'missing') = $1",
params: %{"1" => "missing"}
}
)
end

@tag with_sql: [
"CREATE TABLE nullable_items_arity_11 (id uuid primary key, value text)",
"INSERT INTO nullable_items_arity_11 VALUES (gen_random_uuid(), null)"
]
test "subsets return 400 for coalesce with more than 10 arguments", ctx do
req = make_shape_req("nullable_items_arity_11", log: "changes_only")

assert {_, 400, %{"errors" => %{"subset" => %{"where" => where_error}}}} =
shape_req(req, ctx.opts,
subset: %{
where:
"coalesce(value, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'missing') = 'missing'"
}
)

message =
case where_error do
msg when is_binary(msg) -> msg
msgs when is_list(msgs) -> Enum.join(msgs, " ")
end

assert message =~ "unknown or unsupported function coalesce/11"
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')",
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 2')"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,32 @@ defmodule Electric.Replication.Eval.ParserTest do
assert %Func{name: "-", args: [%Ref{path: ["test"], type: :int4}]} = result
end

test "should correctly parse a coalesce call with arity 10" do
assert {:ok, %Expr{eval: %Func{strict?: false, type: :text, args: args}}} =
Parser.parse_and_validate_expression(
~S|coalesce("value", NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'fallback')|,
refs: %{["value"] => :text}
)

assert length(args) == 10
assert [%Ref{path: ["value"], type: :text} | _] = args
assert %Const{value: "fallback", type: :text} = List.last(args)
end

test "should reduce a constant coalesce call at parse time" do
assert {:ok, %Expr{eval: %Const{value: "fallback", type: :text}}} =
Parser.parse_and_validate_expression(~S|coalesce(NULL, 'fallback')|)
end

test "should return helpful error for coalesce with more than 10 arguments" do
assert {:error, message} =
Parser.parse_and_validate_expression(
~S|coalesce(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'fallback')|
)

assert message =~ "unknown or unsupported function coalesce/11"
end

test "should reduce down immutable function calls that have only constants" do
env =
Env.empty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ defmodule Electric.Replication.Eval.RunnerTest do
|> Runner.execute(%{["test"] => 1})
end

test "should evaluate coalesce with arity 10" do
expr =
~S|coalesce("v1", NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'fallback')|
|> Parser.parse_and_validate_expression!(refs: %{["v1"] => :text})

assert {:ok, "fallback"} = Runner.execute(expr, %{["v1"] => nil})
assert {:ok, "value"} = Runner.execute(expr, %{["v1"] => "value"})
end

test "should return nil for coalesce when all 10 arguments are NULL" do
assert {:ok, nil} =
~S|coalesce(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)|
|> Parser.parse_and_validate_expression!()
|> Runner.execute(%{})
end

test "should not apply strict functions to nil values" do
assert {:ok, nil} =
~S|"test" + 1|
Expand Down