Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule Electric.Application do

def children_library do
[
# TEMPORARY DEBUG: Start the shutdown timer tracker
%{id: Electric.Debug.ShutdownTimer, start: {Electric.Debug.ShutdownTimer, :start_tracker, []}},
{Registry, name: Electric.stack_events_registry(), keys: :duplicate},
Electric.AdmissionControl
]
Expand Down Expand Up @@ -48,13 +50,17 @@ defmodule Electric.Application do

config = configuration()

Enum.concat([
children_library(),
application_telemetry(config),
[{Electric.StackSupervisor, Keyword.put(config, :name, Electric.StackSupervisor)}],
api_server_children(config),
prometheus_endpoint(Electric.Config.get_env(:prometheus_port))
])
children =
Enum.concat([
children_library(),
application_telemetry(config),
[{Electric.StackSupervisor, Keyword.put(config, :name, Electric.StackSupervisor)}],
api_server_children(config),
prometheus_endpoint(Electric.Config.get_env(:prometheus_port))
])

# TEMPORARY DEBUG: Insert sentinels between each child
Electric.Debug.ShutdownTimer.insert_sentinels(children, "Electric.Supervisor")
end

@doc """
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Electric.Config do
send_cache_headers?: true,
max_shapes: nil,
# This value should be tuned for the hardware it's running on.
max_concurrent_requests: %{initial: 300, existing: 1000},
max_concurrent_requests: %{initial: 30000, existing: 100_000},
## Storage
storage_dir: "./persistent",
storage: &Electric.Config.Defaults.storage/0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ defmodule Electric.Connection.Manager.Supervisor do
{Electric.Connection.Manager.ConnectionResolver, stack_id: opts[:stack_id]}
]

# TEMPORARY DEBUG: Insert sentinels between each child
children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "Connection.Manager.Supervisor")

# Electric.Connection.Manager is a permanent child of the supervisor, so when it dies, the
# :one_for_all strategy will kick in and restart the other children.
Supervisor.init(children, strategy: :one_for_all)
Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ defmodule Electric.Connection.Supervisor do
{Electric.Connection.Manager.Supervisor, opts}
]

# TEMPORARY DEBUG: Insert sentinels between each child
children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "Connection.Supervisor")

Supervisor.init(children, strategy: :rest_for_one)
end
end
5 changes: 5 additions & 0 deletions packages/sync-service/lib/electric/core_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ defmodule Electric.CoreSupervisor do
{Electric.Connection.Supervisor, connection_manager_opts}
]

# TEMPORARY DEBUG: Insert sentinels between each child
children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "CoreSupervisor")

Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant)
end

Expand All @@ -46,6 +49,7 @@ defmodule Electric.CoreSupervisor do
tweaks = Keyword.fetch!(opts, :tweaks)

consumer_supervisor_spec = {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]}
snapshotter_supervisor_spec = {Electric.Shapes.SnapshotterSupervisor, stack_id: stack_id}

shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}

Expand Down Expand Up @@ -78,6 +82,7 @@ defmodule Electric.CoreSupervisor do
Electric.Shapes.Supervisor,
stack_id: stack_id,
consumer_supervisor: consumer_supervisor_spec,
snapshotter_supervisor: snapshotter_supervisor_spec,
shape_cache: shape_cache_spec,
publication_manager: publication_manager_spec,
log_collector: shape_log_collector_spec,
Expand Down
175 changes: 175 additions & 0 deletions packages/sync-service/lib/electric/debug/shutdown_timer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
defmodule Electric.Debug.ShutdownTimer do
  @moduledoc """
  TEMPORARY DEBUG MODULE - Remove after investigating slow shutdown.

  Provides three independent debugging aids:

    * a named GenServer that keeps a running count of active snapshotter
      processes (`start_tracker/0`, `snapshotter_started/0`,
      `snapshotter_stopped/0`);
    * sentinel child specs that print a line while a supervisor is shutting
      its children down (`insert_sentinels/2`);
    * helpers that log when a child process terminates (`wrap_children/1`,
      `wrap_child/1`) and how long a `terminate/2` callback takes
      (`log_terminate_start/1`, `log_terminate_end/1`).
  """

  use GenServer
  require Logger

  @snapshotter_tracker __MODULE__.SnapshotterTracker

  # --- Snapshotter tracking ---

  @doc """
  Starts the snapshotter counter, registered under a fixed local name.
  """
  @spec start_tracker() :: GenServer.on_start()
  def start_tracker do
    GenServer.start_link(__MODULE__, %{}, name: @snapshotter_tracker)
  end

  @doc """
  Tells the tracker that one more snapshotter is running (asynchronous cast).
  """
  @spec snapshotter_started() :: :ok
  def snapshotter_started do
    GenServer.cast(@snapshotter_tracker, :started)
  end

  @doc """
  Tells the tracker that a snapshotter has stopped (asynchronous cast).
  """
  @spec snapshotter_stopped() :: :ok
  def snapshotter_stopped do
    GenServer.cast(@snapshotter_tracker, :stopped)
  end

  @impl true
  def init(_) do
    {:ok, %{count: 0}}
  end

  @impl true
  def handle_cast(:started, %{count: count} = state) do
    new_count = count + 1
    IO.puts("[SHUTDOWN DEBUG] Active snapshotters: #{new_count} (+1)")
    {:noreply, %{state | count: new_count}}
  end

  def handle_cast(:stopped, %{count: count} = state) do
    # Clamp at zero so an unmatched :stopped never drives the count negative.
    new_count = max(0, count - 1)
    IO.puts("[SHUTDOWN DEBUG] Active snapshotters: #{new_count} (-1)")
    {:noreply, %{state | count: new_count}}
  end

  # --- Sentinel processes for shutdown order tracking ---

  @doc """
  Interleaves sentinel child specs with `children`.

  A supervisor stops its children in reverse start order, so each sentinel is
  placed directly *after* the child it is labelled with: during shutdown the
  sentinel is stopped (and prints its label) immediately before that child is
  asked to terminate. One extra sentinel, labelled `:termination_beginning`,
  is appended at the very end of the list; being the last child started it is
  the first one stopped, marking the start of the supervisor's shutdown.

  `supervisor_name` is only used to build the printed labels and the sentinel
  child ids. The result always contains `2 * length(children) + 1` entries and
  the original child entries are passed through untouched.
  """
  @spec insert_sentinels([Supervisor.child_spec() | {module(), term()} | module()], String.t()) ::
          [Supervisor.child_spec() | {module(), term()} | module()]
  def insert_sentinels(children, supervisor_name) do
    children_with_sentinels =
      children
      |> Enum.with_index()
      |> Enum.flat_map(fn {child, index} ->
        # Normalise only to read the id; the original entry is what gets returned.
        %{id: child_id} = Supervisor.child_spec(child, [])
        [child, sentinel_spec(supervisor_name, child_id, index)]
      end)

    final_sentinel = sentinel_spec(supervisor_name, :termination_beginning, length(children))
    children_with_sentinels ++ [final_sentinel]
  end

  # Builds the child spec of a sentinel announcing `next_child_id`. The index is
  # part of the id so that every sentinel under one supervisor has a unique id.
  defp sentinel_spec(supervisor_name, next_child_id, index) do
    label = "#{supervisor_name} -> #{inspect(next_child_id)}"

    %{
      id: {__MODULE__.Sentinel, supervisor_name, index},
      start: {__MODULE__.Sentinel, :start_link, [label]},
      type: :worker,
      shutdown: 5000
    }
  end

  # --- Child wrapping for termination logging ---

  @doc """
  Wraps every child spec in `children` with `wrap_child/1`.
  """
  @spec wrap_children([Supervisor.child_spec() | {module(), term()} | module()]) ::
          [Supervisor.child_spec()]
  def wrap_children(children) do
    Enum.map(children, &wrap_child/1)
  end

  @doc """
  Rewrites a child spec so that the child is started through
  `start_wrapped/2`, which logs a warning with the exit reason once the child
  process goes down. All other fields of the spec are left as they are.
  """
  @spec wrap_child(Supervisor.child_spec() | {module(), term()} | module()) ::
          Supervisor.child_spec()
  def wrap_child(child_spec) do
    spec = Supervisor.child_spec(child_spec, [])
    %{spec | start: {__MODULE__, :start_wrapped, [spec.id, spec.start]}}
  end

  @doc false
  def start_wrapped(child_id, {mod, fun, args}) do
    result = apply(mod, fun, args)

    # Watch the child for both success shapes a start function may return.
    # The is_pid/1 guard keeps a non-pid child from being monitored, which would
    # leave the watcher waiting forever for a :DOWN message that never matches.
    case result do
      {:ok, pid} when is_pid(pid) -> spawn_shutdown_monitor(child_id, pid)
      {:ok, pid, _info} when is_pid(pid) -> spawn_shutdown_monitor(child_id, pid)
      _other -> :ok
    end

    result
  end

  # The watcher is deliberately unlinked: it must outlive the child in order to
  # report the child's exit reason, and it exits on its own afterwards.
  defp spawn_shutdown_monitor(child_id, pid) do
    spawn(fn -> monitor_for_shutdown(child_id, pid) end)
    :ok
  end

  defp monitor_for_shutdown(child_id, pid) do
    ref = Process.monitor(pid)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} ->
        Logger.warning(
          "[SHUTDOWN DEBUG] Process #{inspect(child_id)} (#{inspect(pid)}) terminated with reason: #{inspect(reason)}"
        )
    end
  end

  @doc """
  Call this at the top of a GenServer's `terminate/2` callback.

  Stores the current monotonic time in the process dictionary (under
  `:shutdown_debug_start`) and logs a warning that `terminate/2` has started.
  `name` is evaluated exactly once, and `Logger` is required inside the
  expansion so the calling module does not have to require it itself.
  """
  defmacro log_terminate_start(name) do
    quote bind_quoted: [name: name] do
      require Logger

      start_time = System.monotonic_time(:millisecond)
      Process.put(:shutdown_debug_start, {name, start_time})

      Logger.warning("[SHUTDOWN DEBUG] #{name} terminate/2 STARTED at #{inspect(self())}")
    end
  end

  @doc """
  Call this at the end of a GenServer's `terminate/2` callback.

  Logs the milliseconds elapsed since `log_terminate_start/1` ran in the same
  process, or a fallback message when no start time was recorded.
  """
  defmacro log_terminate_end(name) do
    quote bind_quoted: [name: name] do
      require Logger

      case Process.get(:shutdown_debug_start) do
        {_, start_time} ->
          elapsed = System.monotonic_time(:millisecond) - start_time

          Logger.warning("[SHUTDOWN DEBUG] #{name} terminate/2 FINISHED after #{elapsed}ms")

        nil ->
          Logger.warning("[SHUTDOWN DEBUG] #{name} terminate/2 FINISHED (no start time)")
      end
    end
  end
end

defmodule Electric.Debug.ShutdownTimer.Sentinel do
  @moduledoc false
  # Marker process for shutdown debugging: it holds a label as its state and
  # prints that label to stdout from terminate/2.
  use GenServer

  # Starts an unnamed sentinel whose whole state is `label`.
  def start_link(label), do: GenServer.start_link(__MODULE__, label)

  @impl true
  def init(sentinel_label) do
    # Exits must be trapped for terminate/2 to run when the parent supervisor
    # sends its shutdown exit signal.
    _previous_flag = Process.flag(:trap_exit, true)
    {:ok, sentinel_label}
  end

  @impl true
  def terminate(_exit_reason, sentinel_label) do
    announcement = "[SHUTDOWN DEBUG] Sentinel: about to terminate #{sentinel_label}"
    IO.puts(announcement)
    :ok
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ defmodule Electric.MonitoredCoreSupervisor do
{Electric.CoreSupervisor, opts}
]

# TEMPORARY DEBUG: Insert sentinels between each child
children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "MonitoredCoreSupervisor")

# The :rest_for_one strategy is used here to ensure that if the StatusMonitor unexpectedly dies,
# all the processes it is monitoring are also restarted. Since the StatusMonitor keeps track of the
# statuses of the other processes, losing it means losing that state. Restarting the other children
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ defmodule Electric.Replication.PublicationManager.Supervisor do
{PublicationManager.Configurator, opts}
]

# TEMPORARY DEBUG: Insert sentinels between each child
children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "PublicationManager.Supervisor")

Supervisor.init(children, strategy: :one_for_one)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ defmodule Electric.Replication.ShapeLogCollector.Supervisor do
{ShapeLogCollector.RequestBatcher, stack_id: stack_id}
]

# TEMPORARY DEBUG: Insert sentinels between each child
children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "ShapeLogCollector.Supervisor")

# Prevent any restarts until the whole system is capable of sustaining
# the SLC dying without any other shape machinery being restarted
Supervisor.init(children, strategy: :one_for_all, max_restarts: 0)
Expand Down
25 changes: 19 additions & 6 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,20 @@ defmodule Electric.ShapeCache do
{handle, offset}
else
:error ->
GenServer.call(
name(stack_id),
{:create_or_wait_shape_handle, shape, opts[:otel_ctx]},
@call_timeout
)
try do
GenServer.call(
name(stack_id),
{:create_or_wait_shape_handle, shape, opts[:otel_ctx]},
@call_timeout
)
catch
:exit, {:shutdown, _} ->
{:error, "Stack is down"}

:exit, reason ->
dbg(exit: reason)
{:error, "?"}
end
end
end

Expand Down Expand Up @@ -143,6 +152,10 @@ defmodule Electric.ShapeCache do
# we'll just retry after waiting for a short time to avoid busy waiting.
Process.sleep(50)
await_snapshot_start(shape_handle, stack_id)

:exit, reason ->
dbg(exit: reason)
{:error, reason}
end
end
rescue
Expand Down Expand Up @@ -260,7 +273,7 @@ defmodule Electric.ShapeCache do

{:ok, shape_handle} = ShapeStatus.add_shape(stack_id, shape)

Logger.info("Creating new shape for #{inspect(shape)} with handle #{shape_handle}")
# Logger.info("Creating new shape for #{inspect(shape)} with handle #{shape_handle}")

{:ok, _pid} = start_shape(shape_handle, shape, state, otel_ctx, :create)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
result
rescue
e ->
:ok = execute(conn, "ROLLBACK")
execute(conn, "ROLLBACK")
reraise e, __STACKTRACE__
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do
{ShapeDb.WriteBuffer, args}
]

# TEMPORARY DEBUG: Insert sentinels between each child
children = Electric.Debug.ShutdownTimer.insert_sentinels(children, "ShapeDb.Supervisor")

Supervisor.init(children, strategy: :one_for_one)
end
end
5 changes: 5 additions & 0 deletions packages/sync-service/lib/electric/shapes/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ defmodule Electric.Shapes.Api do
Shapes.resolve_shape_handle(stack_id, handle, shape)
end

# Shape lookup failed with `{:error, reason}`: log the failure and answer with a
# 503 response that asks the client to retry after 5 seconds.
defp handle_shape_info({:error, reason}, %Request{} = request) do
  # `reason` is not guaranteed to implement String.Chars (an exit reason can be
  # a tuple), and interpolating such a term directly would raise while logging.
  printable_reason = if is_binary(reason), do: reason, else: inspect(reason)

  Logger.warning("Unable to load shape. Reason: #{printable_reason}")
  {:error, Response.error(request, reason, status: 503, retry_after: 5)}
end

defp handle_shape_info(nil, %Request{} = request) do
%{params: %{shape_definition: shape}, api: %{stack_id: stack_id}} = request
# There is no shape that matches the shape definition (because shape info is `nil`).
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ defmodule Electric.Shapes.Consumer do
# block and prevent await_snapshot_start calls from adding snapshot subscribers.

{:ok, _pid} =
Shapes.DynamicConsumerSupervisor.start_snapshotter(
Shapes.SnapshotterSupervisor.start_snapshotter(
stack_id,
%{
stack_id: stack_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,18 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
Logger.metadata(metadata)
Electric.Telemetry.Sentry.set_tags_context(metadata)

# TEMPORARY DEBUG: Track snapshotter count
Electric.Debug.ShutdownTimer.snapshotter_started()

{:ok, config, {:continue, :start_snapshot}}
end

# TEMPORARY DEBUG: report this snapshotter's exit to the shutdown tracker so
# that its count of active snapshotters goes down by one.
# NOTE(review): GenServer does not run terminate/2 for every kind of exit (for
# example an untrapped exit signal skips it) - confirm the count can be relied
# on during supervisor shutdown.
def terminate(_exit_reason, _snapshotter_state) do
  _ = Electric.Debug.ShutdownTimer.snapshotter_stopped()
  :ok
end

def handle_continue(:start_snapshot, state) do
%{
shape_handle: shape_handle,
Expand Down
Loading
Loading