diff --git a/.changeset/chatty-planets-invite.md b/.changeset/chatty-planets-invite.md new file mode 100644 index 0000000000..54dc37ab04 --- /dev/null +++ b/.changeset/chatty-planets-invite.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/docs': patch +--- + +Document new ELECTRIC_CONSUMER_PARTITIONS environment variable diff --git a/.changeset/strong-spoons-cry.md b/.changeset/strong-spoons-cry.md new file mode 100644 index 0000000000..21c2d01b59 --- /dev/null +++ b/.changeset/strong-spoons-cry.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Improve shutdown times by changing the consumer supervision strategy diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 2b4b718102..892b55cafd 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -207,6 +207,7 @@ config :electric, max_shapes: env!("ELECTRIC_MAX_SHAPES", :integer, nil) || env!("ELECTRIC_EXPERIMENTAL_MAX_SHAPES", :integer, nil), + consumer_partitions: env!("ELECTRIC_CONSUMER_PARTITIONS", :integer, nil), max_concurrent_requests: max_concurrent_requests, # Used in telemetry instance_id: instance_id, diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index c4a885e00a..2d22006f25 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -137,6 +137,7 @@ defmodule Electric.Application do max_shapes: get_env(opts, :max_shapes), tweaks: [ publication_alter_debounce_ms: get_env(opts, :publication_alter_debounce_ms), + consumer_partitions: get_env(opts, :consumer_partitions), registry_partitions: get_env(opts, :process_registry_partitions), publication_refresh_period: get_env(opts, :publication_refresh_period), schema_reconciler_period: get_env(opts, :schema_reconciler_period), diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 
5988ce3a7d..83764bb332 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -68,6 +68,7 @@ defmodule Electric.Config do stack_ready_timeout: 5_000, send_cache_headers?: true, max_shapes: nil, + consumer_partitions: nil, # This value should be tuned for the hardware it's running on. max_concurrent_requests: %{initial: 300, existing: 10_000}, ## Storage diff --git a/packages/sync-service/lib/electric/core_supervisor.ex b/packages/sync-service/lib/electric/core_supervisor.ex index 00b828af91..a80d88b9c0 100644 --- a/packages/sync-service/lib/electric/core_supervisor.ex +++ b/packages/sync-service/lib/electric/core_supervisor.ex @@ -44,8 +44,15 @@ defmodule Electric.CoreSupervisor do inspector = Keyword.fetch!(opts, :inspector) persistent_kv = Keyword.fetch!(opts, :persistent_kv) tweaks = Keyword.fetch!(opts, :tweaks) + max_shapes = Keyword.fetch!(opts, :max_shapes) - consumer_supervisor_spec = {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]} + consumer_supervisor_spec = + {Electric.Shapes.DynamicConsumerSupervisor, + [ + stack_id: stack_id, + max_shapes: max_shapes, + partitions: Keyword.get(tweaks, :consumer_partitions) + ]} shape_cache_spec = {Electric.ShapeCache, shape_cache_opts} @@ -69,8 +76,7 @@ defmodule Electric.CoreSupervisor do period: Keyword.get(tweaks, :schema_reconciler_period, 60_000)} expiry_manager_spec = - {Electric.ShapeCache.ExpiryManager, - max_shapes: Keyword.fetch!(opts, :max_shapes), stack_id: stack_id} + {Electric.ShapeCache.ExpiryManager, max_shapes: max_shapes, stack_id: stack_id} child_spec = Supervisor.child_spec( diff --git a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex index 031ee57274..b05fe3f788 100644 --- a/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex @@ 
-1,28 +1,51 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do @moduledoc """ - Responsible for managing shape consumer processes + Responsible for managing shape consumer processes. + + Uses a set of `DynamicSupervisor`s supervised by a parent `DynamicSupervisor` + to take advantage of the fact that `DynamicSupervisor` terminates its + children in parallel rather than one at a time. + + This improves shutdown time because all consumer processes are effectively + terminated simultaneously. """ use DynamicSupervisor require Logger - @doc """ - Returns a child spec for the PartitionSupervisor that starts a pool of - DynamicConsumerSupervisor procecesses to shard child processes across. + import Electric, only: [is_stack_id: 1] - The number of dynamic supervisors is equal to the number of CPU cores. - """ - def child_spec(opts) do - stack_id = Keyword.fetch!(opts, :stack_id) - {name, opts} = Keyword.pop(opts, :name, name(stack_id)) + defmodule PartitionDynamicSupervisor do + @moduledoc false + + use DynamicSupervisor + + def name(stack_id, partition) when is_binary(stack_id) do + Electric.ProcessRegistry.name(stack_id, __MODULE__, partition) + end - # We're overriding Electric.Shapes.DynamicConsumerSupervisor's child_spec() function here - # to make the usage of PartitionSupervisor transparent to the callers. As a consequence, we - # need to call `super()` to obtain the original DynamicSupervisor child_spec() to pass as an option to - # PartitionSupervisor. 
- PartitionSupervisor.child_spec(child_spec: super(opts), name: name) + def start_link({stack_id, partition}) do + DynamicSupervisor.start_link(__MODULE__, [stack_id: stack_id], + name: name(stack_id, partition) + ) + end + + @impl true + def init(stack_id: stack_id) do + Process.set_label({:consumer_supervisor_partition, stack_id}) + Logger.metadata(stack_id: stack_id) + Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + + DynamicSupervisor.init(strategy: :one_for_one) + end end + # The max number of processes to start per-partition. Found empirically to + # give a reasonable tradeoff between memory usage and shutdown speed. + @target_per_partition 4_000 + + @partition_count_key :partition_count + def name(name) when is_atom(name) do name end @@ -35,44 +58,97 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do Electric.ProcessRegistry.name(stack_id, __MODULE__) end - # This function will be invoked for each dynamic supervisor process in PartitionSupervisor's - # pool, so we keep these processes unnamed. 
def start_link(opts) do stack_id = Keyword.fetch!(opts, :stack_id) - DynamicSupervisor.start_link(__MODULE__, stack_id: stack_id) + {name, opts} = Keyword.pop(opts, :name, name(stack_id)) + max_processes = Keyword.get(opts, :max_shapes) || 0 + # use a fixed value for the partition count if it's configured, if not then + # calculate based on the max_shapes setting (using @target_per_partition) + # or fall back to the number of schedulers + partitions = Keyword.get(opts, :partitions) || partition_count(max_processes) + + Logger.info("Starting DynamicConsumerSupervisor with #{partitions} partitions") + + with {:ok, supervisor_pid} <- + DynamicSupervisor.start_link(__MODULE__, %{stack_id: stack_id, partitions: partitions}, + name: name + ) do + case start_partition_supervisors(supervisor_pid, stack_id, partitions) do + {:ok, _pids} -> + {:ok, supervisor_pid} + + {:error, _} = error -> + DynamicSupervisor.stop(supervisor_pid, :shutdown) + error + end + end end - def start_shape_consumer(supervisor_ref, config) do - start_child(supervisor_ref, {Electric.Shapes.Consumer, config}) + defp start_partition_supervisors(supervisor_pid, stack_id, partitions) do + Electric.Utils.reduce_while_ok(0..(partitions - 1), [], fn partition, pids -> + with {:ok, pid} <- + DynamicSupervisor.start_child( + supervisor_pid, + Supervisor.child_spec( + {PartitionDynamicSupervisor, {stack_id, partition}}, + id: {:partition, partition} + ) + ) do + {:ok, [pid | pids]} + end + end) end - def start_snapshotter(supervisor_ref, config) do - start_child(supervisor_ref, {Electric.Shapes.Consumer.Snapshotter, config}) + def start_shape_consumer(stack_id, config) when is_stack_id(stack_id) do + start_child(stack_id, {Electric.Shapes.Consumer, config}) end - def start_materializer(supervisor_ref, config) do - start_child(supervisor_ref, {Electric.Shapes.Consumer.Materializer, config}) + def start_snapshotter(stack_id, config) when is_stack_id(stack_id) do + start_child(stack_id, 
{Electric.Shapes.Consumer.Snapshotter, config}) end - defp start_child(supervisor_ref, {child_module, child_opts} = child_spec) do - %{shape_handle: shape_handle} = child_opts + def start_materializer(stack_id, config) when is_stack_id(stack_id) do + start_child(stack_id, {Electric.Shapes.Consumer.Materializer, config}) + end - routing_key = :erlang.phash2(shape_handle) + defp start_child(stack_id, {child_module, child_opts} = child_spec) do + %{shape_handle: shape_handle} = child_opts Logger.debug(fn -> "Starting #{inspect(child_module)} for #{shape_handle}" end) - DynamicSupervisor.start_child( - {:via, PartitionSupervisor, {name(supervisor_ref), routing_key}}, - child_spec - ) + DynamicSupervisor.start_child(partition_for(stack_id, shape_handle), child_spec) end @impl true - def init(stack_id: stack_id) do + def init(%{stack_id: stack_id, partitions: partitions}) do Process.set_label({:dynamic_consumer_supervisor, stack_id}) Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + table = :ets.new(table(stack_id), [:named_table, :public, read_concurrency: true]) + true = :ets.insert(table, [{@partition_count_key, partitions}]) + DynamicSupervisor.init(strategy: :one_for_one) end + + defp table(stack_id), do: :"Electric.Shapes.DynamicConsumerSupervisor:#{stack_id}" + + defp partition_for(stack_id, shape_handle) do + partitions = :ets.lookup_element(table(stack_id), @partition_count_key, 2) + partition = :erlang.phash2(shape_handle, partitions) + PartitionDynamicSupervisor.name(stack_id, partition) + end + + # we don't always have a value for `max_processes`, in which case just + # default to the number of schedulers + defp partition_count(0) do + System.schedulers_online() + end + + defp partition_count(max_processes) when max_processes > 0 do + max( + System.schedulers_online(), + div(max_processes + @target_per_partition - 1, @target_per_partition) + ) + end end diff --git 
a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b01d912bb5..d7e297b7d9 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -146,7 +146,8 @@ defmodule Electric.StackSupervisor do type: {:or, [:pos_integer, nil]}, default: nil ], - process_spawn_opts: [type: :map, default: %{}] + process_spawn_opts: [type: :map, default: %{}], + consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil] ] ], lock_breaker_guard: [ diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 55dd2ef312..62c6ef7377 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -294,7 +294,7 @@ defmodule Support.ComponentSetup do defp start_consumer_supervisor(ctx) do consumer_supervisor = :"consumer_supervisor_#{full_test_name(ctx)}" - {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id]} + {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id, partitions: 1]} |> Supervisor.child_spec(id: consumer_supervisor, restart: :temporary) |> start_supervised!() diff --git a/website/docs/api/config.md b/website/docs/api/config.md index 2333c33096..74fc527d5b 100644 --- a/website/docs/api/config.md +++ b/website/docs/api/config.md @@ -411,6 +411,17 @@ This replaces the previous `ELECTRIC_EXPERIMENTAL_MAX_SHAPES` environment variab +### ELECTRIC_CONSUMER_PARTITIONS + + + +Consumer processes are partitioned across a number of supervisors to improve launch and shutdown times. If `ELECTRIC_MAX_SHAPES` is set, the number of partitions scales to fit that maximum (roughly one partition per 4000 shapes, and never fewer than the number of CPU cores); otherwise it defaults to the number of CPU cores. If you want to improve shutdown performance without setting an upper limit on the number of shapes, set this to roughly your expected number of shapes divided by 4000. 
+ + + ## Feature Flags Feature flags enable experimental or advanced features that are not yet enabled by default in production.