From cd6ff49eb209e0eb066271cf0bae396f22791520 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 18 May 2026 12:19:48 +0200 Subject: [PATCH 1/4] Remove outdated PartialModes.query_move_in[_async] that don't have any callers Dead code since a04b25962cdb7ca86c4434585b6f74c758e1a31b --- .../lib/electric/shapes/partial_modes.ex | 93 ------------------- 1 file changed, 93 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/partial_modes.ex b/packages/sync-service/lib/electric/shapes/partial_modes.ex index 1e1ca2f76f..a61529fc24 100644 --- a/packages/sync-service/lib/electric/shapes/partial_modes.ex +++ b/packages/sync-service/lib/electric/shapes/partial_modes.ex @@ -85,97 +85,4 @@ defmodule Electric.Shapes.PartialModes do end ) end - - @doc """ - Asynchronous version of query_move_in that doesn't block on snapshot. - Sends {:pg_snapshot_known, name, snapshot} immediately when snapshot is known. - Sends {:query_move_in_complete, name, key_set, snapshot} when query completes. - """ - def query_move_in_async(supervisor, shape_handle, %Shape{} = shape, where, opts) do - consumer_pid = Access.fetch!(opts, :consumer_pid) - pool = Manager.pool_name(opts[:stack_id], :snapshot) - results_fn = Access.fetch!(opts, :results_fn) - - :telemetry.execute([:electric, :subqueries, :move_in_triggered], %{count: 1}, %{ - stack_id: opts[:stack_id] - }) - - # Propagate OTel context so spans created inside the task are linked to the - # caller's trace. OTel context is per-process, so without this any - # `with_child_span` calls in the task would be silently dropped. - trace_context = OpenTelemetry.get_current_context() - - Task.Supervisor.start_child(supervisor, fn -> - OpenTelemetry.set_current_context(trace_context) - - try do - SnapshotQuery.execute_for_shape(pool, shape_handle, shape, - stack_id: opts[:stack_id], - query_reason: "move_in_query", - snapshot_info_fn: fn _, pg_snapshot, _ -> - # Send snapshot notification immediately instead of blocking - send(consumer_pid, {:pg_snapshot_known, opts[:move_in_name], pg_snapshot}) - end, - query_fn: fn conn, pg_snapshot, _ -> - result = - Querying.query_move_in(conn, opts[:stack_id], shape_handle, shape, where) - |> results_fn.(pg_snapshot) - - {key_set, snapshot} = result - send(consumer_pid, {:query_move_in_complete, opts[:move_in_name], key_set, snapshot}) - end - ) - rescue - error -> - send(consumer_pid, {:query_move_in_error, opts[:move_in_name], error, __STACKTRACE__}) - end - end) - - :ok - end - - def query_move_in(supervisor, shape_handle, %Shape{} = shape, where, opts) do - parent = self() - pool = Manager.pool_name(opts[:stack_id], :snapshot) - results_fn = Access.fetch!(opts, :results_fn) - - # Propagate OTel context so spans created inside the task are linked to the - # caller's trace. OTel context is per-process, so without this any - # `with_child_span` calls in the task would be silently dropped. - trace_context = OpenTelemetry.get_current_context() - - Task.Supervisor.start_child(supervisor, fn -> - OpenTelemetry.set_current_context(trace_context) - - try do - SnapshotQuery.execute_for_shape(pool, shape_handle, shape, - stack_id: opts[:stack_id], - query_reason: "move_in_query", - snapshot_info_fn: fn _, pg_snapshot, _ -> - send(parent, {:pg_snapshot_info, pg_snapshot}) - end, - query_fn: fn conn, _, _ -> - result = - Querying.query_move_in(conn, opts[:stack_id], shape_handle, shape, where) - |> results_fn.() - - send(parent, {:query_move_in_complete, opts[:move_in_name], result}) - end - ) - rescue - error -> - send(parent, {:query_move_in_error, opts[:move_in_name], error, __STACKTRACE__}) - end - end) - - receive do - {:query_move_in_error, _, error, stacktrace} -> - # {:error, error, stacktrace} - reraise(error, stacktrace) - - {:pg_snapshot_info, pg_snapshot} -> - # {:ok, pg_snapshot} - pg_snapshot - end - end end From 5f371f1b5958bf1d93e419ff62dfa70f1975eccd Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 18 May 2026 12:25:39 +0200 Subject: [PATCH 2/4] Move PartialModes.query_subset into SnapshotQuery proper --- .../lib/electric/postgres/snapshot_query.ex | 83 ++++++++++++++++- packages/sync-service/lib/electric/shapes.ex | 6 +- .../lib/electric/shapes/partial_modes.ex | 88 ------------------- 3 files changed, 85 insertions(+), 92 deletions(-) delete mode 100644 packages/sync-service/lib/electric/shapes/partial_modes.ex diff --git a/packages/sync-service/lib/electric/postgres/snapshot_query.ex b/packages/sync-service/lib/electric/postgres/snapshot_query.ex index b18a3ede59..6afe8b7e30 100644 --- a/packages/sync-service/lib/electric/postgres/snapshot_query.ex +++ b/packages/sync-service/lib/electric/postgres/snapshot_query.ex @@ -1,6 +1,7 @@ defmodule Electric.Postgres.SnapshotQuery do + alias Electric.Postgres.Lsn alias Electric.SnapshotError - alias Electric.Shapes.Shape + alias Electric.Shapes.{Querying, Shape} alias Electric.Telemetry.OpenTelemetry @type pg_snapshot() :: @@ -129,4 +130,84 @@ defmodule Electric.Postgres.SnapshotQuery do end ) end + + def execute_for_subset(shape_handle, %Shape{} = shape, subset, opts) when is_map(opts) do + stack_id = Map.fetch!(opts, :stack_id) + pool = Electric.Connection.Manager.pool_name(stack_id, :snapshot) + mark = Enum.random(0..(2 ** 31 - 1)) + headers = %{snapshot_mark: mark} + + execute_for_shape(pool, shape_handle, shape, + snapshot_info_fn: fn _, pg_snapshot, lsn -> + send(self(), {:pg_snapshot_info, pg_snapshot, lsn}) + end, + query_fn: fn conn, _, _ -> + conn + |> Querying.query_subset(stack_id, shape_handle, shape, subset, headers) + |> record_subset_metrics(stack_id, shape_handle, shape) + |> Enum.to_list() + end, + stack_id: opts[:stack_id], + query_reason: "subset_query" + ) + |> case do + {:ok, result} -> + metadata = + receive do + {:pg_snapshot_info, pg_snapshot, lsn} -> make_subset_metadata(pg_snapshot, lsn, mark) + after + 0 -> + raise "failed to execute snapshot query for shape #{shape_handle}: missing pg_snapshot_info" + end + + {:ok, {metadata, result}} + + {:error, error} -> + {:error, error} + end + rescue + e in Querying.QueryError -> + {:error, {:where, e.message}} + end + + defp make_subset_metadata({xmin, xmax, xip_list}, lsn, mark) do + %{ + xmin: xmin, + xmax: xmax, + xip_list: xip_list, + database_lsn: to_string(Lsn.to_integer(lsn)), + snapshot_mark: mark + } + end + + defp record_subset_metrics(stream, stack_id, shape_handle, shape) do + Stream.transform( + stream, + fn -> {System.monotonic_time(:microsecond), 0, 0} end, + fn row, {start_time, bytes, rows} -> + {[row], {start_time, bytes + IO.iodata_length(row), rows + 1}} + end, + fn {start_time, bytes, rows} -> + OpenTelemetry.execute( + [:electric, :subqueries, :subset_result], + %{ + duration: System.monotonic_time(:microsecond) - start_time, + bytes: bytes, + count: 1, + rows: rows + }, + %{ + stack_id: stack_id, + "shape.handle": shape_handle, + "shape.root_table": shape.root_table + } + ) + + OpenTelemetry.add_span_attributes(%{ + "subset.rows" => rows, + "subset.result_bytes" => bytes + }) + end + ) + end end diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index 7720a166c6..814e774405 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -139,7 +139,7 @@ defmodule Electric.Shapes do end end - def query_subset(handle, shape, subset, opts) do - Electric.Shapes.PartialModes.query_subset(handle, shape, subset, opts) - end + defdelegate query_subset(handle, shape, subset, opts), + to: Electric.Postgres.SnapshotQuery, + as: :execute_for_subset end diff --git a/packages/sync-service/lib/electric/shapes/partial_modes.ex b/packages/sync-service/lib/electric/shapes/partial_modes.ex deleted file mode 100644 index a61529fc24..0000000000 --- a/packages/sync-service/lib/electric/shapes/partial_modes.ex +++ /dev/null @@ -1,88 +0,0 @@ -defmodule Electric.Shapes.PartialModes do - alias Electric.Shapes.Shape - alias Electric.Postgres.Lsn - alias Electric.Shapes.Querying - alias Electric.Connection.Manager - alias Electric.Postgres.SnapshotQuery - alias Electric.Telemetry.OpenTelemetry - - def query_subset(shape_handle, %Shape{} = shape, subset, opts) when is_map(opts) do - stack_id = Map.fetch!(opts, :stack_id) - pool = Manager.pool_name(stack_id, :snapshot) - mark = Enum.random(0..(2 ** 31 - 1)) - headers = %{snapshot_mark: mark} - - SnapshotQuery.execute_for_shape(pool, shape_handle, shape, - snapshot_info_fn: fn _, pg_snapshot, lsn -> - send(self(), {:pg_snapshot_info, pg_snapshot, lsn}) - end, - query_fn: fn conn, _, _ -> - conn - |> Querying.query_subset(stack_id, shape_handle, shape, subset, headers) - |> record_subset_metrics(stack_id, shape_handle, shape) - |> Enum.to_list() - end, - stack_id: opts[:stack_id], - query_reason: "subset_query" - ) - |> case do - {:ok, result} -> - metadata = - receive do - {:pg_snapshot_info, pg_snapshot, lsn} -> make_metadata(pg_snapshot, lsn, mark) - after - 0 -> - raise "failed to execute snapshot query for shape #{shape_handle}: missing pg_snapshot_info" - end - - {:ok, {metadata, result}} - - {:error, error} -> - {:error, error} - end - rescue - e in Querying.QueryError -> - {:error, {:where, e.message}} - end - - defp make_metadata({xmin, xmax, xip_list}, lsn, mark) do - %{ - xmin: xmin, - xmax: xmax, - xip_list: xip_list, - database_lsn: to_string(Lsn.to_integer(lsn)), - snapshot_mark: mark - } - end - - defp record_subset_metrics(stream, stack_id, shape_handle, shape) do - Stream.transform( - stream, - fn -> {System.monotonic_time(:microsecond), 0, 0} end, - fn row, {start_time, bytes, rows} -> - {[row], {start_time, bytes + IO.iodata_length(row), rows + 1}} - end, - fn {start_time, bytes, rows} -> - OpenTelemetry.execute( - [:electric, :subqueries, :subset_result], - %{ - duration: System.monotonic_time(:microsecond) - start_time, - bytes: bytes, - count: 1, - rows: rows - }, - %{ - stack_id: stack_id, - "shape.handle": shape_handle, - "shape.root_table": shape.root_table - } - ) - - OpenTelemetry.add_span_attributes(%{ - "subset.rows" => rows, - "subset.result_bytes" => bytes - }) - end - ) - end -end From 3cba7ddff26edf858802ead861ed1189e01198e5 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 18 May 2026 12:26:41 +0200 Subject: [PATCH 3/4] Add changeset --- .changeset/little-pigs-complain.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/little-pigs-complain.md diff --git a/.changeset/little-pigs-complain.md b/.changeset/little-pigs-complain.md new file mode 100644 index 0000000000..40376115bb --- /dev/null +++ b/.changeset/little-pigs-complain.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Remove the single-function PartialMode module by moving its query_subset() function into SnapshotQuery, so that its implementation could sit close to SnapshotQuery.execute_for_shape(). From 221a8b7e7772151e14ca9f0aef56dd9e48d5feff Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 18 May 2026 12:34:31 +0200 Subject: [PATCH 4/4] Address review feedback: update stale comment and redundant lookup Co-Authored-By: Claude Opus 4.5 --- integration-tests/tests/otel-export.lux | 2 +- packages/sync-service/lib/electric/postgres/snapshot_query.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux index 7d7a95bda7..402f99ed6d 100644 --- a/integration-tests/tests/otel-export.lux +++ b/integration-tests/tests/otel-export.lux @@ -116,7 +116,7 @@ # The initial snapshot query span is exported from the Snapshotter path."] [invoke grep_otel_collector_output "Span shape_snapshot\.execute_for_shape .*shape\.query_reason=initial_snapshot"] - # A direct subset snapshot query span is exported from the PartialModes path. + # A direct subset snapshot query span is exported from the SnapshotQuery.execute_for_subset path. [invoke grep_otel_collector_output "Span shape_snapshot\.execute_for_shape .*shape\.query_reason=subset_query"] ### diff --git a/packages/sync-service/lib/electric/postgres/snapshot_query.ex b/packages/sync-service/lib/electric/postgres/snapshot_query.ex index 6afe8b7e30..5fe8648434 100644 --- a/packages/sync-service/lib/electric/postgres/snapshot_query.ex +++ b/packages/sync-service/lib/electric/postgres/snapshot_query.ex @@ -147,7 +147,7 @@ defmodule Electric.Postgres.SnapshotQuery do |> record_subset_metrics(stack_id, shape_handle, shape) |> Enum.to_list() end, - stack_id: opts[:stack_id], + stack_id: stack_id, query_reason: "subset_query" ) |> case do