diff --git a/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex b/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex index cce29a038b..a4554e2087 100644 --- a/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex +++ b/packages/sync-service/lib/electric/replication/eval/env/known_functions.ex @@ -99,6 +99,57 @@ defmodule Electric.Replication.Eval.Env.KnownFunctions do defpostgres("like(text, text) -> bool", delegate: &Casting.like?/2) defpostgres("ilike(text, text) -> bool", delegate: &Casting.ilike?/2) + # COALESCE is non-strict: it should return the first non-NULL argument. + # We register arities 2..10 to support subset filters compiled by clients. + defp first_non_nil([head | rest]) when is_nil(head), do: first_non_nil(rest) + defp first_non_nil([head | _rest]), do: head + defp first_non_nil([]), do: nil + + defpostgres "coalesce(anycompatible, anycompatible) -> anycompatible", strict?: false do + def coalesce2(a, b), do: first_non_nil([a, b]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce3(a, b, c), do: first_non_nil([a, b, c]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce4(a, b, c, d), do: first_non_nil([a, b, c, d]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce5(a, b, c, d, e), do: first_non_nil([a, b, c, d, e]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce6(a, b, c, d, e, f), do: first_non_nil([a, b, c, d, e, f]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce7(a, b, c, d, e, f, g), do: first_non_nil([a, b, c, d, e, f, g]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce8(a, b, c, d, e, f, g, h), do: first_non_nil([a, b, c, d, e, f, g, h]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce9(a, b, c, d, e, f, g, h, i), do: first_non_nil([a, b, c, d, e, f, g, h, i]) + end + + defpostgres "coalesce(anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible, anycompatible) -> anycompatible", + strict?: false do + def coalesce10(a, b, c, d, e, f, g, h, i, j), + do: first_non_nil([a, b, c, d, e, f, g, h, i, j]) + end + ## Date functions defpostgres("date + int8 -> date", commutative?: true, delegate: &Date.add/2) defpostgres("date - date -> int8", delegate: &Date.diff/2) diff --git a/packages/sync-service/lib/electric/replication/eval/parser.ex b/packages/sync-service/lib/electric/replication/eval/parser.ex index c7e3d27585..af53f392e0 100644 --- a/packages/sync-service/lib/electric/replication/eval/parser.ex +++ b/packages/sync-service/lib/electric/replication/eval/parser.ex @@ -667,26 +667,17 @@ defmodule Electric.Replication.Eval.Parser do _, %{env: env} ) do - with {:ok, choices} <- find_available_functions(call, env), - {:ok, concrete} <- Lookups.pick_concrete_function_overload(choices, args, env), - {:ok, args} <- cast_unknowns(args, concrete.args, env), - {:ok, args} <- cast_implicit(args, concrete.args, env) do - {:ok, from_concrete(concrete, args)} - else - {:error, {_loc, _msg}} = error -> - error - - :error -> - arg_list = - Enum.map_join(args, ", ", fn - %UnknownConst{} -> "unknown" - %{type: type} -> to_string(type) - end) + resolve_function_call(identifier(call.funcname), args, call.location, env) + end - {:error, - {call.location, - "Could not select a function overload for #{identifier(call.funcname)}(#{arg_list})"}} - end + defp node_to_ast( + %PgQuery.CoalesceExpr{args: raw_args, location: location}, + children, + _, + %{env: env} + ) do + args = Map.get(children, :args, raw_args) + resolve_function_call("coalesce", args, location, env) end # Next block of overloads matches on `A_Expr`, which is any operator call, as well as special syntax calls (e.g. `BETWEEN` or `ANY`). @@ -1079,13 +1070,33 @@ defmodule Electric.Replication.Eval.Parser do end end - defp find_available_functions(%PgQuery.FuncCall{} = call, %{funcs: funcs}) do - name = identifier(call.funcname) - arity = length(call.args) - + defp find_available_functions(name, arity, funcs, location) + when is_binary(name) and is_integer(arity) and is_map(funcs) do case Map.fetch(funcs, {name, arity}) do {:ok, options} -> {:ok, options} - :error -> {:error, {call.location, "unknown or unsupported function #{name}/#{arity}"}} + :error -> {:error, {location, "unknown or unsupported function #{name}/#{arity}"}} + end + end + + defp resolve_function_call(name, args, location, %{funcs: funcs} = env) + when is_binary(name) and is_list(args) do + with {:ok, choices} <- find_available_functions(name, length(args), funcs, location), + {:ok, concrete} <- Lookups.pick_concrete_function_overload(choices, args, env), + {:ok, args} <- cast_unknowns(args, concrete.args, env), + {:ok, args} <- cast_implicit(args, concrete.args, env) do + {:ok, from_concrete(concrete, args)} + else + {:error, {_loc, _msg}} = error -> + error + + :error -> + arg_list = + Enum.map_join(args, ", ", fn + %UnknownConst{} -> "unknown" + %{type: type} -> to_string(type) + end) + + {:error, {location, "Could not select a function overload for #{name}(#{arg_list})"}} end end diff --git a/packages/sync-service/lib/electric/replication/eval/walker.ex b/packages/sync-service/lib/electric/replication/eval/walker.ex index 3a0daa9ee0..e1bedcf2dc 100644 --- a/packages/sync-service/lib/electric/replication/eval/walker.ex +++ b/packages/sync-service/lib/electric/replication/eval/walker.ex @@ -211,6 +211,10 @@ defimpl Electric.Walkable, for: PgQuery.FuncCall do def children(%PgQuery.FuncCall{args: args}), do: [args: args] end +defimpl Electric.Walkable, for: PgQuery.CoalesceExpr do + def children(%PgQuery.CoalesceExpr{args: args}), do: [args: args] +end + defimpl Electric.Walkable, for: PgQuery.A_Expr do def children(%PgQuery.A_Expr{lexpr: lexpr, rexpr: rexpr, name: name}), do: [lexpr: lexpr, rexpr: rexpr, name: name] diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index 55840e328f..01dbce4856 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -3127,6 +3127,78 @@ defmodule Electric.Plug.RouterTest do ) end + @tag with_sql: [ + "CREATE TABLE nullable_items (id uuid primary key, value text)", + "INSERT INTO nullable_items VALUES (gen_random_uuid(), null)", + "INSERT INTO nullable_items VALUES (gen_random_uuid(), 'test value')" + ] + test "subsets can filter with coalesce", ctx do + req = make_shape_req("nullable_items", log: "changes_only") + + assert {_, 200, + %{ + "metadata" => _, + "data" => [ + %{ + "value" => %{"id" => _, "value" => nil} + } + ] + }} = + shape_req(req, ctx.opts, + subset: %{where: "coalesce(value, 'missing') = $1", params: %{"1" => "missing"}} + ) + end + + @tag with_sql: [ + "CREATE TABLE nullable_items_arity_10 (id uuid primary key, value text)", + "INSERT INTO nullable_items_arity_10 VALUES (gen_random_uuid(), null)", + "INSERT INTO nullable_items_arity_10 VALUES (gen_random_uuid(), 'present')" + ] + test "subsets can filter with coalesce arity 10", ctx do + req = make_shape_req("nullable_items_arity_10", log: "changes_only") + + assert {_, 200, + %{ + "metadata" => _, + "data" => [ + %{ + "value" => %{"id" => _, "value" => nil} + } + ] + }} = + shape_req(req, ctx.opts, + subset: %{ + where: + "coalesce(value, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'missing') = $1", + params: %{"1" => "missing"} + } + ) + end + + @tag with_sql: [ + "CREATE TABLE nullable_items_arity_11 (id uuid primary key, value text)", + "INSERT INTO nullable_items_arity_11 VALUES (gen_random_uuid(), null)" + ] + test "subsets return 400 for coalesce with more than 10 arguments", ctx do + req = make_shape_req("nullable_items_arity_11", log: "changes_only") + + assert {_, 400, %{"errors" => %{"subset" => %{"where" => where_error}}}} = + shape_req(req, ctx.opts, + subset: %{ + where: + "coalesce(value, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'missing') = 'missing'" + } + ) + + message = + case where_error do + msg when is_binary(msg) -> msg + msgs when is_list(msgs) -> Enum.join(msgs, " ") + end + + assert message =~ "unknown or unsupported function coalesce/11" + end + @tag with_sql: [ "INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')", "INSERT INTO items VALUES (gen_random_uuid(), 'test value 2')" diff --git a/packages/sync-service/test/electric/replication/eval/parser_test.exs b/packages/sync-service/test/electric/replication/eval/parser_test.exs index be1029719c..12e1506851 100644 --- a/packages/sync-service/test/electric/replication/eval/parser_test.exs +++ b/packages/sync-service/test/electric/replication/eval/parser_test.exs @@ -258,6 +258,32 @@ defmodule Electric.Replication.Eval.ParserTest do assert %Func{name: "-", args: [%Ref{path: ["test"], type: :int4}]} = result end + test "should correctly parse a coalesce call with arity 10" do + assert {:ok, %Expr{eval: %Func{strict?: false, type: :text, args: args}}} = + Parser.parse_and_validate_expression( + ~S|coalesce("value", NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'fallback')|, + refs: %{["value"] => :text} + ) + + assert length(args) == 10 + assert [%Ref{path: ["value"], type: :text} | _] = args + assert %Const{value: "fallback", type: :text} = List.last(args) + end + + test "should reduce a constant coalesce call at parse time" do + assert {:ok, %Expr{eval: %Const{value: "fallback", type: :text}}} = + Parser.parse_and_validate_expression(~S|coalesce(NULL, 'fallback')|) + end + + test "should return helpful error for coalesce with more than 10 arguments" do + assert {:error, message} = + Parser.parse_and_validate_expression( + ~S|coalesce(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'fallback')| + ) + + assert message =~ "unknown or unsupported function coalesce/11" + end + test "should reduce down immutable function calls that have only constants" do env = Env.empty( diff --git a/packages/sync-service/test/electric/replication/eval/runner_test.exs b/packages/sync-service/test/electric/replication/eval/runner_test.exs index ecfecca32d..c0c881da03 100644 --- a/packages/sync-service/test/electric/replication/eval/runner_test.exs +++ b/packages/sync-service/test/electric/replication/eval/runner_test.exs @@ -52,6 +52,22 @@ defmodule Electric.Replication.Eval.RunnerTest do |> Runner.execute(%{["test"] => 1}) end + test "should evaluate coalesce with arity 10" do + expr = + ~S|coalesce("v1", NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'fallback')| + |> Parser.parse_and_validate_expression!(refs: %{["v1"] => :text}) + + assert {:ok, "fallback"} = Runner.execute(expr, %{["v1"] => nil}) + assert {:ok, "value"} = Runner.execute(expr, %{["v1"] => "value"}) + end + + test "should return nil for coalesce when all 10 arguments are NULL" do + assert {:ok, nil} = + ~S|coalesce(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)| + |> Parser.parse_and_validate_expression!() + |> Runner.execute(%{}) + end + test "should not apply strict functions to nil values" do assert {:ok, nil} = ~S|"test" + 1|