From ef7899dda15c71a866ce0ebccf1cbb288aec1ad4 Mon Sep 17 00:00:00 2001
From: Garry Hill
Date: Thu, 26 Mar 2026 10:21:04 +0000
Subject: [PATCH] fix(sync-service): Improve shutdown times

Some investigation shows that yes, shutting down a DynamicSupervisor with
lots of children is **slow**.

Calling `Process.exit(pid, :shutdown)` on a simple `DynamicSupervisor` with
200,000 child processes (`Agent`s) takes ~10 **minutes** to complete on my
machine. This is despite the fact that `DynamicSupervisor`s terminate their
children in parallel, sending `:EXIT` messages to all child processes
without waiting for any of them to terminate.

From the [erlang docs](https://www.erlang.org/doc/system/sup_princ.html#simplified-one_for_one-supervisors):

> Because a simple_one_for_one supervisor can have many children, it
> shuts them all down asynchronously. This means that the children will do
> their cleanup in parallel and therefore the order in which they are
> stopped is not defined.

`DynamicSupervisor` inherited its shutdown behaviour from this supervisor
strategy.

Using a `PartitionSupervisor` helps, reducing the shutdown time by roughly a
factor of the number of partitions, but this does not scale well with the
number of running child processes. For instance, with 200,000 children over
8 partitions the shutdown time is reduced to ~30s, but if you increase the
number of children to 500,000 there is a lower bound of ~7s below which you
can never go, no matter how many partitions you use.

The problem is that the `PartitionSupervisor` terminates its children (the
partition supervisors) sequentially. So as you increase the number of
partitions, you're just increasing the number of children that are
terminated sequentially, even though each child `DynamicSupervisor`
terminates its own children in parallel.

This PR solves that by replacing the top-level `PartitionSupervisor` with
another `DynamicSupervisor`. On shutdown, all partition supervisors are
terminated in parallel, and the children of those partition supervisors are
in turn terminated in parallel.
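The two-tier layout can be sketched as follows. This is a minimal,
self-contained sketch of the idea, not the actual Electric module; the
variable names, the partition count of 8, and the `start_child` helper are
all illustrative:

```elixir
# Minimal sketch of the two-tier DynamicSupervisor layout described above.
# Not the actual Electric code; names and partition count are illustrative.
{:ok, root} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Tier 1: the root supervises one DynamicSupervisor per partition.
partitions =
  for i <- 0..7 do
    {:ok, pid} =
      DynamicSupervisor.start_child(
        root,
        Supervisor.child_spec({DynamicSupervisor, strategy: :one_for_one}, id: {:partition, i})
      )

    pid
  end

# Tier 2: shard children across the partitions by hashing a routing key.
start_child = fn key, child_spec ->
  partition = Enum.at(partitions, :erlang.phash2(key, length(partitions)))
  DynamicSupervisor.start_child(partition, child_spec)
end

{:ok, agent} = start_child.("shape-1", {Agent, fn -> 0 end})

# Stopping the root exits all partition supervisors in parallel, and each
# partition supervisor in turn exits its own children in parallel.
# (Unlink first so the :shutdown exit signal doesn't propagate to the caller.)
Process.unlink(root)
:ok = DynamicSupervisor.stop(root, :shutdown)
false = Process.alive?(agent)
```

Both tiers use plain `DynamicSupervisor`s, so every level of the tree gets
the asynchronous child-termination behaviour quoted above.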
Here are some numbers from my benchmark showing the time required to shut
down a supervisor (tree) with 200,000 running processes. Our larger servers
have 16 cores, so we're running a `PartitionSupervisor` with 16 partitions.

```
=========================================
 Partitioned (PartitionSupervisor with 16 partitions)
=========================================
200000 processes
   memory: 72.7734375KiB
    start: 2.6s
 shutdown: 12.4s
max queue len: 12480
```

So 12 seconds even with a very simple process with no `terminate/2`
callback. We could just increase the number of partitions...

```
=========================================
 Partitioned (PartitionSupervisor with 50 partitions)
=========================================
200000 processes
   memory: 276.609375KiB
    start: 2.7s
 shutdown: 5.4s
max queue len: 3936
```

Which is better, but we've more than tripled the number of supervisors and
only just over halved the shutdown time, so you start to see the tradeoff.

Now with the new 2-tier `DynamicSupervisor`:

```
=========================================
 DynamicPartitioned (DynamicSupervisor of 50 DynamicSupervisors)
=========================================
200000 processes
   memory: 180.84375KiB
    start: 2.8s
 shutdown: 0.5s
max queue len: 3763
```

That's a 10x improvement on the previous config and 25x on the current
setup.

The number of partitions can be set using a new env var
`ELECTRIC_CONSUMER_PARTITIONS`. If that's not set then the number of
partitions scales with `max_shapes`, if that's known; if not, we just use
the number of cores.

This is the shutdown time for our production stack with the fallback
partition config, so nearly a 6x improvement. I've opted for a conservative
default (I could have gone with some multiple of the number of cores) but
went with the lower-memory option.
```
=========================================
 DynamicPartitioned (DynamicSupervisor of 16 DynamicSupervisors)
=========================================
200000 processes
   memory: 58.09375KiB
    start: 2.8s
 shutdown: 2.1s
max queue len: 12555
```

This also scales to 500,000 shapes, where it starts 125 partitions by
default and shuts down in 1.5s (the original version took 70s):

```
500000 processes
   memory: 435.21875KiB
 shutdown: 1.5s
```

This is the benchmarking script:

```elixir
time = fn action ->
  {t, result} = :timer.tc(action, :microsecond)
  s = Float.round(t / 1_000_000.0, 1)
  {t, s, result}
end

time_colour = fn time ->
  if time > 1.0, do: :red, else: :green
end

{:ok, _} = Registry.start_link(name: ShutdownRegistry, keys: :unique)

defmodule Simple do
  def desc(_config, _processes), do: "direct DynamicSupervisor"

  def start_supervisor(_config, _processes) do
    DynamicSupervisor.start_link(name: Simple.S, strategy: :one_for_one)
  end

  def start_child(supervisor, _i, child_spec) do
    DynamicSupervisor.start_child(supervisor, child_spec)
  end

  def supervisor_pids(supervisor_pid), do: [supervisor_pid]
end

defmodule Partitioned do
  @name Partitioned.S

  def desc(config, _processes), do: "PartitionSupervisor with #{config[:partitions]} partitions"

  def start_supervisor(config, _processes) do
    PartitionSupervisor.start_link(
      child_spec: DynamicSupervisor.child_spec(strategy: :one_for_one),
      partitions: Keyword.fetch!(config, :partitions),
      name: @name
    )
  end

  def start_child(_supervisor, i, child_spec) do
    DynamicSupervisor.start_child(
      {:via, PartitionSupervisor, {@name, i}},
      child_spec
    )
  end

  def supervisor_pids(_supervisor_pid) do
    for {_id, pid, :supervisor, _module} <- PartitionSupervisor.which_children(@name) do
      pid
    end
  end
end

defmodule DynamicPartitioned do
  use DynamicSupervisor

  @name DynamicPartitioned.S
  @table __MODULE__
  @target_per_partition 4_000

  def name(id) do
    {:via, Registry, {ShutdownRegistry, {__MODULE__, id}}}
  end

  def desc(config, processes),
    do: "DynamicSupervisor of #{partition_count(config, processes)} DynamicSupervisors"

  def start_supervisor(config, processes) do
    # partitions = Keyword.fetch!(config, :partitions)
    partitions = partition_count(config, processes)

    with {:ok, sup} <-
           DynamicSupervisor.start_link(
             __MODULE__,
             [stack_id: "stack_id", partitions: partitions],
             name: @name
           ) do
      pids =
        for i <- 1..partitions do
          {:ok, pid} =
            DynamicSupervisor.start_child(
              sup,
              Supervisor.child_spec(
                {DynamicSupervisor, strategy: :one_for_one, name: name(i - 1)},
                id: {__MODULE__, i}
              )
            )

          pid
        end

      # only needed for `supervisor_pids/1`
      :persistent_term.put({__MODULE__, :partitions}, List.to_tuple(pids))
      # :persistent_term.put({__MODULE__, :partition_count}, partitions)

      {:ok, sup}
    end
  end

  defp partition_count(config, max_processes) do
    Keyword.get(config, :partitions) || calculate_partition_count(max_processes)
  end

  defp calculate_partition_count(max_processes) do
    cores = System.schedulers_online()
    max(cores, div(max_processes + @target_per_partition - 1, @target_per_partition))
  end

  def start_child(_supervisor, i, child_spec) do
    DynamicSupervisor.start_child(partition(i), child_spec)
  end

  defp partition(i) do
    partitions = :ets.lookup_element(@table, :partition_count, 2)
    idx = :erlang.phash2(i, partitions)
    name(idx)
  end

  def supervisor_pids(_supervisor_pid) do
    partitions = :persistent_term.get({__MODULE__, :partitions})
    Tuple.to_list(partitions)
  end

  def init(init_args) do
    partitions = Keyword.fetch!(init_args, :partitions)
    table = :ets.new(@table, [:named_table, :public, read_concurrency: true])
    true = :ets.insert(table, [{:partition_count, partitions}])
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

defmodule Stats do
  def monitor_message_queue(pids) do
    Stream.repeatedly(fn ->
      Enum.map(pids, fn pid ->
        case Process.info(pid, :message_queue_len) do
          {:message_queue_len, len} -> len
          nil -> 0
        end
      end)
    end)
    |> Stream.flat_map(& &1)
    |> Enum.reduce_while(0, fn len, max ->
      receive do
        {:report, parent} ->
          send(parent, {:max_len, max})
          {:halt, max}
      after
        5 -> {:cont, if(len > max, do: len, else: max)}
      end
    end)
  end
end

impls = [
  # {Simple, []},
  {Partitioned, [partitions: 16]},
  {Partitioned, [partitions: 50]},
  {DynamicPartitioned, []},
  {DynamicPartitioned, [partitions: 16]}
]

p = [
  # 100,
  # 1_000,
  # 10_000,
  # 20_000,
  200_000
  # 500_000
]

for {impl, config} <- impls do
  for n <- p do
    IO.puts(
      IO.ANSI.format([
        "=========================================\n",
        [:cyan, " ", inspect(impl), :reset, " (", impl.desc(config, n), ")\n"],
        "=========================================\n"
      ])
    )

    {:ok, super} = impl.start_supervisor(config, n)

    pids = Enum.uniq([super | impl.supervisor_pids(super)])

    supervisor_memory =
      pids
      |> Enum.map(fn pid ->
        {:memory, mem} = Process.info(pid, :memory)
        mem
      end)
      |> Enum.sum()

    {_start_us, start_s, _pids} =
      time.(fn ->
        Enum.map(1..n, fn i ->
          {:ok, pid} =
            impl.start_child(
              super,
              i,
              Supervisor.child_spec({Agent, fn -> i end}, id: {:agent, i})
            )

          pid
        end)
      end)

    Process.unlink(super)
    ref = Process.monitor(super)

    IO.write(
      IO.ANSI.format([
        [:bright, String.pad_leading(to_string(n), 6, " "), :reset, " processes\n"],
        ["   memory: ", to_string(supervisor_memory / 1024), "KiB\n", :reset],
        ["    start: ", time_colour.(start_s), to_string(start_s), "s\n", :reset]
      ])
    )

    {_stop_us, stop_s, qlen} =
      time.(fn ->
        pids = impl.supervisor_pids(super)
        qpid = spawn(fn -> Stats.monitor_message_queue(pids) end)

        Process.exit(super, :shutdown)

        receive do
          {:DOWN, ^ref, :process, ^super, :shutdown} -> :ok
          msg -> raise inspect(msg)
        end

        send(qpid, {:report, self()})

        receive do
          {:max_len, len} -> len
        end
      end)

    IO.write(
      IO.ANSI.format([
        [" shutdown: ", :bright, time_colour.(stop_s), to_string(stop_s), "s\n", :reset],
        ["max queue len: ", :bright, to_string(qlen), "\n", :reset],
        "\n"
      ])
    )
  end
end
```
---
 .changeset/chatty-planets-invite.md | 5 +
 .changeset/strong-spoons-cry.md | 5 +
 packages/sync-service/config/runtime.exs | 1 +
 .../sync-service/lib/electric/application.ex | 1 +
 packages/sync-service/lib/electric/config.ex | 1 +
 .../lib/electric/core_supervisor.ex | 12 +-
 .../shapes/dynamic_consumer_supervisor.ex | 138 ++++++++++++++----
 .../lib/electric/stack_supervisor.ex | 3 +-
 .../test/support/component_setup.ex | 2 +-
 website/docs/api/config.md | 11 ++
 10 files changed, 143 insertions(+), 36 deletions(-)
 create mode 100644 .changeset/chatty-planets-invite.md
 create mode 100644 .changeset/strong-spoons-cry.md

diff --git a/.changeset/chatty-planets-invite.md b/.changeset/chatty-planets-invite.md
new file mode 100644
index 0000000000..54dc37ab04
--- /dev/null
+++ b/.changeset/chatty-planets-invite.md
@@ -0,0 +1,5 @@
+---
+'@electric-sql/docs': patch
+---
+
+Document new ELECTRIC_CONSUMER_PARTITIONS environment variable
diff --git a/.changeset/strong-spoons-cry.md b/.changeset/strong-spoons-cry.md
new file mode 100644
index 0000000000..21c2d01b59
--- /dev/null
+++ b/.changeset/strong-spoons-cry.md
@@ -0,0 +1,5 @@
+---
+'@core/sync-service': patch
+---
+
+Improve shutdown times by changing the consumer supervision strategy
diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs
index 2b4b718102..892b55cafd 100644
--- a/packages/sync-service/config/runtime.exs
+++ b/packages/sync-service/config/runtime.exs
@@ -207,6 +207,7 @@ config :electric,
   max_shapes:
     env!("ELECTRIC_MAX_SHAPES", :integer, nil) ||
       env!("ELECTRIC_EXPERIMENTAL_MAX_SHAPES", :integer, nil),
+  consumer_partitions: env!("ELECTRIC_CONSUMER_PARTITIONS", :integer, nil),
   max_concurrent_requests: max_concurrent_requests,
   # Used in telemetry
   instance_id: instance_id,
diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex
index c4a885e00a..2d22006f25 100644
--- a/packages/sync-service/lib/electric/application.ex
+++ b/packages/sync-service/lib/electric/application.ex
@@ -137,6 +137,7 @@ defmodule Electric.Application do
       max_shapes: get_env(opts, :max_shapes),
       tweaks: [
         publication_alter_debounce_ms: get_env(opts, :publication_alter_debounce_ms),
+        consumer_partitions: get_env(opts, :consumer_partitions),
         registry_partitions: get_env(opts, :process_registry_partitions),
         publication_refresh_period: get_env(opts, :publication_refresh_period),
         schema_reconciler_period: get_env(opts, :schema_reconciler_period),
diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex
index 5988ce3a7d..83764bb332 100644
--- a/packages/sync-service/lib/electric/config.ex
+++ b/packages/sync-service/lib/electric/config.ex
@@ -68,6 +68,7 @@ defmodule Electric.Config do
     stack_ready_timeout: 5_000,
     send_cache_headers?: true,
     max_shapes: nil,
+    consumer_partitions: nil,
     # This value should be tuned for the hardware it's running on.
     max_concurrent_requests: %{initial: 300, existing: 10_000},
     ## Storage
diff --git a/packages/sync-service/lib/electric/core_supervisor.ex b/packages/sync-service/lib/electric/core_supervisor.ex
index 00b828af91..a80d88b9c0 100644
--- a/packages/sync-service/lib/electric/core_supervisor.ex
+++ b/packages/sync-service/lib/electric/core_supervisor.ex
@@ -44,8 +44,15 @@ defmodule Electric.CoreSupervisor do
     inspector = Keyword.fetch!(opts, :inspector)
     persistent_kv = Keyword.fetch!(opts, :persistent_kv)
     tweaks = Keyword.fetch!(opts, :tweaks)
+    max_shapes = Keyword.fetch!(opts, :max_shapes)
 
-    consumer_supervisor_spec = {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]}
+    consumer_supervisor_spec =
+      {Electric.Shapes.DynamicConsumerSupervisor,
+       [
+         stack_id: stack_id,
+         max_shapes: max_shapes,
+         partitions: Keyword.get(tweaks, :consumer_partitions)
+       ]}
 
     shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}
 
@@ -69,8 +76,7 @@ defmodule Electric.CoreSupervisor do
         period: Keyword.get(tweaks, :schema_reconciler_period, 60_000)}
 
     expiry_manager_spec =
-      {Electric.ShapeCache.ExpiryManager,
-       max_shapes: Keyword.fetch!(opts, :max_shapes), stack_id: stack_id}
+      {Electric.ShapeCache.ExpiryManager, max_shapes: max_shapes, stack_id: stack_id}
 
     child_spec =
       Supervisor.child_spec(
diff --git a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex
index 031ee57274..b05fe3f788 100644
--- a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex
+++ b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex
@@ -1,28 +1,51 @@
 defmodule Electric.Shapes.DynamicConsumerSupervisor do
   @moduledoc """
-  Responsible for managing shape consumer processes
+  Responsible for managing shape consumer processes.
+
+  Uses a set of `DynamicSupervisor`s supervised by a parent `DynamicSupervisor`
+  to take advantage of the fact that `DynamicSupervisor` terminates its
+  children in parallel rather than one at a time.
+
+  This improves shutdown time because all consumer processes are effectively
+  terminated simultaneously.
   """
   use DynamicSupervisor
 
   require Logger
 
-  @doc """
-  Returns a child spec for the PartitionSupervisor that starts a pool of
-  DynamicConsumerSupervisor procecesses to shard child processes across.
+  import Electric, only: [is_stack_id: 1]
 
-  The number of dynamic supervisors is equal to the number of CPU cores.
-  """
-  def child_spec(opts) do
-    stack_id = Keyword.fetch!(opts, :stack_id)
-    {name, opts} = Keyword.pop(opts, :name, name(stack_id))
+  defmodule PartitionDynamicSupervisor do
+    @moduledoc false
+
+    use DynamicSupervisor
+
+    def name(stack_id, partition) when is_binary(stack_id) do
+      Electric.ProcessRegistry.name(stack_id, __MODULE__, partition)
+    end
 
-    # We're overriding Electric.Shapes.DynamicConsumerSupervisor's child_spec() function here
-    # to make the usage of PartitionSupervisor transparent to the callers. As a consequence, we
-    # need to call `super()` to obtain the original DynamicSupervisor child_spec() to pass as an option to
-    # PartitionSupervisor.
-    PartitionSupervisor.child_spec(child_spec: super(opts), name: name)
+    def start_link({stack_id, partition}) do
+      DynamicSupervisor.start_link(__MODULE__, [stack_id: stack_id],
+        name: name(stack_id, partition)
+      )
+    end
+
+    @impl true
+    def init(stack_id: stack_id) do
+      Process.set_label({:consumer_supervisor_partition, stack_id})
+      Logger.metadata(stack_id: stack_id)
+      Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
+
+      DynamicSupervisor.init(strategy: :one_for_one)
+    end
   end
 
+  # The max number of processes to start per-partition. Found empirically to
+  # give a reasonable tradeoff between memory usage and shutdown speed.
+  @target_per_partition 4_000
+
+  @partition_count_key :partition_count
+
   def name(name) when is_atom(name) do
     name
   end
@@ -35,44 +58,97 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do
     Electric.ProcessRegistry.name(stack_id, __MODULE__)
   end
 
-  # This function will be invoked for each dynamic supervisor process in PartitionSupervisor's
-  # pool, so we keep these processes unnamed.
   def start_link(opts) do
     stack_id = Keyword.fetch!(opts, :stack_id)
-    DynamicSupervisor.start_link(__MODULE__, stack_id: stack_id)
+    {name, opts} = Keyword.pop(opts, :name, name(stack_id))
+    max_processes = Keyword.get(opts, :max_shapes) || 0
+    # use a fixed value for the partition count if it's configured; if not,
+    # calculate based on the max_shapes setting (using @target_per_partition)
+    # or fall back to the number of schedulers
+    partitions = Keyword.get(opts, :partitions) || partition_count(max_processes)
+
+    Logger.info("Starting DynamicConsumerSupervisor with #{partitions} partitions")
+
+    with {:ok, supervisor_pid} <-
+           DynamicSupervisor.start_link(__MODULE__, %{stack_id: stack_id, partitions: partitions},
+             name: name
+           ) do
+      case start_partition_supervisors(supervisor_pid, stack_id, partitions) do
+        {:ok, _pids} ->
+          {:ok, supervisor_pid}
+
+        {:error, _} = error ->
+          DynamicSupervisor.stop(supervisor_pid, :shutdown)
+          error
+      end
+    end
   end
 
-  def start_shape_consumer(supervisor_ref, config) do
-    start_child(supervisor_ref, {Electric.Shapes.Consumer, config})
+  defp start_partition_supervisors(supervisor_pid, stack_id, partitions) do
+    Electric.Utils.reduce_while_ok(0..(partitions - 1), [], fn partition, pids ->
+      with {:ok, pid} <-
+             DynamicSupervisor.start_child(
+               supervisor_pid,
+               Supervisor.child_spec(
+                 {PartitionDynamicSupervisor, {stack_id, partition}},
+                 id: {:partition, partition}
+               )
+             ) do
+        {:ok, [pid | pids]}
+      end
+    end)
   end
 
-  def start_snapshotter(supervisor_ref, config) do
-    start_child(supervisor_ref, {Electric.Shapes.Consumer.Snapshotter, config})
+  def start_shape_consumer(stack_id, config) when is_stack_id(stack_id) do
+    start_child(stack_id, {Electric.Shapes.Consumer, config})
   end
 
-  def start_materializer(supervisor_ref, config) do
-    start_child(supervisor_ref, {Electric.Shapes.Consumer.Materializer, config})
+  def start_snapshotter(stack_id, config) when is_stack_id(stack_id) do
+    start_child(stack_id, {Electric.Shapes.Consumer.Snapshotter, config})
   end
 
-  defp start_child(supervisor_ref, {child_module, child_opts} = child_spec) do
-    %{shape_handle: shape_handle} = child_opts
+  def start_materializer(stack_id, config) when is_stack_id(stack_id) do
+    start_child(stack_id, {Electric.Shapes.Consumer.Materializer, config})
+  end
 
-    routing_key = :erlang.phash2(shape_handle)
+  defp start_child(stack_id, {child_module, child_opts} = child_spec) do
+    %{shape_handle: shape_handle} = child_opts
 
     Logger.debug(fn -> "Starting #{inspect(child_module)} for #{shape_handle}" end)
 
-    DynamicSupervisor.start_child(
-      {:via, PartitionSupervisor, {name(supervisor_ref), routing_key}},
-      child_spec
-    )
+    DynamicSupervisor.start_child(partition_for(stack_id, shape_handle), child_spec)
   end
 
   @impl true
-  def init(stack_id: stack_id) do
+  def init(%{stack_id: stack_id, partitions: partitions}) do
     Process.set_label({:dynamic_consumer_supervisor, stack_id})
     Logger.metadata(stack_id: stack_id)
     Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
 
+    table = :ets.new(table(stack_id), [:named_table, :public, read_concurrency: true])
+    true = :ets.insert(table, [{@partition_count_key, partitions}])
+
     DynamicSupervisor.init(strategy: :one_for_one)
   end
+
+  defp table(stack_id), do: :"Electric.Shapes.DynamicConsumerSupervisor:#{stack_id}"
+
+  defp partition_for(stack_id, shape_handle) do
+    partitions = :ets.lookup_element(table(stack_id), @partition_count_key, 2)
+    partition = :erlang.phash2(shape_handle, partitions)
+    PartitionDynamicSupervisor.name(stack_id, partition)
+  end
+
+  # we don't always have a value for `max_processes`, in which case just
+  # default to the number of schedulers
+  defp partition_count(0) do
+    System.schedulers_online()
+  end
+
+  defp partition_count(max_processes) when max_processes > 0 do
+    max(
+      System.schedulers_online(),
+      div(max_processes + @target_per_partition - 1, @target_per_partition)
+    )
+  end
 end
diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex
index b01d912bb5..d7e297b7d9 100644
--- a/packages/sync-service/lib/electric/stack_supervisor.ex
+++ b/packages/sync-service/lib/electric/stack_supervisor.ex
@@ -146,7 +146,8 @@ defmodule Electric.StackSupervisor do
             type: {:or, [:pos_integer, nil]},
             default: nil
           ],
-          process_spawn_opts: [type: :map, default: %{}]
+          process_spawn_opts: [type: :map, default: %{}],
+          consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil]
         ]
       ],
       lock_breaker_guard: [
diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex
index 55dd2ef312..62c6ef7377 100644
--- a/packages/sync-service/test/support/component_setup.ex
+++ b/packages/sync-service/test/support/component_setup.ex
@@ -294,7 +294,7 @@ defmodule Support.ComponentSetup do
   defp start_consumer_supervisor(ctx) do
     consumer_supervisor = :"consumer_supervisor_#{full_test_name(ctx)}"
 
-    {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id]}
+    {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id, partitions: 1]}
     |> Supervisor.child_spec(id: consumer_supervisor, restart: :temporary)
     |> start_supervised!()
diff --git a/website/docs/api/config.md b/website/docs/api/config.md
index 2333c33096..74fc527d5b 100644
--- a/website/docs/api/config.md
+++ b/website/docs/api/config.md
@@ -411,6 +411,17 @@ This replaces the previous `ELECTRIC_EXPERIMENTAL_MAX_SHAPES` environment variab
 
+### ELECTRIC_CONSUMER_PARTITIONS
+
+
+
+Consumer processes are partitioned across some number of supervisors to improve launch and shutdown time. If `ELECTRIC_MAX_SHAPES` is set, the number of partitions will scale to fit this maximum; if not, it defaults to the number of CPU cores. If you want to improve shutdown performance without setting an upper limit on the number of shapes, then set this to roughly your expected number of shapes / 4000.
+
+
+
 ## Feature Flags
 
 Feature flags enable experimental or advanced features that are not yet enabled by default in production.