Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/little-pigs-complain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Remove the single-function PartialMode module by moving its query_subset() function into SnapshotQuery, so that its implementation could sit close to SnapshotQuery.execute_for_shape().
2 changes: 1 addition & 1 deletion integration-tests/tests/otel-export.lux
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
# The initial snapshot query span is exported from the Snapshotter path."]
[invoke grep_otel_collector_output "Span shape_snapshot\.execute_for_shape .*shape\.query_reason=initial_snapshot"]

# A direct subset snapshot query span is exported from the PartialModes path.
# A direct subset snapshot query span is exported from the SnapshotQuery.execute_for_subset path.
[invoke grep_otel_collector_output "Span shape_snapshot\.execute_for_shape .*shape\.query_reason=subset_query"]

###
Expand Down
83 changes: 82 additions & 1 deletion packages/sync-service/lib/electric/postgres/snapshot_query.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Electric.Postgres.SnapshotQuery do
alias Electric.Postgres.Lsn
alias Electric.SnapshotError
alias Electric.Shapes.Shape
alias Electric.Shapes.{Querying, Shape}
alias Electric.Telemetry.OpenTelemetry

@type pg_snapshot() ::
Expand Down Expand Up @@ -129,4 +130,84 @@ defmodule Electric.Postgres.SnapshotQuery do
end
)
end

def execute_for_subset(shape_handle, %Shape{} = shape, subset, opts) when is_map(opts) do
stack_id = Map.fetch!(opts, :stack_id)
pool = Electric.Connection.Manager.pool_name(stack_id, :snapshot)
mark = Enum.random(0..(2 ** 31 - 1))
headers = %{snapshot_mark: mark}

execute_for_shape(pool, shape_handle, shape,
snapshot_info_fn: fn _, pg_snapshot, lsn ->
send(self(), {:pg_snapshot_info, pg_snapshot, lsn})
end,
query_fn: fn conn, _, _ ->
conn
|> Querying.query_subset(stack_id, shape_handle, shape, subset, headers)
|> record_subset_metrics(stack_id, shape_handle, shape)
|> Enum.to_list()
end,
stack_id: stack_id,
query_reason: "subset_query"
)
|> case do
{:ok, result} ->
metadata =
receive do
{:pg_snapshot_info, pg_snapshot, lsn} -> make_subset_metadata(pg_snapshot, lsn, mark)
after
0 ->
raise "failed to execute snapshot query for shape #{shape_handle}: missing pg_snapshot_info"
end

{:ok, {metadata, result}}

{:error, error} ->
{:error, error}
end
rescue
e in Querying.QueryError ->
{:error, {:where, e.message}}
end

defp make_subset_metadata({xmin, xmax, xip_list}, lsn, mark) do
%{
xmin: xmin,
xmax: xmax,
xip_list: xip_list,
database_lsn: to_string(Lsn.to_integer(lsn)),
snapshot_mark: mark
}
end

defp record_subset_metrics(stream, stack_id, shape_handle, shape) do
Stream.transform(
stream,
fn -> {System.monotonic_time(:microsecond), 0, 0} end,
fn row, {start_time, bytes, rows} ->
{[row], {start_time, bytes + IO.iodata_length(row), rows + 1}}
end,
fn {start_time, bytes, rows} ->
OpenTelemetry.execute(
[:electric, :subqueries, :subset_result],
%{
duration: System.monotonic_time(:microsecond) - start_time,
bytes: bytes,
count: 1,
rows: rows
},
%{
stack_id: stack_id,
"shape.handle": shape_handle,
"shape.root_table": shape.root_table
}
)

OpenTelemetry.add_span_attributes(%{
"subset.rows" => rows,
"subset.result_bytes" => bytes
})
end
)
end
end
6 changes: 3 additions & 3 deletions packages/sync-service/lib/electric/shapes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ defmodule Electric.Shapes do
end
end

def query_subset(handle, shape, subset, opts) do
Electric.Shapes.PartialModes.query_subset(handle, shape, subset, opts)
end
defdelegate query_subset(handle, shape, subset, opts),
to: Electric.Postgres.SnapshotQuery,
as: :execute_for_subset
end
181 changes: 0 additions & 181 deletions packages/sync-service/lib/electric/shapes/partial_modes.ex

This file was deleted.

Loading