Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chatty-planets-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/docs': patch
---

Document new ELECTRIC_CONSUMER_PARTITIONS environment variable
5 changes: 5 additions & 0 deletions .changeset/strong-spoons-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Improve shutdown times by changing the consumer supervision strategy
1 change: 1 addition & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ config :electric,
max_shapes:
env!("ELECTRIC_MAX_SHAPES", :integer, nil) ||
env!("ELECTRIC_EXPERIMENTAL_MAX_SHAPES", :integer, nil),
consumer_partitions: env!("ELECTRIC_CONSUMER_PARTITIONS", :integer, nil),
max_concurrent_requests: max_concurrent_requests,
# Used in telemetry
instance_id: instance_id,
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ defmodule Electric.Application do
max_shapes: get_env(opts, :max_shapes),
tweaks: [
publication_alter_debounce_ms: get_env(opts, :publication_alter_debounce_ms),
consumer_partitions: get_env(opts, :consumer_partitions),
registry_partitions: get_env(opts, :process_registry_partitions),
publication_refresh_period: get_env(opts, :publication_refresh_period),
schema_reconciler_period: get_env(opts, :schema_reconciler_period),
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ defmodule Electric.Config do
stack_ready_timeout: 5_000,
send_cache_headers?: true,
max_shapes: nil,
consumer_partitions: nil,
# This value should be tuned for the hardware it's running on.
max_concurrent_requests: %{initial: 300, existing: 10_000},
## Storage
Expand Down
12 changes: 9 additions & 3 deletions packages/sync-service/lib/electric/core_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,15 @@ defmodule Electric.CoreSupervisor do
inspector = Keyword.fetch!(opts, :inspector)
persistent_kv = Keyword.fetch!(opts, :persistent_kv)
tweaks = Keyword.fetch!(opts, :tweaks)
max_shapes = Keyword.fetch!(opts, :max_shapes)

consumer_supervisor_spec = {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]}
consumer_supervisor_spec =
{Electric.Shapes.DynamicConsumerSupervisor,
[
stack_id: stack_id,
max_shapes: max_shapes,
partitions: Keyword.get(tweaks, :consumer_partitions)
]}

shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}

Expand All @@ -69,8 +76,7 @@ defmodule Electric.CoreSupervisor do
period: Keyword.get(tweaks, :schema_reconciler_period, 60_000)}

expiry_manager_spec =
{Electric.ShapeCache.ExpiryManager,
max_shapes: Keyword.fetch!(opts, :max_shapes), stack_id: stack_id}
{Electric.ShapeCache.ExpiryManager, max_shapes: max_shapes, stack_id: stack_id}

child_spec =
Supervisor.child_spec(
Expand Down
138 changes: 107 additions & 31 deletions packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,28 +1,51 @@
defmodule Electric.Shapes.DynamicConsumerSupervisor do
@moduledoc """
Responsible for managing shape consumer processes
Responsible for managing shape consumer processes.

Uses a set of `DynamicSupervisor`s supervised by a parent `DynamicSupervisor`
to take advantage of the fact that `DynamicSupervisor` terminates its
children in parallel rather than one at a time.

This improves shutdown time because all consumer processes are effectively
terminated simultaneously.
"""
use DynamicSupervisor

require Logger

@doc """
Returns a child spec for the PartitionSupervisor that starts a pool of
DynamicConsumerSupervisor procecesses to shard child processes across.
import Electric, only: [is_stack_id: 1]

The number of dynamic supervisors is equal to the number of CPU cores.
"""
def child_spec(opts) do
stack_id = Keyword.fetch!(opts, :stack_id)
{name, opts} = Keyword.pop(opts, :name, name(stack_id))
defmodule PartitionDynamicSupervisor do
@moduledoc false

use DynamicSupervisor

def name(stack_id, partition) when is_binary(stack_id) do
Electric.ProcessRegistry.name(stack_id, __MODULE__, partition)
end

# We're overriding Electric.Shapes.DynamicConsumerSupervisor's child_spec() function here
# to make the usage of PartitionSupervisor transparent to the callers. As a consequence, we
# need to call `super()` to obtain the original DynamicSupervisor child_spec() to pass as an option to
# PartitionSupervisor.
PartitionSupervisor.child_spec(child_spec: super(opts), name: name)
def start_link({stack_id, partition}) do
DynamicSupervisor.start_link(__MODULE__, [stack_id: stack_id],
name: name(stack_id, partition)
)
end

@impl true
def init(stack_id: stack_id) do
Process.set_label({:consumer_supervisor_partition, stack_id})
Logger.metadata(stack_id: stack_id)
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)

DynamicSupervisor.init(strategy: :one_for_one)
end
end

# The max number of processes to start per-partition. Found empirically to
# give a reasonable tradeoff between memory usage and shutdown speed.
@target_per_partition 4_000

@partition_count_key :partition_count

def name(name) when is_atom(name) do
name
end
Expand All @@ -35,44 +58,97 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do
Electric.ProcessRegistry.name(stack_id, __MODULE__)
end

# This function will be invoked for each dynamic supervisor process in PartitionSupervisor's
# pool, so we keep these processes unnamed.
def start_link(opts) do
stack_id = Keyword.fetch!(opts, :stack_id)
DynamicSupervisor.start_link(__MODULE__, stack_id: stack_id)
{name, opts} = Keyword.pop(opts, :name, name(stack_id))
max_processes = Keyword.get(opts, :max_shapes) || 0
# use a fixed value for the partition count if its configured, if not then
# calculate based on the max_shapes setting (using @target_per_partition)
# or fallback to the number of schedulers
partitions = Keyword.get(opts, :partitions) || partition_count(max_processes)

Logger.info("Starting DynamicConsumerSupervisor with #{partitions} partitions")

with {:ok, supervisor_pid} <-
DynamicSupervisor.start_link(__MODULE__, %{stack_id: stack_id, partitions: partitions},
name: name
) do
case start_partition_supervisors(supervisor_pid, stack_id, partitions) do
{:ok, _pids} ->
{:ok, supervisor_pid}

{:error, _} = error ->
DynamicSupervisor.stop(supervisor_pid, :shutdown)
error
end
end
end

def start_shape_consumer(supervisor_ref, config) do
start_child(supervisor_ref, {Electric.Shapes.Consumer, config})
defp start_partition_supervisors(supervisor_pid, stack_id, partitions) do
Electric.Utils.reduce_while_ok(0..(partitions - 1), [], fn partition, pids ->
with {:ok, pid} <-
DynamicSupervisor.start_child(
supervisor_pid,
Supervisor.child_spec(
{PartitionDynamicSupervisor, {stack_id, partition}},
id: {:partition, partition}
)
) do
{:ok, [pid | pids]}
end
end)
end

def start_snapshotter(supervisor_ref, config) do
start_child(supervisor_ref, {Electric.Shapes.Consumer.Snapshotter, config})
def start_shape_consumer(stack_id, config) when is_stack_id(stack_id) do
start_child(stack_id, {Electric.Shapes.Consumer, config})
end

def start_materializer(supervisor_ref, config) do
start_child(supervisor_ref, {Electric.Shapes.Consumer.Materializer, config})
def start_snapshotter(stack_id, config) when is_stack_id(stack_id) do
start_child(stack_id, {Electric.Shapes.Consumer.Snapshotter, config})
end

defp start_child(supervisor_ref, {child_module, child_opts} = child_spec) do
%{shape_handle: shape_handle} = child_opts
def start_materializer(stack_id, config) when is_stack_id(stack_id) do
start_child(stack_id, {Electric.Shapes.Consumer.Materializer, config})
end

routing_key = :erlang.phash2(shape_handle)
defp start_child(stack_id, {child_module, child_opts} = child_spec) do
%{shape_handle: shape_handle} = child_opts

Logger.debug(fn -> "Starting #{inspect(child_module)} for #{shape_handle}" end)

DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {name(supervisor_ref), routing_key}},
child_spec
)
DynamicSupervisor.start_child(partition_for(stack_id, shape_handle), child_spec)
end

@impl true
def init(stack_id: stack_id) do
def init(%{stack_id: stack_id, partitions: partitions}) do
Process.set_label({:dynamic_consumer_supervisor, stack_id})
Logger.metadata(stack_id: stack_id)
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)

table = :ets.new(table(stack_id), [:named_table, :public, read_concurrency: true])
true = :ets.insert(table, [{@partition_count_key, partitions}])

DynamicSupervisor.init(strategy: :one_for_one)
end

defp table(stack_id), do: :"Electric.Shapes.DynamicConsumerSupervisor:#{stack_id}"

defp partition_for(stack_id, shape_handle) do
partitions = :ets.lookup_element(table(stack_id), @partition_count_key, 2)
partition = :erlang.phash2(shape_handle, partitions)
PartitionDynamicSupervisor.name(stack_id, partition)
end

# we don't always have a value for `max_processes`, in which case just
# default to the number of schedulers
defp partition_count(0) do
System.schedulers_online()
end

defp partition_count(max_processes) when max_processes > 0 do
max(
System.schedulers_online(),
div(max_processes + @target_per_partition - 1, @target_per_partition)
)
end
end
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ defmodule Electric.StackSupervisor do
type: {:or, [:pos_integer, nil]},
default: nil
],
process_spawn_opts: [type: :map, default: %{}]
process_spawn_opts: [type: :map, default: %{}],
consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil]
]
],
lock_breaker_guard: [
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/test/support/component_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ defmodule Support.ComponentSetup do
defp start_consumer_supervisor(ctx) do
consumer_supervisor = :"consumer_supervisor_#{full_test_name(ctx)}"

{Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id]}
{Electric.Shapes.DynamicConsumerSupervisor, [stack_id: ctx.stack_id, partitions: 1]}
|> Supervisor.child_spec(id: consumer_supervisor, restart: :temporary)
|> start_supervised!()

Expand Down
11 changes: 11 additions & 0 deletions website/docs/api/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,17 @@ This replaces the previous `ELECTRIC_EXPERIMENTAL_MAX_SHAPES` environment variab

</EnvVarConfig>

### ELECTRIC_CONSUMER_PARTITIONS

<EnvVarConfig
name="ELECTRIC_CONSUMER_PARTITIONS"
optional="true"
example="128">

Consumer processes are partitioned across some number of supervisors to improve launch and shutdown time. If `ELECTRIC_MAX_SHAPES` is set the number of partitions will scale to fit this maximum but if not, it defaults to the number of CPU cores. If you want to improve shutdown performance without setting an upper limit on the number of shapes, then set this to roughly your expected number of shapes / 4000.

</EnvVarConfig>

## Feature Flags

Feature flags enable experimental or advanced features that are not yet enabled by default in production.
Expand Down
Loading