Skip to content

Commit ef7899d

Browse files
committed
fix(sync-service): Improve shutdown times
Some investigation shows that yes, shutting down a DynamicSupervisor with lots of children is **slow**. Calling `Process.kill(pid, :shutdown)` on a simple `DynamicSupervisor` with 200,000 child processes (`Agent`s) takes ~10 **minutes** to complete on my machine. This is despite the fact that `DynamicSupervisor`s terminate their children in parallel - sending `:EXIT` messages to all child processes without waiting for any to terminate. From the [erlang docs](https://www.erlang.org/doc/system/sup_princ.html#simplified-one_for_one-supervisors): > Because a simple_one_for_one supervisor can have many children, it > shuts them all down asynchronously. This means that the children will do > their cleanup in parallel and therefore the order in which they are > stopped is not defined. `DynamicSupervisor` inherited its shutdown behaviour from this supervisor strategy. Using a `PartitionSupervisor` helps and roughly reduces the time to shutdown by ~O(number of partitions) but this does not scale well with the number of running child processes. For instance with 200,000 children over 8 partitions the shutdown time is reduced to ~30s but if you increase the number of children to 500,000 there is a lower bound of ~7s below which you can never go, no matter how many partitions. The problem is that the `PartitionSupervisor` terminates its children sequentially. So as you increase the number of partitions, you're just increasing the number of children that are terminated sequentially, even if each child `DynamicSupervisor` terminates its children in parallel. This PR solves that by replacing the top-level `PartitionSupervisor` with another `DynamicSupervisor`. On shutdown all partition supervisors are terminated in parallel and the children of those partition supervisors are terminated in parallel. Here are some numbers from my benchmark showing the time required to shutdown a supervisor (tree) with 200,000 running processes. Our larger servers have 16 cores, so we're running a `PartitionSupervisor` with 16 partitions. ``` ========================================= Partitioned (PartitionSupervisor with 16 partitions) ========================================= 200000 processes memory: 72.7734375KiB start: 2.6s shutdown: 12.4s max queue len: 12480 ``` So 12 seconds even with a very simple process with no `terminate/2` callback. We could just increase the number of partitions... ``` ========================================= Partitioned (PartitionSupervisor with 50 partitions) ========================================= 200000 processes memory: 276.609375KiB start: 2.7s shutdown: 5.4s max queue len: 3936 ``` Which is better but we've nearly tripled the number of supervisors but only just over halved the shutdown time, so you start to see the tradeoff. Now with the new 2-tier `DynamicSupervisor`: ``` ========================================= DynamicPartitioned (DynamicSupervisor of 50 DynamicSupervisors) ========================================= 200000 processes memory: 180.84375KiB start: 2.8s shutdown: 0.5s max queue len: 3763 ``` So 10x improvement on the previous config and 25x on the current setup. The number of partitions can be set using a new env var `ELECTRIC_CONSUMER_PARTITIONS`. If that's not set then the partitions scale by `max_shapes` if that's known. If not we just use the number of cores. This is the shutdown time for our production stack with the fallback partition config, so nearly a 6x improvement. I've opted for a conservative default (could have gone with some multiple of the number of cores) but went with the lower-memory option. ``` ========================================= DynamicPartitioned (DynamicSupervisor of 16 DynamicSupervisors) ========================================= 200000 processes memory: 58.09375KiB start: 2.8s shutdown: 2.1s max queue len: 12555 ``` This also scales to 500,000 shapes, where it starts 125 partitions by default and shuts down in 1.5s (the original version took 70s): ``` 500000 processes memory: 435.21875KiB shutdown: 1.5s ``` This is the benchmarking script: ```elixir time = fn action -> {t, result} = :timer.tc(action, :microsecond) s = Float.round(t / 1_000_000.0, 1) {t, s, result} end time_colour = fn time -> if time > 1.0, do: :red, else: :green end {:ok, _} = Registry.start_link(name: ShutdownRegistry, keys: :unique) defmodule Simple do def desc(_config, _processes), do: "direct DynamicSupervisor" def start_supervisor(_config, _processes) do DynamicSupervisor.start_link(name: Simple.S, strategy: :one_for_one) end def start_child(supervisor, _i, child_spec) do DynamicSupervisor.start_child(supervisor, child_spec) end def supervisor_pids(supervisor_pid), do: [supervisor_pid] end defmodule Partitioned do @name Partitioned.S def desc(config, _processes), do: "PartitionSupervisor with #{config[:partitions]} partitions" def start_supervisor(config, _processes) do PartitionSupervisor.start_link( child_spec: DynamicSupervisor.child_spec(strategy: :one_for_one), partitions: Keyword.fetch!(config, :partitions), name: @name ) end def start_child(_supervisor, i, child_spec) do DynamicSupervisor.start_child( {:via, PartitionSupervisor, {@name, i}}, child_spec ) end def supervisor_pids(_supervisor_pid) do for {_id, pid, :supervisor, _module} <- PartitionSupervisor.which_children(@name) do pid end end end defmodule DynamicPartitioned do use DynamicSupervisor @name DynamicPartitioned.S @table __MODULE__ @target_per_partition 4_000 def name(id) do {:via, Registry, {ShutdownRegistry, {__MODULE__, id}}} end def desc(config, processes), do: "DynamicSupervisor of #{partition_count(config, processes)} DynamicSupervisors" def start_supervisor(config, processes) do # partitions = Keyword.fetch!(config, :partitions) partitions = partition_count(config, processes) with {:ok, sup} <- DynamicSupervisor.start_link( __MODULE__, [stack_id: "stack_id", partitions: partitions], name: @name ) do pids = for i <- 1..partitions do {:ok, pid} = DynamicSupervisor.start_child( sup, Supervisor.child_spec( {DynamicSupervisor, strategy: :one_for_one, name: name(i - 1)}, id: {__MODULE__, i} ) ) pid end # only needed for `supervisor_pids/1` :persistent_term.put({__MODULE__, :partitions}, List.to_tuple(pids)) # :persistent_term.put({__MODULE__, :partition_count}, partitions) {:ok, sup} end end defp partition_count(config, max_processes) do Keyword.get(config, :partitions) || calculate_partition_count(max_processes) end defp calculate_partition_count(max_processes) do cores = System.schedulers_online() max(cores, div(max_processes + @target_per_partition - 1, @target_per_partition)) end def start_child(_supervisor, i, child_spec) do DynamicSupervisor.start_child(partition(i), child_spec) end defp partition(i) do partitions = :ets.lookup_element(@table, :partition_count, 2) idx = :erlang.phash2(i, partitions) name(idx) end def supervisor_pids(_supervisor_pid) do partitions = :persistent_term.get({__MODULE__, :partitions}) Tuple.to_list(partitions) end def init(init_args) do partitions = Keyword.fetch!(init_args, :partitions) table = :ets.new(@table, [:named_table, :public, read_concurrency: true]) true = :ets.insert(table, [{:partition_count, partitions}]) DynamicSupervisor.init(strategy: :one_for_one) end end defmodule Stats do def monitor_message_queue(pids) do Stream.repeatedly(fn -> Enum.map(pids, fn pid -> case Process.info(pid, :message_queue_len) do {:message_queue_len, len} -> len nil -> 0 end end) end) |> Stream.flat_map(& &1) |> Enum.reduce_while(0, fn len, max -> receive do {:report, parent} -> send(parent, {:max_len, max}) {:halt, max} after 5 -> {:cont, if(len > max, do: len, else: max)} end end) end end impls = [ # {Simple, []}, {Partitioned, [partitions: 16]}, {Partitioned, [partitions: 50]}, {DynamicPartitioned, []}, {DynamicPartitioned, [partitions: 16]} ] p = [ # 100, # 1_000, # 10_000, # 20_000, 200_000 # 500_000 ] for {impl, config} <- impls do for n <- p do IO.puts( IO.ANSI.format([ "=========================================\n", [:cyan, " ", inspect(impl), :reset, " (", impl.desc(config, n), ")\n"], "=========================================\n" ]) ) {:ok, super} = impl.start_supervisor(config, n) pids = Enum.uniq([super | impl.supervisor_pids(super)]) supervisor_memory = pids |> Enum.map(fn pid -> {:memory, mem} = Process.info(pid, :memory) mem end) |> Enum.sum() {_start_us, start_s, _pids} = time.(fn -> Enum.map(1..n, fn i -> {:ok, pid} = impl.start_child( super, i, Supervisor.child_spec({Agent, fn -> i end}, id: {:agent, i}) ) pid end) end) Process.unlink(super) ref = Process.monitor(super) IO.write( IO.ANSI.format([ [:bright, String.pad_leading(to_string(n), 6, " "), :reset, " processes\n"], [" memory: ", to_string(supervisor_memory / 1024), "KiB\n", :reset], [" start: ", time_colour.(start_s), to_string(start_s), "s\n", :reset] ]) ) {_stop_us, stop_s, qlen} = time.(fn -> pids = impl.supervisor_pids(super) qpid = spawn(fn -> Stats.monitor_message_queue(pids) end) Process.exit(super, :shutdown) receive do {:DOWN, ^ref, :process, ^super, :shutdown} -> :ok msg -> raise msg end send(qpid, {:report, self()}) receive do {:max_len, len} -> len end end) IO.write( IO.ANSI.format([ [" shutdown: ", :bright, time_colour.(stop_s), to_string(stop_s), "s\n", :reset], ["max queue len: ", :bright, to_string(qlen), "\n", :reset], "\n" ]) ) end end ```
1 parent 0b45e63 commit ef7899d

10 files changed

Lines changed: 143 additions & 36 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/docs': patch
3+
---
4+
5+
Document new ELECTRIC_CONSUMER_PARTITIONS environment variable

.changeset/strong-spoons-cry.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Improve shutdown times by changing the consumer supervision strategy

packages/sync-service/config/runtime.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ config :electric,
207207
max_shapes:
208208
env!("ELECTRIC_MAX_SHAPES", :integer, nil) ||
209209
env!("ELECTRIC_EXPERIMENTAL_MAX_SHAPES", :integer, nil),
210+
consumer_partitions: env!("ELECTRIC_CONSUMER_PARTITIONS", :integer, nil),
210211
max_concurrent_requests: max_concurrent_requests,
211212
# Used in telemetry
212213
instance_id: instance_id,

packages/sync-service/lib/electric/application.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ defmodule Electric.Application do
137137
max_shapes: get_env(opts, :max_shapes),
138138
tweaks: [
139139
publication_alter_debounce_ms: get_env(opts, :publication_alter_debounce_ms),
140+
consumer_partitions: get_env(opts, :consumer_partitions),
140141
registry_partitions: get_env(opts, :process_registry_partitions),
141142
publication_refresh_period: get_env(opts, :publication_refresh_period),
142143
schema_reconciler_period: get_env(opts, :schema_reconciler_period),

packages/sync-service/lib/electric/config.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ defmodule Electric.Config do
6868
stack_ready_timeout: 5_000,
6969
send_cache_headers?: true,
7070
max_shapes: nil,
71+
consumer_partitions: nil,
7172
# This value should be tuned for the hardware it's running on.
7273
max_concurrent_requests: %{initial: 300, existing: 10_000},
7374
## Storage

packages/sync-service/lib/electric/core_supervisor.ex

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,15 @@ defmodule Electric.CoreSupervisor do
4444
inspector = Keyword.fetch!(opts, :inspector)
4545
persistent_kv = Keyword.fetch!(opts, :persistent_kv)
4646
tweaks = Keyword.fetch!(opts, :tweaks)
47+
max_shapes = Keyword.fetch!(opts, :max_shapes)
4748

48-
consumer_supervisor_spec = {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]}
49+
consumer_supervisor_spec =
50+
{Electric.Shapes.DynamicConsumerSupervisor,
51+
[
52+
stack_id: stack_id,
53+
max_shapes: max_shapes,
54+
partitions: Keyword.get(tweaks, :consumer_partitions)
55+
]}
4956

5057
shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}
5158

@@ -69,8 +76,7 @@ defmodule Electric.CoreSupervisor do
6976
period: Keyword.get(tweaks, :schema_reconciler_period, 60_000)}
7077

7178
expiry_manager_spec =
72-
{Electric.ShapeCache.ExpiryManager,
73-
max_shapes: Keyword.fetch!(opts, :max_shapes), stack_id: stack_id}
79+
{Electric.ShapeCache.ExpiryManager, max_shapes: max_shapes, stack_id: stack_id}
7480

7581
child_spec =
7682
Supervisor.child_spec(
Lines changed: 107 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,51 @@
11
defmodule Electric.Shapes.DynamicConsumerSupervisor do
22
@moduledoc """
3-
Responsible for managing shape consumer processes
3+
Responsible for managing shape consumer processes.
4+
5+
Uses a set of `DynamicSupervisor`s supervised by a parent `DynamicSupervisor`
6+
to take advantage of the fact that `DynamicSupervisor` terminates its
7+
children in parallel rather than one at a time.
8+
9+
This improves shutdown time because all consumer processes are effectively
10+
terminated simultaneously.
411
"""
512
use DynamicSupervisor
613

714
require Logger
815

9-
@doc """
10-
Returns a child spec for the PartitionSupervisor that starts a pool of
11-
DynamicConsumerSupervisor procecesses to shard child processes across.
16+
import Electric, only: [is_stack_id: 1]
1217

13-
The number of dynamic supervisors is equal to the number of CPU cores.
14-
"""
15-
def child_spec(opts) do
16-
stack_id = Keyword.fetch!(opts, :stack_id)
17-
{name, opts} = Keyword.pop(opts, :name, name(stack_id))
18+
defmodule PartitionDynamicSupervisor do
19+
@moduledoc false
20+
21+
use DynamicSupervisor
22+
23+
def name(stack_id, partition) when is_binary(stack_id) do
24+
Electric.ProcessRegistry.name(stack_id, __MODULE__, partition)
25+
end
1826

19-
# We're overriding Electric.Shapes.DynamicConsumerSupervisor's child_spec() function here
20-
# to make the usage of PartitionSupervisor transparent to the callers. As a consequence, we
21-
# need to call `super()` to obtain the original DynamicSupervisor child_spec() to pass as an option to
22-
# PartitionSupervisor.
23-
PartitionSupervisor.child_spec(child_spec: super(opts), name: name)
27+
def start_link({stack_id, partition}) do
28+
DynamicSupervisor.start_link(__MODULE__, [stack_id: stack_id],
29+
name: name(stack_id, partition)
30+
)
31+
end
32+
33+
@impl true
34+
def init(stack_id: stack_id) do
35+
Process.set_label({:consumer_supervisor_partition, stack_id})
36+
Logger.metadata(stack_id: stack_id)
37+
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
38+
39+
DynamicSupervisor.init(strategy: :one_for_one)
40+
end
2441
end
2542

43+
# The max number of processes to start per-partition. Found empirically to
44+
# give a reasonable tradeoff between memory usage and shutdown speed.
45+
@target_per_partition 4_000
46+
47+
@partition_count_key :partition_count
48+
2649
def name(name) when is_atom(name) do
2750
name
2851
end
@@ -35,44 +58,97 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do
3558
Electric.ProcessRegistry.name(stack_id, __MODULE__)
3659
end
3760

38-
# This function will be invoked for each dynamic supervisor process in PartitionSupervisor's
39-
# pool, so we keep these processes unnamed.
4061
def start_link(opts) do
4162
stack_id = Keyword.fetch!(opts, :stack_id)
42-
DynamicSupervisor.start_link(__MODULE__, stack_id: stack_id)
63+
{name, opts} = Keyword.pop(opts, :name, name(stack_id))
64+
max_processes = Keyword.get(opts, :max_shapes) || 0
65+
# use a fixed value for the partition count if its configured, if not then
66+
# calculate based on the max_shapes setting (using @target_per_partition)
67+
# or fallback to the number of schedulers
68+
partitions = Keyword.get(opts, :partitions) || partition_count(max_processes)
69+
70+
Logger.info("Starting DynamicConsumerSupervisor with #{partitions} partitions")
71+
72+
with {:ok, supervisor_pid} <-
73+
DynamicSupervisor.start_link(__MODULE__, %{stack_id: stack_id, partitions: partitions},
74+
name: name
75+
) do
76+
case start_partition_supervisors(supervisor_pid, stack_id, partitions) do
77+
{:ok, _pids} ->
78+
{:ok, supervisor_pid}
79+
80+
{:error, _} = error ->
81+
DynamicSupervisor.stop(supervisor_pid, :shutdown)
82+
error
83+
end
84+
end
4385
end
4486

45-
def start_shape_consumer(supervisor_ref, config) do
46-
start_child(supervisor_ref, {Electric.Shapes.Consumer, config})
87+
defp start_partition_supervisors(supervisor_pid, stack_id, partitions) do
88+
Electric.Utils.reduce_while_ok(0..(partitions - 1), [], fn partition, pids ->
89+
with {:ok, pid} <-
90+
DynamicSupervisor.start_child(
91+
supervisor_pid,
92+
Supervisor.child_spec(
93+
{PartitionDynamicSupervisor, {stack_id, partition}},
94+
id: {:partition, partition}
95+
)
96+
) do
97+
{:ok, [pid | pids]}
98+
end
99+
end)
47100
end
48101

49-
def start_snapshotter(supervisor_ref, config) do
50-
start_child(supervisor_ref, {Electric.Shapes.Consumer.Snapshotter, config})
102+
def start_shape_consumer(stack_id, config) when is_stack_id(stack_id) do
103+
start_child(stack_id, {Electric.Shapes.Consumer, config})
51104
end
52105

53-
def start_materializer(supervisor_ref, config) do
54-
start_child(supervisor_ref, {Electric.Shapes.Consumer.Materializer, config})
106+
def start_snapshotter(stack_id, config) when is_stack_id(stack_id) do
107+
start_child(stack_id, {Electric.Shapes.Consumer.Snapshotter, config})
55108
end
56109

57-
defp start_child(supervisor_ref, {child_module, child_opts} = child_spec) do
58-
%{shape_handle: shape_handle} = child_opts
110+
def start_materializer(stack_id, config) when is_stack_id(stack_id) do
111+
start_child(stack_id, {Electric.Shapes.Consumer.Materializer, config})
112+
end
59113

60-
routing_key = :erlang.phash2(shape_handle)
114+
defp start_child(stack_id, {child_module, child_opts} = child_spec) do
115+
%{shape_handle: shape_handle} = child_opts
61116

62117
Logger.debug(fn -> "Starting #{inspect(child_module)} for #{shape_handle}" end)
63118

64-
DynamicSupervisor.start_child(
65-
{:via, PartitionSupervisor, {name(supervisor_ref), routing_key}},
66-
child_spec
67-
)
119+
DynamicSupervisor.start_child(partition_for(stack_id, shape_handle), child_spec)
68120
end
69121

70122
@impl true
71-
def init(stack_id: stack_id) do
123+
def init(%{stack_id: stack_id, partitions: partitions}) do
72124
Process.set_label({:dynamic_consumer_supervisor, stack_id})
73125
Logger.metadata(stack_id: stack_id)
74126
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
75127

128+
table = :ets.new(table(stack_id), [:named_table, :public, read_concurrency: true])
129+
true = :ets.insert(table, [{@partition_count_key, partitions}])
130+
76131
DynamicSupervisor.init(strategy: :one_for_one)
77132
end
133+
134+
defp table(stack_id), do: :"Electric.Shapes.DynamicConsumerSupervisor:#{stack_id}"
135+
136+
defp partition_for(stack_id, shape_handle) do
137+
partitions = :ets.lookup_element(table(stack_id), @partition_count_key, 2)
138+
partition = :erlang.phash2(shape_handle, partitions)
139+
PartitionDynamicSupervisor.name(stack_id, partition)
140+
end
141+
142+
# we don't always have a value for `max_processes`, in which case just
143+
# default to the number of schedulers
144+
defp partition_count(0) do
145+
System.schedulers_online()
146+
end
147+
148+
defp partition_count(max_processes) when max_processes > 0 do
149+
max(
150+
System.schedulers_online(),
151+
div(max_processes + @target_per_partition - 1, @target_per_partition)
152+
)
153+
end
78154
end

packages/sync-service/lib/electric/stack_supervisor.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ defmodule Electric.StackSupervisor do
146146
type: {:or, [:pos_integer, nil]},
147147
default: nil
148148
],
149-
process_spawn_opts: [type: :map, default: %{}]
149+
process_spawn_opts: [type: :map, default: %{}],
150+
consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil]
150151
]
151152
],
152153
lock_breaker_guard: [

packages/sync-service/test/support/component_setup.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ defmodule Support.ComponentSetup do
294294
defp start_consumer_supervisor(ctx) do
295295
consumer_supervisor = :"consumer_supervisor_#{full_test_name(ctx)}"
296296

297-
{Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id]}
297+
{Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id, partitions: 1]}
298298
|> Supervisor.child_spec(id: consumer_supervisor, restart: :temporary)
299299
|> start_supervised!()
300300

website/docs/api/config.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,17 @@ This replaces the previous `ELECTRIC_EXPERIMENTAL_MAX_SHAPES` environment variab
411411

412412
</EnvVarConfig>
413413

414+
### ELECTRIC_CONSUMER_PARTITIONS
415+
416+
<EnvVarConfig
417+
name="ELECTRIC_CONSUMER_PARTITIONS"
418+
optional="true"
419+
example="128">
420+
421+
Consumer processes are partitioned across some number of supervisors to improve launch and shutdown time. If `ELECTRIC_MAX_SHAPES` is set the number of partitions will scale to fit this maximum but if not, it defaults to the number of CPU cores. If you want to improve shutdown performance without setting an upper limit on the number of shapes, then set this to roughly your expected number of shapes / 4000.
422+
423+
</EnvVarConfig>
424+
414425
## Feature Flags
415426

416427
Feature flags enable experimental or advanced features that are not yet enabled by default in production.

0 commit comments

Comments
 (0)