diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 7ed4262612..89302ad88f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -21,6 +21,8 @@ defmodule Electric.Application do def children_library do [ + # TEMPORARY DEBUG: Start the shutdown timer tracker + %{id: Electric.Debug.ShutdownTimer, start: {Electric.Debug.ShutdownTimer, :start_tracker, []}}, {Registry, name: Electric.stack_events_registry(), keys: :duplicate}, Electric.AdmissionControl ] @@ -48,13 +50,17 @@ defmodule Electric.Application do config = configuration() - Enum.concat([ - children_library(), - application_telemetry(config), - [{Electric.StackSupervisor, Keyword.put(config, :name, Electric.StackSupervisor)}], - api_server_children(config), - prometheus_endpoint(Electric.Config.get_env(:prometheus_port)) - ]) + children = + Enum.concat([ + children_library(), + application_telemetry(config), + [{Electric.StackSupervisor, Keyword.put(config, :name, Electric.StackSupervisor)}], + api_server_children(config), + prometheus_endpoint(Electric.Config.get_env(:prometheus_port)) + ]) + + # TEMPORARY DEBUG: Insert sentinels between each child + Electric.Debug.ShutdownTimer.insert_sentinels(children, "Electric.Supervisor") end @doc """ diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index ad6b48398e..6212f7f207 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -67,7 +67,7 @@ defmodule Electric.Config do send_cache_headers?: true, max_shapes: nil, # This value should be tuned for the hardware it's running on. - max_concurrent_requests: %{initial: 300, existing: 1000}, + max_concurrent_requests: %{initial: 30000, existing: 100_000}, ## Storage storage_dir: "./persistent", storage: &Electric.Config.Defaults.storage/0, diff --git a/packages/sync-service/lib/electric/connection/manager/supervisor.ex b/packages/sync-service/lib/electric/connection/manager/supervisor.ex index e892028c92..8bb3b8bcfb 100644 --- a/packages/sync-service/lib/electric/connection/manager/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/manager/supervisor.ex @@ -23,6 +23,9 @@ defmodule Electric.Connection.Manager.Supervisor do {Electric.Connection.Manager.ConnectionResolver, stack_id: opts[:stack_id]} ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "Connection.Manager.Supervisor") + # Electric.Connection.Manager is a permanent child of the supervisor, so when it dies, the # :one_for_all strategy will kick in and restart the other children. Supervisor.init(children, strategy: :one_for_all) diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index ea32f196f4..d0f95ee874 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -63,6 +63,9 @@ defmodule Electric.Connection.Supervisor do {Electric.Connection.Manager.Supervisor, opts} ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "Connection.Supervisor") + Supervisor.init(children, strategy: :rest_for_one) end end diff --git a/packages/sync-service/lib/electric/core_supervisor.ex b/packages/sync-service/lib/electric/core_supervisor.ex index 00b828af91..947692c9bf 100644 --- a/packages/sync-service/lib/electric/core_supervisor.ex +++ b/packages/sync-service/lib/electric/core_supervisor.ex @@ -30,6 +30,9 @@ defmodule Electric.CoreSupervisor do {Electric.Connection.Supervisor, connection_manager_opts} ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "CoreSupervisor") + Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant) end @@ -46,6 +49,7 @@ defmodule Electric.CoreSupervisor do tweaks = Keyword.fetch!(opts, :tweaks) consumer_supervisor_spec = {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]} + snapshotter_supervisor_spec = {Electric.Shapes.SnapshotterSupervisor, stack_id: stack_id} shape_cache_spec = {Electric.ShapeCache, shape_cache_opts} @@ -78,6 +82,7 @@ defmodule Electric.CoreSupervisor do Electric.Shapes.Supervisor, stack_id: stack_id, consumer_supervisor: consumer_supervisor_spec, + snapshotter_supervisor: snapshotter_supervisor_spec, shape_cache: shape_cache_spec, publication_manager: publication_manager_spec, log_collector: shape_log_collector_spec, diff --git a/packages/sync-service/lib/electric/debug/shutdown_timer.ex b/packages/sync-service/lib/electric/debug/shutdown_timer.ex new file mode 100644 index 0000000000..580a05ced7 --- /dev/null +++ b/packages/sync-service/lib/electric/debug/shutdown_timer.ex @@ -0,0 +1,175 @@ +defmodule Electric.Debug.ShutdownTimer do + @moduledoc """ + TEMPORARY DEBUG MODULE - Remove after investigating slow shutdown. + + Wraps child specs to add shutdown timing logging. + Also tracks snapshotter process count. + """ + + use GenServer + require Logger + + @snapshotter_tracker __MODULE__.SnapshotterTracker + + # --- Snapshotter tracking --- + + def start_tracker do + GenServer.start_link(__MODULE__, %{}, name: @snapshotter_tracker) + end + + def snapshotter_started do + GenServer.cast(@snapshotter_tracker, :started) + end + + def snapshotter_stopped do + GenServer.cast(@snapshotter_tracker, :stopped) + end + + @impl true + def init(_) do + {:ok, %{count: 0}} + end + + @impl true + def handle_cast(:started, %{count: count} = state) do + new_count = count + 1 + IO.puts("[SHUTDOWN DEBUG] Active snapshotters: #{new_count} (+1)") + {:noreply, %{state | count: new_count}} + end + + def handle_cast(:stopped, %{count: count} = state) do + new_count = max(0, count - 1) + IO.puts("[SHUTDOWN DEBUG] Active snapshotters: #{new_count} (-1)") + {:noreply, %{state | count: new_count}} + end + + # --- Sentinel processes for shutdown order tracking --- + + @doc """ + Inserts sentinel processes between each child in a list. + Each sentinel logs when it terminates, indicating the next child is about to be terminated. + Also adds a sentinel at the end to indicate termination is beginning. + """ + def insert_sentinels(children, supervisor_name) do + children_with_sentinels = + children + |> Enum.with_index() + |> Enum.flat_map(fn {child, index} -> + spec = Supervisor.child_spec(child, []) + child_id = spec.id + sentinel = sentinel_spec(supervisor_name, child_id, index) + [sentinel, child] + end) + + # Add a final sentinel at the end to indicate termination is beginning + final_sentinel = sentinel_spec(supervisor_name, :termination_beginning, length(children)) + children_with_sentinels ++ [final_sentinel] + end + + defp sentinel_spec(supervisor_name, next_child_id, index) do + label = "#{supervisor_name} -> #{inspect(next_child_id)}" + + %{ + id: {__MODULE__.Sentinel, supervisor_name, index}, + start: {__MODULE__.Sentinel, :start_link, [label]}, + type: :worker, + shutdown: 5000 + } + end + + # --- Child wrapping for shutdown timing --- + + @doc """ + Wraps a list of child specs to log shutdown timing for each child. + """ + def wrap_children(children) do + Enum.map(children, &wrap_child/1) + end + + @doc """ + Wraps a single child spec to log shutdown timing. + """ + def wrap_child(child_spec) do + spec = Supervisor.child_spec(child_spec, []) + original_start = spec.start + + wrapped_start = {__MODULE__, :start_wrapped, [spec.id, original_start]} + + %{spec | start: wrapped_start} + end + + @doc false + def start_wrapped(child_id, {mod, fun, args}) do + case apply(mod, fun, args) do + {:ok, pid} -> + spawn(fn -> monitor_for_shutdown(child_id, pid) end) + {:ok, pid} + + other -> + other + end + end + + defp monitor_for_shutdown(child_id, pid) do + ref = Process.monitor(pid) + + receive do + {:DOWN, ^ref, :process, ^pid, reason} -> + Logger.warning( + "[SHUTDOWN DEBUG] Process #{inspect(child_id)} (#{inspect(pid)}) terminated with reason: #{inspect(reason)}" + ) + end + end + + @doc """ + Call this in terminate/2 callback of a GenServer to log shutdown timing. + Use this for processes where you want to measure how long terminate takes. + """ + defmacro log_terminate_start(name) do + quote do + start_time = System.monotonic_time(:millisecond) + Process.put(:shutdown_debug_start, {unquote(name), start_time}) + + Logger.warning( + "[SHUTDOWN DEBUG] #{unquote(name)} terminate/2 STARTED at #{inspect(self())}" + ) + end + end + + defmacro log_terminate_end(name) do + quote do + case Process.get(:shutdown_debug_start) do + {_, start_time} -> + elapsed = System.monotonic_time(:millisecond) - start_time + + Logger.warning( + "[SHUTDOWN DEBUG] #{unquote(name)} terminate/2 FINISHED after #{elapsed}ms" + ) + + nil -> + Logger.warning("[SHUTDOWN DEBUG] #{unquote(name)} terminate/2 FINISHED (no start time)") + end + end + end +end + +defmodule Electric.Debug.ShutdownTimer.Sentinel do + @moduledoc false + use GenServer + + def start_link(label) do + GenServer.start_link(__MODULE__, label) + end + + @impl true + def init(label) do + Process.flag(:trap_exit, true) + {:ok, label} + end + + @impl true + def terminate(_reason, label) do + IO.puts("[SHUTDOWN DEBUG] Sentinel: about to terminate #{label}") + :ok + end +end diff --git a/packages/sync-service/lib/electric/monitored_core_supervisor.ex b/packages/sync-service/lib/electric/monitored_core_supervisor.ex index 21591098e3..081c30a697 100644 --- a/packages/sync-service/lib/electric/monitored_core_supervisor.ex +++ b/packages/sync-service/lib/electric/monitored_core_supervisor.ex @@ -30,6 +30,9 @@ defmodule Electric.MonitoredCoreSupervisor do {Electric.CoreSupervisor, opts} ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "MonitoredCoreSupervisor") + # The :rest_for_one strategy is used here to ensure that if the StatusMonitor unexpectedly dies, # all the processes it is monitoring are also restarted. Since the StatusMonitor keeps track of the # statuses of the other processes, losing it means losing that state. Restarting the other children diff --git a/packages/sync-service/lib/electric/replication/publication_manager/supervisor.ex b/packages/sync-service/lib/electric/replication/publication_manager/supervisor.ex index 2cfc80b45d..2e398486a9 100644 --- a/packages/sync-service/lib/electric/replication/publication_manager/supervisor.ex +++ b/packages/sync-service/lib/electric/replication/publication_manager/supervisor.ex @@ -62,6 +62,9 @@ defmodule Electric.Replication.PublicationManager.Supervisor do {PublicationManager.Configurator, opts} ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "PublicationManager.Supervisor") + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector/supervisor.ex b/packages/sync-service/lib/electric/replication/shape_log_collector/supervisor.ex index 886b418761..61e53d0992 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector/supervisor.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector/supervisor.ex @@ -30,6 +30,9 @@ defmodule Electric.Replication.ShapeLogCollector.Supervisor do {ShapeLogCollector.RequestBatcher, stack_id: stack_id} ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "ShapeLogCollector.Supervisor") + # Prevent any restarts until the whole system is capable of sustaining # the SLC dying without any other shape machinery being restarted Supervisor.init(children, strategy: :one_for_all, max_restarts: 0) diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 95afae5f24..6034a00df1 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -66,11 +66,20 @@ defmodule Electric.ShapeCache do {handle, offset} else :error -> - GenServer.call( - name(stack_id), - {:create_or_wait_shape_handle, shape, opts[:otel_ctx]}, - @call_timeout - ) + try do + GenServer.call( + name(stack_id), + {:create_or_wait_shape_handle, shape, opts[:otel_ctx]}, + @call_timeout + ) + catch + :exit, {:shutdown, _} -> + {:error, "Stack is down"} + + :exit, reason -> + dbg(exit: reason) + {:error, "?"} + end end end @@ -143,6 +152,10 @@ defmodule Electric.ShapeCache do # we'll just retry after waiting for a short time to avoid busy waiting. Process.sleep(50) await_snapshot_start(shape_handle, stack_id) + + :exit, reason -> + dbg(exit: reason) + {:error, reason} end end rescue @@ -260,7 +273,7 @@ defmodule Electric.ShapeCache do {:ok, shape_handle} = ShapeStatus.add_shape(stack_id, shape) - Logger.info("Creating new shape for #{inspect(shape)} with handle #{shape_handle}") + # Logger.info("Creating new shape for #{inspect(shape)} with handle #{shape_handle}") {:ok, _pid} = start_shape(shape_handle, shape, state, otel_ctx, :create) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex index 540ef8b299..a769eec005 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex @@ -291,7 +291,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do result rescue e -> - :ok = execute(conn, "ROLLBACK") + execute(conn, "ROLLBACK") reraise e, __STACKTRACE__ end end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex index 706b4b5c89..b472831118 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex @@ -48,6 +48,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do {ShapeDb.WriteBuffer, args} ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "ShapeDb.Supervisor") + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/packages/sync-service/lib/electric/shapes/api.ex b/packages/sync-service/lib/electric/shapes/api.ex index 421b47ba0f..97a62a61c9 100644 --- a/packages/sync-service/lib/electric/shapes/api.ex +++ b/packages/sync-service/lib/electric/shapes/api.ex @@ -285,6 +285,11 @@ defmodule Electric.Shapes.Api do Shapes.resolve_shape_handle(stack_id, handle, shape) end + defp handle_shape_info({:error, reason}, %Request{} = request) do + Logger.warning("Unable to load shape. Reason: #{reason}") + {:error, Response.error(request, reason, status: 503, retry_after: 5)} + end + defp handle_shape_info(nil, %Request{} = request) do %{params: %{shape_definition: shape}, api: %{stack_id: stack_id}} = request # There is no shape that matches the shape definition (because shape info is `nil`). diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 8bc0109adf..ee6ccccb27 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -145,7 +145,7 @@ defmodule Electric.Shapes.Consumer do # block and prevent await_snapshot_start calls from adding snapshot subscribers. {:ok, _pid} = - Shapes.DynamicConsumerSupervisor.start_snapshotter( + Shapes.SnapshotterSupervisor.start_snapshotter( stack_id, %{ stack_id: stack_id, diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 515444f16e..f3c45d83b7 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -31,9 +31,18 @@ defmodule Electric.Shapes.Consumer.Snapshotter do Logger.metadata(metadata) Electric.Telemetry.Sentry.set_tags_context(metadata) + # TEMPORARY DEBUG: Track snapshotter count + Electric.Debug.ShutdownTimer.snapshotter_started() + {:ok, config, {:continue, :start_snapshot}} end + # TEMPORARY DEBUG: Track snapshotter count on termination + def terminate(_reason, _state) do + Electric.Debug.ShutdownTimer.snapshotter_stopped() + :ok + end + def handle_continue(:start_snapshot, state) do %{ shape_handle: shape_handle, diff --git a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex index 031ee57274..2766c690d9 100644 --- a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex @@ -46,10 +46,6 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do start_child(supervisor_ref, {Electric.Shapes.Consumer, config}) end - def start_snapshotter(supervisor_ref, config) do - start_child(supervisor_ref, {Electric.Shapes.Consumer.Snapshotter, config}) - end - def start_materializer(supervisor_ref, config) do start_child(supervisor_ref, {Electric.Shapes.Consumer.Materializer, config}) end diff --git a/packages/sync-service/lib/electric/shapes/snapshotter_supervisor.ex b/packages/sync-service/lib/electric/shapes/snapshotter_supervisor.ex new file mode 100644 index 0000000000..8c92e40b45 --- /dev/null +++ b/packages/sync-service/lib/electric/shapes/snapshotter_supervisor.ex @@ -0,0 +1,61 @@ +defmodule Electric.Shapes.SnapshotterSupervisor do + @moduledoc """ + DynamicSupervisor for snapshotter processes. + + This is separate from the DynamicConsumerSupervisor so that snapshotters + can be shut down with :brutal_kill for faster shutdown times. + """ + use DynamicSupervisor + + import Electric, only: [is_stack_id: 1] + + require Logger + + def child_spec(opts) do + stack_id = Keyword.fetch!(opts, :stack_id) + + {name, opts} = Keyword.pop(opts, :name, name(stack_id)) + + # We're overriding Electric.Shapes.SnapshotterSupervisor's child_spec() function here + # to make the usage of PartitionSupervisor transparent to the callers. As a consequence, we + # need to call `super()` to obtain the original DynamicSupervisor child_spec() to pass as an option to + # PartitionSupervisor. + PartitionSupervisor.child_spec( + child_spec: Supervisor.child_spec(super(opts), shutdown: :brutal_kill), + name: name + ) + end + + def name(stack_id) do + Electric.ProcessRegistry.name(stack_id, __MODULE__) + end + + # This function will be invoked for each dynamic supervisor process in PartitionSupervisor's + # pool, so we keep these processes unnamed. + def start_link(opts) do + stack_id = Keyword.fetch!(opts, :stack_id) + DynamicSupervisor.start_link(__MODULE__, stack_id: stack_id) + end + + def start_snapshotter(stack_id, config) when is_stack_id(stack_id) do + %{shape_handle: shape_handle} = config + + routing_key = :erlang.phash2(shape_handle) + Logger.debug(fn -> "Starting Snapshotter for #{config.shape_handle}" end) + + DynamicSupervisor.start_child( + {:via, PartitionSupervisor, {name(stack_id), routing_key}}, + {Electric.Shapes.Consumer.Snapshotter, config} + ) + end + + @impl true + def init(opts) do + stack_id = Keyword.fetch!(opts, :stack_id) + Process.set_label({:snapshotter_supervisor, stack_id}) + Logger.metadata(stack_id: stack_id) + Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + + DynamicSupervisor.init(strategy: :one_for_one) + end +end diff --git a/packages/sync-service/lib/electric/shapes/supervisor.ex b/packages/sync-service/lib/electric/shapes/supervisor.ex index 17a10b2208..5dbe17f327 100644 --- a/packages/sync-service/lib/electric/shapes/supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/supervisor.ex @@ -42,6 +42,7 @@ defmodule Electric.Shapes.Supervisor do log_collector = Keyword.fetch!(opts, :log_collector) publication_manager = Keyword.fetch!(opts, :publication_manager) consumer_supervisor = Keyword.fetch!(opts, :consumer_supervisor) + snapshotter_supervisor = Keyword.fetch!(opts, :snapshotter_supervisor) shape_cache = Keyword.fetch!(opts, :shape_cache) expiry_manager = Keyword.fetch!(opts, :expiry_manager) schema_reconciler = Keyword.fetch!(opts, :schema_reconciler) @@ -51,6 +52,7 @@ defmodule Electric.Shapes.Supervisor do name: Electric.ProcessRegistry.name(stack_id, Electric.StackTaskSupervisor)}, log_collector, publication_manager, + snapshotter_supervisor, consumer_supervisor, shape_cache, expiry_manager, @@ -58,6 +60,9 @@ defmodule Electric.Shapes.Supervisor do canary_spec(stack_id) ] + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "Shapes.Supervisor") + Supervisor.init(children, strategy: :one_for_all) end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index ad2494dc45..0747e4bf8d 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -379,10 +379,21 @@ defmodule Electric.StackSupervisor do {Electric.MonitoredCoreSupervisor, stack_id: stack_id, connection_manager_opts: connection_manager_opts, - storage_dir: config.storage_dir} + storage_dir: config.storage_dir}, + {Agent, + fn -> + Electric.ShapeCache.ShapeStatus.reduce_shapes(stack_id, 0, fn {handle, _shape}, n -> + {:ok, _pid} = Electric.ShapeCache.start_consumer_for_handle(handle, stack_id) + n + 1 + end) + |> tap(fn n -> IO.puts("started #{n} consumers") end) + end} ] |> Enum.reject(&is_nil/1) + # TEMPORARY DEBUG: Insert sentinels between each child + children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "StackSupervisor") + Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant) end end