diff --git a/packages/sync-service/lib/electric/plug/metadata_snapshot_plug.ex b/packages/sync-service/lib/electric/plug/metadata_snapshot_plug.ex new file mode 100644 index 0000000000..ee544b1045 --- /dev/null +++ b/packages/sync-service/lib/electric/plug/metadata_snapshot_plug.ex @@ -0,0 +1,274 @@ +defmodule Electric.Plug.MetadataSnapshotPlug do + @moduledoc """ + Plug to create a comprehensive snapshot of the source's metadata for debugging purposes. + + Returns a JSON object containing: + + ## Global Metadata + - `database`: Current PostgreSQL snapshot information + - `xmin`: Oldest visible transaction ID + - `xmax`: Next transaction ID to be assigned + - `xip_list`: List of in-progress transaction IDs + - `lsn`: Current WAL log sequence number (as string) + - `status`: Service connection status + - `shape_count`: Total number of active shapes + + ## Per-Shape Metadata (in `shapes` array) + - `handle`: Unique shape identifier + - `definition`: Shape definition including table, where clause, columns + - `status`: Shape status (snapshot_started, snapshot_completed) + - `latest_offset`: Current log offset for this shape + - `pg_snapshot`: PostgreSQL snapshot at shape creation time (if available) + """ + + use Plug.Builder + + alias Plug.Conn + alias Electric.Connection.Manager + alias Electric.Postgres.Lsn + alias Electric.ShapeCache + alias Electric.ShapeCache.Storage + alias Electric.ShapeCache.ShapeStatus + alias Electric.StatusMonitor + alias Electric.Replication.LogOffset + + plug :fetch_query_params + plug :get_snapshot_metadata + plug :put_resp_content_type, "application/json" + plug :put_cache_headers + plug :send_response + + defp get_snapshot_metadata(%Conn{assigns: %{config: config}} = conn, _) do + stack_id = config[:stack_id] + + try do + metadata = build_metadata(stack_id) + + conn + |> assign(:metadata, metadata) + |> assign(:status_code, 200) + rescue + e -> + conn + |> assign(:error, %{message: "Failed to get metadata snapshot: #{Exception.message(e)}"}) + |> assign(:status_code, 500) + catch + :exit, {_, {DBConnection.Holder, :checkout, _}} -> + conn + |> assign(:error, %{message: "Database connection not available"}) + |> assign(:status_code, 503) + end + end + + defp build_metadata(stack_id) do + # Get global database snapshot + database_info = get_database_snapshot(stack_id) + + # Get service status + status = get_service_status(stack_id) + + # Get shape count + shape_count = get_shape_count(stack_id) + + # Get per-shape metadata + shapes = get_shapes_metadata(stack_id) + + %{ + database: database_info, + status: status, + shape_count: shape_count, + shapes: shapes + } + end + + defp get_database_snapshot(stack_id) do + pool = Manager.snapshot_pool(stack_id) + + result = + Postgrex.transaction( + pool, + fn pg_conn -> + Postgrex.query!( + pg_conn, + "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY", + [] + ) + + Postgrex.query!(pg_conn, "SELECT pg_current_snapshot(), pg_current_wal_lsn()", []) + end, + timeout: 30_000 + ) + + case result do + {:ok, %Postgrex.Result{rows: [[{xmin, xmax, xip_list}, lsn]]}} -> + %{ + xmin: xmin, + xmax: xmax, + xip_list: xip_list, + lsn: to_string(Lsn.to_integer(lsn)) + } + + {:error, _error} -> + nil + end + end + + defp get_service_status(stack_id) do + case StatusMonitor.status(stack_id) do + %{conn: conn_status, shape: shape_status} -> + %{ + connection: to_string(conn_status), + shape: to_string(shape_status) + } + + _ -> + %{connection: "unknown", shape: "unknown"} + end + end + + defp get_shape_count(stack_id) do + case ShapeCache.count_shapes(stack_id) do + count when is_integer(count) -> count + _ -> 0 + end + end + + defp get_shapes_metadata(stack_id) do + case ShapeCache.list_shapes(stack_id) do + shapes when is_list(shapes) -> + storage = Storage.for_stack(stack_id) + + Enum.map(shapes, fn {handle, shape} -> + build_shape_metadata(stack_id, handle, shape, storage) + end) + + _ -> + [] + end + end + + defp build_shape_metadata(stack_id, handle, shape, storage) do + shape_storage = Storage.for_shape(handle, storage) + + # Get shape status + snapshot_started = ShapeStatus.snapshot_started?(stack_id, handle) + snapshot_completed = ShapeStatus.snapshot_complete?(stack_id, handle) + + # Get latest offset + latest_offset = + case Storage.fetch_latest_offset(shape_storage) do + {:ok, %LogOffset{} = offset} -> LogOffset.to_iolist(offset) |> IO.iodata_to_binary() + {:ok, offset} when is_binary(offset) -> offset + _ -> nil + end + + # Get pg_snapshot for the shape + pg_snapshot = + case Storage.fetch_pg_snapshot(shape_storage) do + {:ok, %{xmin: xmin, xmax: xmax, xip_list: xip_list}} -> + %{xmin: xmin, xmax: xmax, xip_list: xip_list} + + _ -> + nil + end + + %{ + handle: handle, + definition: serialize_shape_definition(shape), + status: %{ + snapshot_started: snapshot_started, + snapshot_completed: snapshot_completed + }, + latest_offset: latest_offset, + pg_snapshot: pg_snapshot + } + end + + defp serialize_shape_definition(shape) do + {schema, table} = shape.root_table + + base = %{ + table: "#{schema}.#{table}", + root_table_id: shape.root_table_id, + primary_key: shape.root_pk, + replica: to_string(shape.replica), + log_mode: to_string(shape.log_mode) + } + + # Add selected columns if not all columns + base = + if shape.selected_columns && shape.selected_columns != [] do + Map.put(base, :columns, shape.selected_columns) + else + base + end + + # Add where clause if present + base = + if shape.where && shape.where != nil do + where_str = serialize_where_clause(shape.where) + + if where_str do + Map.put(base, :where, where_str) + else + base + end + else + base + end + + # Add flags if any are set + base = + if shape.flags && map_size(shape.flags) > 0 do + Map.put(base, :flags, shape.flags) + else + base + end + + # Add storage config + base = + if shape.storage do + Map.put(base, :storage, shape.storage) + else + base + end + + # Add dependency handles if any + base = + if shape.shape_dependencies_handles && shape.shape_dependencies_handles != [] do + Map.put(base, :dependency_handles, shape.shape_dependencies_handles) + else + base + end + + base + end + + defp serialize_where_clause(nil), do: nil + defp serialize_where_clause(%{query: query}) when is_binary(query), do: query + + defp serialize_where_clause(where) do + # Try to get a string representation of the where clause + try do + if is_struct(where) and Map.has_key?(where, :query) do + where.query + else + inspect(where) + end + rescue + _ -> nil + end + end + + defp put_cache_headers(conn, _) do + put_resp_header(conn, "cache-control", "no-cache, no-store, must-revalidate") + end + + defp send_response(%Conn{assigns: %{metadata: metadata, status_code: status_code}} = conn, _) do + send_resp(conn, status_code, Jason.encode!(metadata)) + end + + defp send_response(%Conn{assigns: %{error: error, status_code: status_code}} = conn, _) do + send_resp(conn, status_code, Jason.encode!(error)) + end +end diff --git a/packages/sync-service/lib/electric/plug/router.ex b/packages/sync-service/lib/electric/plug/router.ex index d333aeb4d7..7e616ec3c6 100644 --- a/packages/sync-service/lib/electric/plug/router.ex +++ b/packages/sync-service/lib/electric/plug/router.ex @@ -43,6 +43,8 @@ defmodule Electric.Plug.Router do get "/v1/health", to: Electric.Plug.HealthCheckPlug + get "/v1/metadata-snapshot", to: Electric.Plug.MetadataSnapshotPlug + match _, do: send_resp(conn, 404, "Not found") def server_header(conn, version), @@ -51,7 +53,8 @@ defmodule Electric.Plug.Router do # OPTIONS requests should not be authenticated def authenticate(%Plug.Conn{method: "OPTIONS"} = conn, _opts), do: conn - def authenticate(%Plug.Conn{request_path: "/v1/shape"} = conn, _opts) do + def authenticate(%Plug.Conn{request_path: path} = conn, _opts) + when path in ["/v1/shape", "/v1/metadata-snapshot"] do api_secret = conn.assigns.config[:secret] if is_nil(api_secret) do diff --git a/packages/sync-service/test/electric/plug/metadata_snapshot_plug_test.exs b/packages/sync-service/test/electric/plug/metadata_snapshot_plug_test.exs new file mode 100644 index 0000000000..e9e205b3cc --- /dev/null +++ b/packages/sync-service/test/electric/plug/metadata_snapshot_plug_test.exs @@ -0,0 +1,222 @@ +defmodule Electric.Plug.MetadataSnapshotPlugTest do + @moduledoc """ + Tests for the MetadataSnapshotPlug that returns comprehensive source and shape metadata. + """ + use ExUnit.Case, async: false + + import Support.ComponentSetup + import Support.DbSetup + import Support.DbStructureSetup + import Plug.Test + + alias Electric.Plug.Router + + @moduletag :tmp_dir + + describe "/v1/metadata-snapshot" do + setup [:with_unique_db] + + setup do + %{publication_name: "electric_test_publication", slot_name: "electric_test_slot"} + end + + setup :with_complete_stack + + setup(ctx) do + :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: 1000) + %{opts: Router.init(build_router_opts(ctx))} + end + + test "GET returns comprehensive metadata snapshot", %{opts: opts} do + conn = + conn("GET", "/v1/metadata-snapshot") + |> Router.call(opts) + + assert %{status: 200} = conn + + metadata = Jason.decode!(conn.resp_body) + + # Check top-level structure + assert Map.has_key?(metadata, "database") + assert Map.has_key?(metadata, "status") + assert Map.has_key?(metadata, "shape_count") + assert Map.has_key?(metadata, "shapes") + + # Check database info + db = metadata["database"] + assert is_integer(db["xmin"]) + assert db["xmin"] > 0 + assert is_integer(db["xmax"]) + assert db["xmax"] > 0 + assert db["xmin"] < db["xmax"] + assert is_list(db["xip_list"]) + assert is_binary(db["lsn"]) + assert String.to_integer(db["lsn"]) >= 0 + + # Check status + status = metadata["status"] + assert Map.has_key?(status, "connection") + assert Map.has_key?(status, "shape") + + # Check shape_count + assert is_integer(metadata["shape_count"]) + assert metadata["shape_count"] >= 0 + + # Shapes should be a list (may be empty if no shapes created) + assert is_list(metadata["shapes"]) + end + + test "GET returns appropriate headers", %{opts: opts} do + conn = + conn("GET", "/v1/metadata-snapshot") + |> Router.call(opts) + + assert %{status: 200} = conn + + assert Plug.Conn.get_resp_header(conn, "content-type") == [ + "application/json; charset=utf-8" + ] + + assert Plug.Conn.get_resp_header(conn, "cache-control") == [ + "no-cache, no-store, must-revalidate" + ] + end + end + + describe "/v1/metadata-snapshot with shapes" do + setup [:with_unique_db, :with_basic_tables, :with_sql_execute] + + setup do + %{publication_name: "electric_test_publication", slot_name: "electric_test_slot"} + end + + setup :with_complete_stack + + setup(ctx) do + :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: 1000) + %{opts: Router.init(build_router_opts(ctx))} + end + + @tag with_sql: ["INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"] + test "GET returns per-shape metadata after creating a shape", %{opts: opts} do + # First create a shape by requesting it + conn("GET", "/v1/shape?table=items&offset=-1") + |> Router.call(opts) + + # Now get the metadata snapshot + conn = + conn("GET", "/v1/metadata-snapshot") + |> Router.call(opts) + + assert %{status: 200} = conn + + metadata = Jason.decode!(conn.resp_body) + + # Should have at least one shape now + assert metadata["shape_count"] >= 1 + assert length(metadata["shapes"]) >= 1 + + # Check first shape's structure + [shape | _] = metadata["shapes"] + + assert Map.has_key?(shape, "handle") + assert is_binary(shape["handle"]) + + assert Map.has_key?(shape, "definition") + definition = shape["definition"] + assert Map.has_key?(definition, "table") + assert definition["table"] == "public.items" + assert Map.has_key?(definition, "primary_key") + assert Map.has_key?(definition, "replica") + assert Map.has_key?(definition, "log_mode") + + assert Map.has_key?(shape, "status") + status = shape["status"] + assert Map.has_key?(status, "snapshot_started") + assert Map.has_key?(status, "snapshot_completed") + + # Shape should have started snapshot after being requested + assert status["snapshot_started"] == true + + # latest_offset should exist (may be nil for very new shapes) + assert Map.has_key?(shape, "latest_offset") + + # pg_snapshot may or may not be present depending on timing + assert Map.has_key?(shape, "pg_snapshot") + end + + @tag with_sql: ["INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"] + test "GET returns shape with where clause when filtered", %{opts: opts} do + # Create a shape with a where clause + conn("GET", "/v1/shape?table=items&where=value='test value 1'&offset=-1") + |> Router.call(opts) + + # Get metadata snapshot + conn = + conn("GET", "/v1/metadata-snapshot") + |> Router.call(opts) + + assert %{status: 200} = conn + + metadata = Jason.decode!(conn.resp_body) + + # Find the shape with the where clause + filtered_shape = + Enum.find(metadata["shapes"], fn shape -> + Map.has_key?(shape["definition"], "where") + end) + + # If we found a shape with where clause, verify it + if filtered_shape do + assert filtered_shape["definition"]["where"] =~ "test value 1" + end + end + end + + describe "/v1/metadata-snapshot with authentication" do + setup [:with_unique_db] + + setup do + %{publication_name: "electric_test_publication", slot_name: "electric_test_slot"} + end + + setup :with_complete_stack + setup :secure_mode + + setup(ctx) do + :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: 1000) + %{opts: Router.init(build_router_opts(ctx))} + end + + test "GET without secret returns 401", %{opts: opts} do + conn = + conn("GET", "/v1/metadata-snapshot") + |> Router.call(opts) + + assert %{status: 401} = conn + assert Jason.decode!(conn.resp_body) == %{"message" => "Unauthorized - Invalid API secret"} + end + + test "GET with valid secret returns 200", %{opts: opts, secret: secret} do + conn = + conn("GET", "/v1/metadata-snapshot?secret=#{secret}") + |> Router.call(opts) + + assert %{status: 200} = conn + + metadata = Jason.decode!(conn.resp_body) + assert Map.has_key?(metadata, "database") + assert Map.has_key?(metadata, "status") + assert Map.has_key?(metadata, "shape_count") + assert Map.has_key?(metadata, "shapes") + end + + test "GET with invalid secret returns 401", %{opts: opts} do + conn = + conn("GET", "/v1/metadata-snapshot?secret=wrong_secret") + |> Router.call(opts) + + assert %{status: 401} = conn + end + end +end