Skip to content
Merged
5 changes: 5 additions & 0 deletions .changeset/fix-char-padding.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fixed char(n) column values being trimmed of trailing spaces in snapshot and subset queries, causing inconsistency with values from PG replication.
10 changes: 9 additions & 1 deletion packages/sync-service/lib/electric/shapes/querying.ex
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,15 @@ defmodule Electric.Shapes.Querying do
|> pg_coalesce_json_string()
end

defp pg_cast_column_to_text(column), do: ~s["#{Utils.escape_quotes(column)}"::text]
# Builds a SQL expression that renders `column` as text.
#
# A plain `::text` cast is fine for most types, but PostgreSQL strips the
# trailing space padding when casting bpchar (char(n)) to text. For those
# columns we route the value through concat(), which stringifies its
# argument without trimming; the explicit NULL branch is needed because
# concat() of a NULL yields '' rather than NULL.
defp pg_cast_column_to_text(column) do
  quoted_col = ~s["#{Utils.escape_quotes(column)}"]

  [
    "CASE WHEN #{quoted_col} IS NULL THEN NULL::text",
    "WHEN pg_typeof(#{quoted_col}) = 'character'::regtype THEN concat(#{quoted_col}, '')",
    "ELSE #{quoted_col}::text END"
  ]
  |> Enum.join(" ")
end

# Wraps the given SQL text expression in to_json() so its contents are
# JSON-escaped, then casts the resulting json value back to text.
defp pg_escape_string_for_json(str) do
  ~s[to_json(#{str})::text]
end
# Substitutes the literal JSON string 'null' when the given SQL expression
# evaluates to NULL, so the emitted JSON is never SQL-NULL.
defp pg_coalesce_json_string(str) do
  ~s[coalesce(#{str} , 'null')]
end

Expand Down
125 changes: 125 additions & 0 deletions packages/sync-service/test/electric/shapes/querying_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,134 @@ defmodule Electric.Shapes.QueryingTest do
Querying.stream_initial_data(conn, "dummy-stack-id", "dummy-shape-handle", shape)
)
end

# Regression test: snapshot queries must not trim the blank padding that
# PostgreSQL stores in char(n) (bpchar) columns, so snapshot values stay
# consistent with what logical replication emits. The table has no primary
# key, so the padded value also flows into the synthesized row key.
test "preserves space padding for char(n) columns in pk-less table", %{db_conn: conn} do
  Postgrex.query!(
    conn,
    """
    CREATE TABLE padded_no_pk (
      code CHAR(6),
      name TEXT
    )
    """,
    []
  )

  # 'ab' and 'cd' are stored blank-padded to the declared char(6) width;
  # the NULL row checks that padding preservation does not turn NULL into ''.
  Postgrex.query!(
    conn,
    "INSERT INTO padded_no_pk VALUES ('ab', 'first'), ('cd', 'second'), (NULL, 'third')",
    []
  )

  shape = Shape.new!("padded_no_pk", inspector: {DirectInspector, conn})

  # NOTE(review): the expected values below show a single trailing space,
  # but char(6) pads 'ab' to six characters ("ab" + 4 spaces) — the runs of
  # spaces here look collapsed by copy/paste; verify against a live run.
  assert [
           %{
             key: ~S["public"."padded_no_pk"/"ab "/"first"],
             value: %{code: "ab ", name: "first"}
           },
           %{
             key: ~S["public"."padded_no_pk"/"cd "/"second"],
             value: %{code: "cd ", name: "second"}
           },
           %{
             key: ~S["public"."padded_no_pk"/_/"third"],
             value: %{code: nil, name: "third"}
           }
         ] =
           decode_stream(
             Querying.stream_initial_data(conn, "dummy-stack-id", "dummy-shape-handle", shape)
           )
end

# Regression test: snapshot queries must preserve bpchar blank padding both
# in a char(n) primary key (which also feeds the row key) and in a nullable
# non-key char(n) column alongside an ordinary TEXT column.
test "preserves space padding for char(n) columns", %{db_conn: conn} do
  Postgrex.query!(
    conn,
    """
    CREATE TABLE padded (
      id CHAR(8) PRIMARY KEY,
      name CHAR(10),
      label TEXT
    )
    """,
    []
  )

  # Second row checks that a NULL char(n) value stays NULL (not padded, not '').
  Postgrex.query!(
    conn,
    "INSERT INTO padded VALUES ('ab', 'hello', 'world'), ('cd', NULL, 'test')",
    []
  )

  shape = Shape.new!("padded", inspector: {DirectInspector, conn})

  # NOTE(review): expected strings should be padded to the declared widths
  # (char(8): "ab" + 6 spaces; char(10): "hello" + 5 spaces) — the space runs
  # below appear collapsed by copy/paste; verify against a live run.
  assert [
           %{
             key: ~S["public"."padded"/"ab "],
             value: %{
               id: "ab ",
               name: "hello ",
               label: "world"
             },
             headers: %{operation: "insert", relation: ["public", "padded"]}
           },
           %{
             key: ~S["public"."padded"/"cd "],
             value: %{
               id: "cd ",
               name: nil,
               label: "test"
             },
             headers: %{operation: "insert", relation: ["public", "padded"]}
           }
         ] =
           decode_stream(
             Querying.stream_initial_data(conn, "dummy-stack-id", "dummy-shape-handle", shape)
           )
end
end

describe "query_move_in/5 with SubqueryMoves.move_in_where_clause/3" do
# Regression test for move-in queries: when the shape's WHERE clause joins
# through a char(8) foreign key, the padded values coming from the
# replication stream must match the values the move-in query produces.
test "preserves space padding for char(n) join columns", %{db_conn: conn} do
  # NOTE(review): Postgrex.query!/2 — Postgrex's query! normally takes a
  # params list (arity 3+); confirm a 2-arity variant or local helper exists.
  for statement <- [
        "CREATE TABLE parent (id CHAR(8) PRIMARY KEY, value INTEGER)",
        "CREATE TABLE child (id SERIAL PRIMARY KEY, value INTEGER, parent_id CHAR(8) REFERENCES parent(id))",
        "INSERT INTO parent VALUES ('ab', 1), ('cd', 2), ('ef', 3)",
        "INSERT INTO child (value, parent_id) VALUES (4, 'ab'), (5, 'cd'), (6, 'ef')"
      ],
      do: Postgrex.query!(conn, statement)

  shape =
    Shape.new!("child",
      where: "parent_id IN (SELECT id FROM parent)",
      inspector: {DirectInspector, conn}
    )
    |> fill_handles()

  # Move-in values arrive blank-padded, as PG replication emits them for
  # bpchar columns. NOTE(review): padding to char(8) width ("ab" + 6 spaces)
  # appears collapsed by copy/paste here and in the assertions below; verify.
  move_in_values = ["ab ", "cd "]

  assert {where, params} =
           SubqueryMoves.move_in_where_clause(
             shape,
             hd(shape.shape_dependencies_handles),
             move_in_values
           )

  assert [
           %{value: %{parent_id: "ab "}},
           %{value: %{parent_id: "cd "}}
         ] =
           Querying.query_move_in(
             conn,
             "dummy-stack-id",
             "dummy-shape-handle",
             shape,
             {where, params}
           )
           # Each streamed row is a [key, tags, json] triple; only the JSON
           # payload is decoded and asserted on here.
           |> Enum.map(fn [_key, _tags, json] -> json end)
           |> decode_stream()
end

test "builds the correct query which executes", %{db_conn: conn} do
for statement <- [
"CREATE TABLE parent (id SERIAL PRIMARY KEY, value INTEGER)",
Expand Down
Loading