diff --git a/.changeset/hibernate-then-suspend.md b/.changeset/hibernate-then-suspend.md new file mode 100644 index 0000000000..6b18ca0957 --- /dev/null +++ b/.changeset/hibernate-then-suspend.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': minor +--- + +Add hibernate-then-suspend behavior for Consumer processes. When suspend is enabled, consumers now hibernate first (triggering GC) before suspending. Adds `shape_suspend_after` config (default 60s) to control the delay between hibernation and suspension. Any activity cancels the pending suspend timer, restarting the cycle. diff --git a/integration-tests/tests/shape-suspension-resumption.lux b/integration-tests/tests/shape-suspension-resumption.lux index 198917c27c..e3035151ba 100644 --- a/integration-tests/tests/shape-suspension-resumption.lux +++ b/integration-tests/tests/shape-suspension-resumption.lux @@ -20,7 +20,7 @@ # Start Electric and wait for it to finish initialization. # Set a very small hibernation timeout so consumers will shutdown quickly -[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"] +[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"] [shell electric] [timeout 10] ??[debug] Replication client started streaming diff --git a/packages/sync-service/2026-05-06-hibernate-then-suspend.md b/packages/sync-service/2026-05-06-hibernate-then-suspend.md new file mode 100644 index 0000000000..372e6e42f2 --- /dev/null +++ b/packages/sync-service/2026-05-06-hibernate-then-suspend.md @@ -0,0 +1,837 @@ +# Hibernate-Then-Suspend Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** When consumer suspend is enabled, hibernate first (shorter timeout) to trigger GC, then suspend later (longer timeout) to terminate the process. + +**Architecture:** Introduce a two-stage timeout: after `hibernate_after` ms of inactivity, the consumer hibernates and schedules a `:suspend_timeout` message for `suspend_after` ms later. Any activity cancels the pending suspend timer. This ensures GC runs (via hibernation) before eventual process termination (suspend). + +**Tech Stack:** Elixir GenServer, `:erlang.send_after/3` for timer scheduling + +--- + +## File Structure + +| File | Action | Responsibility | +| ------------------------------------------ | ------ | ----------------------------------------------------------------- | +| `lib/electric/config.ex` | Modify | Add `shape_suspend_after` default (60s) | +| `lib/electric/stack_config.ex` | Modify | Add `shape_suspend_after` to seed config | +| `lib/electric/stack_supervisor.ex` | Modify | Add `shape_suspend_after` to schema and config | +| `lib/electric/application.ex` | Modify | Pass `shape_suspend_after` through | +| `lib/electric/shapes/consumer/state.ex` | Modify | Add `:suspend_timer` and `:suspend_after` fields | +| `lib/electric/shapes/consumer.ex` | Modify | Implement hibernate-then-suspend flow | +| `lib/electric/shapes/consumer_registry.ex` | Modify | Update `enable_suspend/3` → `enable_suspend/4` with suspend_after | +| `test/electric/shapes/consumer_test.exs` | Modify | Add tests for hibernate-then-suspend behavior | + +--- + +## Task 1: Add `shape_suspend_after` Configuration + +**Files:** + +- Modify: `packages/sync-service/lib/electric/config.ex:88-91` +- Modify: `packages/sync-service/lib/electric/stack_config.ex:31-32` +- Modify: `packages/sync-service/lib/electric/stack_supervisor.ex:131-137` +- Modify: `packages/sync-service/lib/electric/application.ex:146-147` + +- [ ] **Step 1: Add default to config.ex** + +In `lib/electric/config.ex`, add `shape_suspend_after` after `shape_enable_suspend?`: + +```elixir + shape_hibernate_after: :timer.seconds(30), + # Should we terminate consumer processes after `shape_hibernate_after` ms + # or just hibernate them? + shape_enable_suspend?: false, + # How long after hibernation before suspending (terminating) the consumer + shape_suspend_after: :timer.seconds(60), +``` + +- [ ] **Step 2: Add to stack_config.ex seed config** + +In `lib/electric/stack_config.ex`, function `default_seed_config/0`: + +```elixir + shape_hibernate_after: Electric.Config.default(:shape_hibernate_after), + shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?), + shape_suspend_after: Electric.Config.default(:shape_suspend_after), +``` + +- [ ] **Step 3: Add to stack_supervisor.ex schema** + +In `lib/electric/stack_supervisor.ex`, in the `:tweaks` schema around line 135: + +```elixir + shape_enable_suspend?: [ + type: :boolean, + default: Electric.Config.default(:shape_enable_suspend?) + ], + shape_suspend_after: [ + type: :non_neg_integer, + default: Electric.Config.default(:shape_suspend_after) + ], +``` + +- [ ] **Step 4: Update stack_supervisor.ex config extraction** + +In `lib/electric/stack_supervisor.ex`, around line 353-354, add extraction: + +```elixir + shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after) + shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) + shape_suspend_after = Keyword.fetch!(config.tweaks, :shape_suspend_after) +``` + +- [ ] **Step 5: Update stack_supervisor.ex config passing** + +In `lib/electric/stack_supervisor.ex`, around line 402-403, add to the config map: + +```elixir + shape_hibernate_after: shape_hibernate_after, + shape_enable_suspend?: shape_enable_suspend?, + shape_suspend_after: shape_suspend_after, +``` + +- [ ] **Step 6: Update application.ex** + +In `lib/electric/application.ex`, around line 146-147: + +```elixir + shape_hibernate_after: get_env(opts, :shape_hibernate_after), + shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), + shape_suspend_after: get_env(opts, :shape_suspend_after), +``` + +- [ ] **Step 7: Verify compilation** + +Run: `cd packages/sync-service && mix compile --warnings-as-errors` +Expected: Compilation succeeds without warnings + +- [ ] **Step 8: Commit** + +```bash +git add packages/sync-service/lib/electric/config.ex \ + packages/sync-service/lib/electric/stack_config.ex \ + packages/sync-service/lib/electric/stack_supervisor.ex \ + packages/sync-service/lib/electric/application.ex +git commit -m "$(cat <<'EOF' +feat(sync-service): add shape_suspend_after configuration + +Add new config option to control the delay between hibernation and +suspension. Default is 60 seconds. This prepares for hibernate-then-suspend +behavior where consumers first hibernate (to trigger GC) before suspending. + +Co-Authored-By: Claude Opus 4.5 +EOF +)" +``` + +--- + +## Task 2: Add State Fields for Suspend Timer + +**Files:** + +- Modify: `packages/sync-service/lib/electric/shapes/consumer/state.ex:16-45` +- Modify: `packages/sync-service/lib/electric/shapes/consumer/state.ex:88-100` + +- [ ] **Step 1: Add fields to defstruct** + +In `lib/electric/shapes/consumer/state.ex`, add to the defstruct (around line 44): + +```elixir + # Timer reference for scheduled suspend, set when entering hibernation + suspend_timer: nil, + # How long after hibernation to suspend (in ms) + suspend_after: nil +``` + +- [ ] **Step 2: Initialize suspend_after in new/2** + +In `lib/electric/shapes/consumer/state.ex`, update `new/2` function to initialize `suspend_after`: + +```elixir + def new(stack_id, shape_handle) do + %__MODULE__{ + stack_id: stack_id, + shape_handle: shape_handle, + hibernate_after: + Electric.StackConfig.lookup( + stack_id, + :shape_hibernate_after, + Electric.Config.default(:shape_hibernate_after) + ), + suspend_after: + Electric.StackConfig.lookup( + stack_id, + :shape_suspend_after, + Electric.Config.default(:shape_suspend_after) + ), + buffering?: true + } + end +``` + +- [ ] **Step 3: Verify compilation** + +Run: `cd packages/sync-service && mix compile --warnings-as-errors` +Expected: Compilation succeeds without warnings + +- [ ] **Step 4: Commit** + +```bash +git add packages/sync-service/lib/electric/shapes/consumer/state.ex +git commit -m "$(cat <<'EOF' +feat(sync-service): add suspend_timer and suspend_after to Consumer.State + +Add state fields to track the scheduled suspend timer and the configured +delay between hibernation and suspension. + +Co-Authored-By: Claude Opus 4.5 +EOF +)" +``` + +--- + +## Task 3: Implement Hibernate-Then-Suspend Logic in Consumer + +**Files:** + +- Modify: `packages/sync-service/lib/electric/shapes/consumer.ex:389-423` + +- [ ] **Step 1: Add helper to cancel suspend timer** + +Add a new private function after `consumer_can_suspend?/1` (around line 423): + +```elixir + defp cancel_suspend_timer(%{suspend_timer: nil} = state), do: state + + defp cancel_suspend_timer(%{suspend_timer: timer_ref} = state) do + :erlang.cancel_timer(timer_ref) + receive do + :suspend_timeout -> :ok + after + 0 -> :ok + end + %{state | suspend_timer: nil} + end +``` + +- [ ] **Step 2: Add helper to schedule suspend timer** + +Add after the cancel helper: + +```elixir + defp schedule_suspend_timer(%{suspend_after: suspend_after} = state) do + timer_ref = :erlang.send_after(suspend_after, self(), :suspend_timeout) + %{state | suspend_timer: timer_ref} + end +``` + +- [ ] **Step 3: Modify handle_info(:timeout, ...) for hibernate-then-suspend** + +Replace the existing `handle_info(:timeout, state)` function (lines 397-414): + +```elixir + def handle_info(:timeout, state) do + state = cancel_suspend_timer(state) + + if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do + state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} + state = schedule_suspend_timer(state) + {:noreply, state, :hibernate} + else + state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} + {:noreply, state, :hibernate} + end + end +``` + +- [ ] **Step 4: Add handle_info(:suspend_timeout, ...) handler** + +Add a new handler after the `:timeout` handler: + +```elixir + def handle_info(:suspend_timeout, state) do + state = %{state | suspend_timer: nil} + + if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do + Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end) + {:stop, ShapeCleaner.consumer_suspend_reason(), state} + else + {:noreply, state, state.hibernate_after} + end + end +``` + +- [ ] **Step 5: Cancel suspend timer on activity** + +Find all places that return `{:noreply, state, state.hibernate_after}` and ensure they cancel the suspend timer. Create a helper that wraps the common pattern. Add this helper near the cancel_suspend_timer function: + +```elixir + defp reply_with_timeout(state) do + state = cancel_suspend_timer(state) + {state, state.hibernate_after} + end +``` + +Update handle_continue(:consume_buffer, ...) at line 167-175: + +```elixir + def handle_continue(:consume_buffer, state) do + state = process_buffered_txn_fragments(state) + + if state.terminating? do + {:noreply, state, {:continue, :stop_and_clean}} + else + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end + end +``` + +- [ ] **Step 6: Update remaining handlers to cancel timer on activity** + +Update each handler that returns `state.hibernate_after` to use the pattern `{state, timeout} = reply_with_timeout(state)`: + +For `handle_call({:monitor, pid}, ...)` at line 178: + +```elixir + def handle_call({:monitor, pid}, _from, %{monitors: monitors} = state) do + ref = make_ref() + {state, timeout} = reply_with_timeout(%{state | monitors: [{pid, ref} | monitors]}) + {:reply, ref, state, timeout} + end +``` + +For `handle_call(:await_snapshot_start, ...)` at line 183: + +```elixir + def handle_call(:await_snapshot_start, _from, state) when is_snapshot_started(state) do + {state, timeout} = reply_with_timeout(state) + {:reply, :started, state, timeout} + end +``` + +For `handle_call(:await_snapshot_start, from, state)` at line 187: + +```elixir + def handle_call(:await_snapshot_start, from, state) do + Logger.debug("Starting a wait on the snapshot #{state.shape_handle} for #{inspect(from)}}") + state = State.add_waiter(state, from) + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end +``` + +For `handle_call({:handle_event, ...})` at line 193: + +```elixir + def handle_call({:handle_event, event, trace_context}, _from, state) do + OpenTelemetry.set_current_context(trace_context) + + case handle_event(event, state) do + %{terminating?: true} = state -> + {:reply, :ok, state, {:continue, :stop_and_clean}} + + state -> + {state, timeout} = reply_with_timeout(state) + {:reply, :ok, state, timeout} + end + end +``` + +For `handle_call({:subscribe_materializer, ...})` at line 205: + +```elixir + def handle_call({:subscribe_materializer, pid}, _from, state) do + Logger.debug("Subscribing materializer for #{state.shape_handle}") + Process.monitor(pid, tag: :materializer_down) + state = %{state | materializer_subscribed?: true} + {state, timeout} = reply_with_timeout(state) + {:reply, {:ok, state.latest_offset}, state, timeout} + end +``` + +For `handle_cast({:snapshot_started, ...})` at line 230: + +```elixir + def handle_cast({:snapshot_started, shape_handle}, %{shape_handle: shape_handle} = state) do + Logger.debug("Snapshot started shape_handle: #{shape_handle}") + state = State.mark_snapshot_started(state) + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end +``` + +For `handle_cast({:snapshot_exists, ...})` at line 251: + +```elixir + def handle_cast({:snapshot_exists, shape_handle}, %{shape_handle: shape_handle} = state) do + state = State.mark_snapshot_started(state) + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end +``` + +For `handle_info({ShapeCache.Storage, :flushed, ...})` at line 277: + +```elixir + def handle_info({ShapeCache.Storage, :flushed, flushed_offset}, state) do + state = + if is_write_unit_txn(state.write_unit) or is_nil(state.pending_txn) do + confirm_flushed_and_notify(state, flushed_offset) + else + updated_offset = more_recent_offset(state.pending_flush_offset, flushed_offset) + %{state | pending_flush_offset: updated_offset} + end + + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end +``` + +For `handle_info({:global_last_seen_lsn, ...})` at line 294: + +```elixir + def handle_info({:global_last_seen_lsn, _lsn} = event, state) do + case handle_event(event, state) do + %{terminating?: true} = state -> + {:noreply, state, {:continue, :stop_and_clean}} + + state -> + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end + end +``` + +For `handle_info({ShapeCache.Storage, message}, state)` at line 305: + +```elixir + def handle_info({ShapeCache.Storage, message}, state) do + writer = ShapeCache.Storage.apply_message(state.writer, message) + state = %{state | writer: writer} + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end +``` + +For `handle_apply_event_result/2` at line 847: + +```elixir + defp handle_apply_event_result(_old_state, {state, notification, _num_changes, _total_size}) do + if notification do + :ok = notify_new_changes(state, notification) + end + + {state, timeout} = reply_with_timeout(state) + {:noreply, state, timeout} + end +``` + +- [ ] **Step 7: Update handle_info({:configure_suspend, ...})** + +Update the configure_suspend handler at line 392: + +```elixir + def handle_info({:configure_suspend, hibernate_after, suspend_after, jitter_period}, state) do + state = cancel_suspend_timer(state) + state = %{state | hibernate_after: hibernate_after, suspend_after: suspend_after} + {:noreply, state, Enum.random(hibernate_after..jitter_period)} + end +``` + +- [ ] **Step 8: Verify compilation** + +Run: `cd packages/sync-service && mix compile --warnings-as-errors` +Expected: Compilation succeeds without warnings + +- [ ] **Step 9: Commit** + +```bash +git add packages/sync-service/lib/electric/shapes/consumer.ex +git commit -m "$(cat <<'EOF' +feat(sync-service): implement hibernate-then-suspend in Consumer + +When suspend is enabled, consumers now: +1. Hibernate first on timeout (triggering GC) +2. Schedule a suspend timer for suspend_after ms later +3. Suspend (terminate) when the suspend timer fires + +Any activity cancels the pending suspend timer, restarting the cycle. +This ensures GC runs before eventual process termination. + +Co-Authored-By: Claude Opus 4.5 +EOF +)" +``` + +--- + +## Task 4: Update ConsumerRegistry.enable_suspend + +**Files:** + +- Modify: `packages/sync-service/lib/electric/shapes/consumer_registry.ex:213-253` + +- [ ] **Step 1: Update enable_suspend function signature and implementation** + +Replace the `enable_suspend/3` function: + +```elixir + @doc """ + Dynamically (re-)enable consumer suspension on all running consumers. + + This allows for dynamically re-configuring consumer suspension even if it was + disabled, because the configuration message will have the side-effect of + waking all consumers from hibernation. + + The `jitter_period` value allows for spreading the suspension of existing + consumers over a large time period to avoid a sudden rush of consumer + shutdowns after `hibernate_after` ms. + + To re-enable consumer suspend: + + # set the hibernation timeout to 1 minute, suspend timeout to 5 minutes, + # and phase the suspension of existing consumers over a 20 minute period + Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 300_000, 60_000 * 20) + + Disabling suspension is as easy as: + + Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false) + + """ + @spec enable_suspend(stack_id(), pos_integer(), pos_integer(), pos_integer()) :: + consumer_count :: non_neg_integer() + def enable_suspend(stack_id, hibernate_after, suspend_after, jitter_period) + when is_integer(hibernate_after) and is_integer(suspend_after) and + is_integer(jitter_period) and jitter_period > hibernate_after do + Electric.StackConfig.put(stack_id, :shape_hibernate_after, hibernate_after) + Electric.StackConfig.put(stack_id, :shape_suspend_after, suspend_after) + Electric.StackConfig.put(stack_id, :shape_enable_suspend?, true) + + :ets.foldl( + fn {_shape_handle, pid}, n -> + if Process.alive?(pid), + do: send(pid, {:configure_suspend, hibernate_after, suspend_after, jitter_period}) + + n + 1 + end, + 0, + ets_name(stack_id) + ) + end +``` + +- [ ] **Step 2: Verify compilation** + +Run: `cd packages/sync-service && mix compile --warnings-as-errors` +Expected: Compilation succeeds without warnings + +- [ ] **Step 3: Commit** + +```bash +git add packages/sync-service/lib/electric/shapes/consumer_registry.ex +git commit -m "$(cat <<'EOF' +feat(sync-service): update enable_suspend to include suspend_after + +Change enable_suspend/3 to enable_suspend/4, adding the suspend_after +parameter to configure the delay between hibernation and suspension. + +Co-Authored-By: Claude Opus 4.5 +EOF +)" +``` + +--- + +## Task 5: Add Tests for Hibernate-Then-Suspend + +**Files:** + +- Modify: `packages/sync-service/test/electric/shapes/consumer_test.exs` + +- [ ] **Step 1: Add test for hibernate-then-suspend flow** + +Add a new test in the "transactions" describe block, after the existing suspend tests (around line 1504): + +```elixir + @tag hibernate_after: 10, suspend_after: 50, with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "should hibernate first then suspend after suspend_after ms", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate_after (10ms) + small buffer + Process.sleep(30) + + # Should be hibernated, not suspended yet + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Wait for suspend_after (50ms from hibernate) to complete + Process.sleep(80) + + # Now should be suspended + assert_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}} + + refute Consumer.whereis(ctx.stack_id, shape_handle) + end +``` + +- [ ] **Step 2: Add test that activity cancels suspend timer** + +Add another test: + +```elixir + @tag hibernate_after: 10, suspend_after: 100, with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "activity during hibernation cancels pending suspend", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + lsn2 = Lsn.from_integer(301) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn1 = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn1, ctx.stack_id) + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate + Process.sleep(30) + + # Should be hibernated + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + # Send another transaction - this should cancel the suspend timer + txn2 = + complete_txn_fragment(3, lsn2, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "22"}, + log_offset: LogOffset.new(lsn2, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) + assert_receive {:flush_boundary_updated, 301}, 1_000 + + # Wait past original suspend_after window + Process.sleep(150) + + # Should NOT have suspended because activity reset the timer + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Process should still be alive (hibernated again) + assert Process.alive?(consumer_pid) + end +``` + +- [ ] **Step 3: Update the setup to handle suspend_after tag** + +In the setup block around line 594, add handling for `suspend_after`: + +```elixir + setup(ctx) do + Electric.StackConfig.put( + ctx.stack_id, + :shape_hibernate_after, + Map.get(ctx, :hibernate_after, 10_000) + ) + + Electric.StackConfig.put( + ctx.stack_id, + :shape_suspend_after, + Map.get(ctx, :suspend_after, 60_000) + ) + + if not Map.get(ctx, :allow_subqueries, true) do + Electric.StackConfig.put(ctx.stack_id, :feature_flags, []) + end + + :ok + end +``` + +- [ ] **Step 4: Update the enable_suspend test** + +Update the existing `ConsumerRegistry.enable_suspend` test at line 1466: + +```elixir + @tag with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: false + test "ConsumerRegistry.enable_suspend should suspend hibernated consumers", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + assert_receive {:flush_boundary_updated, 300}, 1_000 + + Process.sleep(60) + + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}} + + assert Consumer.whereis(ctx.stack_id, shape_handle) + + # hibernate_after=5, suspend_after=5, jitter_period=10 + Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 5, 10) + + Process.sleep(60) + + assert_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}} + + refute Consumer.whereis(ctx.stack_id, shape_handle) + end +``` + +- [ ] **Step 5: Run the tests** + +Run: `cd packages/sync-service && mix test test/electric/shapes/consumer_test.exs --seed 0 --only suspend` +Expected: All suspend-related tests pass + +- [ ] **Step 6: Run full consumer test suite** + +Run: `cd packages/sync-service && mix test test/electric/shapes/consumer_test.exs` +Expected: All tests pass + +- [ ] **Step 7: Commit** + +```bash +git add packages/sync-service/test/electric/shapes/consumer_test.exs +git commit -m "$(cat <<'EOF' +test(sync-service): add tests for hibernate-then-suspend behavior + +Add tests verifying: +- Consumer hibernates first, then suspends after suspend_after ms +- Activity during hibernation cancels the pending suspend timer +- Update enable_suspend test for new 4-arity function + +Co-Authored-By: Claude Opus 4.5 +EOF +)" +``` + +--- + +## Task 6: Update Documentation Comment in Config + +**Files:** + +- Modify: `packages/sync-service/lib/electric/config.ex` + +- [ ] **Step 1: Update the config comment** + +Update the comment at lines 88-91: + +```elixir + # After this duration of inactivity, consumer processes will hibernate + # to allow garbage collection + shape_hibernate_after: :timer.seconds(30), + # If enabled, terminate (suspend) consumer processes after hibernating. + # This frees memory more aggressively than hibernation alone. + shape_enable_suspend?: false, + # After hibernating, wait this duration before suspending (terminating). + # Only applies when shape_enable_suspend? is true. + shape_suspend_after: :timer.seconds(60), +``` + +- [ ] **Step 2: Commit** + +```bash +git add packages/sync-service/lib/electric/config.ex +git commit -m "$(cat <<'EOF' +docs(sync-service): clarify hibernate/suspend config comments + +Co-Authored-By: Claude Opus 4.5 +EOF +)" +``` + +--- + +## Task 7: Final Verification + +- [ ] **Step 1: Run full test suite** + +Run: `cd packages/sync-service && mix test` +Expected: All tests pass + +- [ ] **Step 2: Run formatter** + +Run: `cd packages/sync-service && mix format` +Expected: No changes (code already formatted) or auto-formats + +- [ ] **Step 3: Run dialyzer (if available)** + +Run: `cd packages/sync-service && mix dialyzer` +Expected: No errors + +- [ ] **Step 4: Final commit if any formatting changes** + +```bash +git add -A +git diff --cached --quiet || git commit -m "$(cat <<'EOF' +chore(sync-service): format code + +Co-Authored-By: Claude Opus 4.5 +EOF +)" +``` diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 955218d8e0..89be072e01 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -164,6 +164,9 @@ shape_hibernate_after = shape_enable_suspend? = env!("ELECTRIC_SHAPE_SUSPEND_CONSUMER", :boolean, nil) +shape_suspend_after = + env!("ELECTRIC_SHAPE_SUSPEND_AFTER", &Electric.Config.parse_human_readable_time!/1, nil) + system_metrics_poll_interval = env!( "ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL", @@ -264,6 +267,7 @@ config :electric, env!("ELECTRIC_SUBQUERY_BUFFER_MAX_TRANSACTIONS", :integer, nil), shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, + shape_suspend_after: shape_suspend_after, storage_dir: storage_dir, storage: storage_spec, cleanup_interval_ms: diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 2221552035..086dd0d85f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -145,6 +145,7 @@ defmodule Electric.Application do cleanup_interval_ms: get_env(opts, :cleanup_interval_ms), shape_hibernate_after: get_env(opts, :shape_hibernate_after), shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), + shape_suspend_after: get_env(opts, :shape_suspend_after), conn_max_requests: get_env(opts, :conn_max_requests), handler_fullsweep_after: get_env(opts, :handler_fullsweep_after), process_spawn_opts: get_env(opts, :process_spawn_opts) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index c33ac3c271..290fbfa4f3 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -85,10 +85,15 @@ defmodule Electric.Config do otel_sampling_ratio: 0.01, metrics_sampling_ratio: 1, ## Memory + # After this duration of inactivity, consumer processes will hibernate + # to allow garbage collection shape_hibernate_after: :timer.seconds(30), - # Should we terminate consumer processes after `shape_hibernate_after` ms - # or just hibernate them? + # If enabled, terminate (suspend) consumer processes after hibernating. + # This frees memory more aggressively than hibernation alone. shape_enable_suspend?: false, + # After hibernating, wait this duration before suspending (terminating). + # Only applies when shape_enable_suspend? is true. + shape_suspend_after: :timer.minutes(10), # Sets max_requests for Bandit handler processes: # https://hexdocs.pm/bandit/Bandit.html#t:http_1_options/0 # "The maximum number of requests to serve in a single HTTP/{1,2} diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 5e8cd6d6b3..c8ea2b9001 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -186,8 +186,8 @@ defmodule Electric.Shapes.Consumer do def handle_call(:await_snapshot_start, from, state) do Logger.debug("Starting a wait on the snapshot #{state.shape_handle} for #{inspect(from)}}") - - {:noreply, State.add_waiter(state, from), state.hibernate_after} + state = State.add_waiter(state, from) + {:noreply, state, state.hibernate_after} end def handle_call({:handle_event, event, trace_context}, _from, state) do @@ -205,9 +205,8 @@ defmodule Electric.Shapes.Consumer do def handle_call({:subscribe_materializer, pid}, _from, state) do Logger.debug("Subscribing materializer for #{state.shape_handle}") Process.monitor(pid, tag: :materializer_down) - - {:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true}, - state.hibernate_after} + state = %{state | materializer_subscribed?: true} + {:reply, {:ok, state.latest_offset}, state, state.hibernate_after} end def handle_call({:stop, reason}, _from, state) do @@ -383,30 +382,37 @@ defmodule Electric.Shapes.Consumer do {:stop, reason, state} end - # Set a new value for hibernate after and set a timeout between - # hibernate_after and max_timeout in order to spread - # consumer suspend events. - def handle_info({:configure_suspend, hibernate_after, jitter_period}, state) do - {:noreply, %{state | hibernate_after: hibernate_after}, - Enum.random(hibernate_after..jitter_period)} + # Set new values for hibernate_after and suspend_after, and set a jittered + # timeout between hibernate_after and jitter_period to spread hibernation + # events. Each consumer will hibernate at the jittered timeout, then schedule + # suspension for suspend_after ms later. + def handle_info({:configure_suspend, hibernate_after, suspend_after, jitter_period}, state) do + state = %{state | hibernate_after: hibernate_after, suspend_after: suspend_after} + {:noreply, state, Enum.random(hibernate_after..jitter_period)} end def handle_info(:timeout, state) do - # we can only suspend (terminate) the consumer process if - # - # 1. Consumer suspend has been enabled in the stack config - # 2. we're not waiting for snapshot information - # 3. we are not part of a subquery dependency tree, that is either - # a. we have no dependent shapes - # b. we don't have a materializer subscribed - - if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do + state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} + + state = + if consumer_suspend_enabled?(state) and consumer_can_suspend?(state), + do: schedule_suspend_timer(state), + else: state + + {:noreply, state, :hibernate} + end + + # Suspend timer uses a generation number to handle stale timers. If activity + # occurred after the timer was scheduled, a new timer with a higher generation + # will have been scheduled, making this one stale (generation mismatch). + def handle_info({:suspend_timeout, generation}, state) do + if generation == state.suspend_generation and + consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end) {:stop, ShapeCleaner.consumer_suspend_reason(), state} else - state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} - - {:noreply, state, :hibernate} + # Stale timer or conditions changed - just restart the hibernate timeout + {:noreply, state, state.hibernate_after} end end @@ -419,6 +425,14 @@ defmodule Electric.Shapes.Consumer do not state.materializer_subscribed? end + defp schedule_suspend_timer(%{suspend_after: nil} = state), do: state + + defp schedule_suspend_timer(%{suspend_after: suspend_after, suspend_generation: gen} = state) do + next_gen = gen + 1 + :erlang.send_after(suspend_after, self(), {:suspend_timeout, next_gen}) + %{state | suspend_generation: next_gen} + end + @impl GenServer def terminate(reason, state) do Logger.debug(fn -> diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 9414f07571..2b366021b2 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -41,7 +41,13 @@ defmodule Electric.Shapes.Consumer.State do # When a {Storage, :flushed, offset} message arrives during a pending # transaction, we defer the notification and store the max flushed offset # here. Multiple deferred notifications are collapsed into a single most recent offset. - pending_flush_offset: nil + pending_flush_offset: nil, + # Generation counter for suspend timers - incremented each time we schedule + # a new suspend timer. When a timer fires, it checks if its generation matches + # the current one; if not, activity occurred and the timer is stale (ignored). + suspend_generation: 0, + # How long after hibernation to suspend (in ms) + suspend_after: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -95,6 +101,12 @@ defmodule Electric.Shapes.Consumer.State do :shape_hibernate_after, Electric.Config.default(:shape_hibernate_after) ), + suspend_after: + Electric.StackConfig.lookup( + stack_id, + :shape_suspend_after, + Electric.Config.default(:shape_suspend_after) + ), buffering?: true } end diff --git a/packages/sync-service/lib/electric/shapes/consumer_registry.ex b/packages/sync-service/lib/electric/shapes/consumer_registry.ex index 935420b2d0..b98ba330ae 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_registry.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_registry.ex @@ -217,33 +217,35 @@ defmodule Electric.Shapes.ConsumerRegistry do disabled, because the configuration message will have the side-effect of waking all consumers from hibernation. - The `jitter_period` value allows for spreading the suspension of existing - consumers over a large time period to avoid a sudden rush of consumer - shutdowns after `hibernate_after` ms. + The `jitter_period` value allows for spreading the hibernation of existing + consumers over a time period to avoid a sudden rush of hibernation events. + Each consumer picks a random timeout between `hibernate_after` and `jitter_period`, + then hibernates and schedules suspension for `suspend_after` ms later. To re-enable consumer suspend: - # set the hibernation timeout to 1 minute but phase the suspension of - # existing consumers over a 20 minute period - Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000 * 20) + # hibernation timeout: 1 min, suspend timeout: 4 min, jitter window: 20 min + # Consumers will hibernate between 1-20 min, then suspend 4 min after hibernating + Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 4 * 60_000, 60_000 * 20) Disabling suspension is as easy as: Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false) """ - @spec enable_suspend(stack_id(), pos_integer(), pos_integer()) :: + @spec enable_suspend(stack_id(), pos_integer(), pos_integer(), pos_integer()) :: consumer_count :: non_neg_integer() - def enable_suspend(stack_id, hibernate_after, jitter_period) - when is_integer(hibernate_after) and is_integer(jitter_period) and - jitter_period > hibernate_after do + def enable_suspend(stack_id, hibernate_after, suspend_after, jitter_period) + when is_integer(hibernate_after) and is_integer(suspend_after) and + is_integer(jitter_period) and jitter_period > hibernate_after do Electric.StackConfig.put(stack_id, :shape_hibernate_after, hibernate_after) + Electric.StackConfig.put(stack_id, :shape_suspend_after, suspend_after) Electric.StackConfig.put(stack_id, :shape_enable_suspend?, true) :ets.foldl( fn {_shape_handle, pid}, n -> if Process.alive?(pid), - do: send(pid, {:configure_suspend, hibernate_after, jitter_period}) + do: send(pid, {:configure_suspend, hibernate_after, suspend_after, jitter_period}) n + 1 end, diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index cc614cd055..462d474953 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -30,6 +30,7 @@ defmodule Electric.StackConfig do snapshot_timeout_to_first_data: :timer.seconds(30), shape_hibernate_after: Electric.Config.default(:shape_hibernate_after), shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?), + shape_suspend_after: Electric.Config.default(:shape_suspend_after), chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), feature_flags: [], process_spawn_opts: %{} diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index af22b52e35..98d3ba27e1 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -136,6 +136,10 @@ defmodule Electric.StackSupervisor do type: :boolean, default: Electric.Config.default(:shape_enable_suspend?) ], + shape_suspend_after: [ + type: :non_neg_integer, + default: Electric.Config.default(:shape_suspend_after) + ], snapshot_timeout_to_first_data: [ type: :pos_integer, default: Electric.Config.default(:snapshot_timeout_to_first_data) @@ -352,6 +356,7 @@ defmodule Electric.StackSupervisor do shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after) shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) + shape_suspend_after = Keyword.fetch!(config.tweaks, :shape_suspend_after) process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts) shape_cache_opts = [ @@ -401,6 +406,7 @@ defmodule Electric.StackSupervisor do inspector: inspector, shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, + shape_suspend_after: shape_suspend_after, process_spawn_opts: process_spawn_opts, feature_flags: Map.get(config, :feature_flags, []) ]}, diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 40b27576c5..5e529676c1 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -598,6 +598,12 @@ defmodule Electric.Shapes.ConsumerTest do Map.get(ctx, :hibernate_after, 10_000) ) + Electric.StackConfig.put( + ctx.stack_id, + :shape_suspend_after, + Map.get(ctx, :shape_suspend_after, 60_000) + ) + if not Map.get(ctx, :allow_subqueries, true) do Electric.StackConfig.put(ctx.stack_id, :feature_flags, []) end @@ -1375,9 +1381,10 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {:flush_boundary_updated, 301}, 1_000 end - @tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1] + @tag hibernate_after: 10, shape_suspend_after: 20 + @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: true - test "should terminate after :hibernate_after ms", ctx do + test "should suspend after hibernate_after + shape_suspend_after ms", ctx do register_as_replication_client(ctx.stack_id) {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) @@ -1409,7 +1416,8 @@ defmodule Electric.Shapes.ConsumerTest do refute Consumer.whereis(ctx.stack_id, shape_handle) end - @tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1] + @tag hibernate_after: 10, shape_suspend_after: 10 + @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: true test "should hibernate not suspend if has dependencies", ctx do register_as_replication_client(ctx.stack_id) @@ -1494,7 +1502,8 @@ defmodule Electric.Shapes.ConsumerTest do assert Consumer.whereis(ctx.stack_id, shape_handle) - Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 10) + # hibernate_after=5, shape_suspend_after=5, jitter_period=10 + Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 5, 10) Process.sleep(60) @@ -1503,6 +1512,122 @@ defmodule Electric.Shapes.ConsumerTest do refute Consumer.whereis(ctx.stack_id, shape_handle) end + @tag hibernate_after: 10, + shape_suspend_after: 150, + with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "should hibernate first then suspend after shape_suspend_after ms", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate_after (10ms) + small buffer + # Suspend won't happen until hibernate_after + shape_suspend_after = 10 + 150 = 160ms + Process.sleep(50) + + # Should be hibernated, not suspended yet (we're at ~50ms, suspend at ~160ms) + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Wait for shape_suspend_after (150ms from hibernate) to complete + # We're at ~50ms, need to wait another ~150ms to be past 160ms + Process.sleep(180) + + # Now should be suspended + assert_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}} + + refute Consumer.whereis(ctx.stack_id, shape_handle) + end + + @tag hibernate_after: 10, + shape_suspend_after: 200, + with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "activity during hibernation cancels pending suspend", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + lsn2 = Lsn.from_integer(301) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn1 = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn1, ctx.stack_id) + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate (hibernate_after=10ms + buffer) + Process.sleep(30) + + # Should be hibernated + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + # Wait ~50ms so suspend timer has been running but not fired yet + # (shape_suspend_after=200ms, so we're at ~80ms total, well before 200ms) + Process.sleep(50) + + # Send another transaction - this should cancel the suspend timer + txn2 = + complete_txn_fragment(3, lsn2, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "22"}, + log_offset: LogOffset.new(lsn2, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) + assert_receive {:flush_boundary_updated, 301}, 1_000 + + # Wait past what would have been the original shape_suspend_after window + # Original timer started at ~30ms, would fire at ~230ms + # We're now at ~80ms, wait 160ms to reach ~240ms + # But new timer started at ~80ms, would fire at ~280ms + # So at ~240ms the process should still be alive + Process.sleep(160) + + # Should NOT have suspended because activity reset the timer + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Process should still be alive (hibernated again) + assert Process.alive?(consumer_pid) + end + @tag with_pure_file_storage_opts: [compaction_period: 5, keep_complete_chunks: 133] test "compaction is scheduled and invoked for a shape that has compaction enabled", ctx do parent = self() diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index bcb4ee0949..c4d27dc09d 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -110,6 +110,7 @@ defmodule Support.ComponentSetup do shape_changes_registry: Map.get(ctx, :registry, Electric.StackSupervisor.registry_name(stack_id)), shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 1_000), + shape_suspend_after: Map.get(ctx, :shape_suspend_after, 1_000), shape_enable_suspend?: Map.get(ctx, :suspend, false), feature_flags: Electric.Config.get_env(:feature_flags), process_spawn_opts: Map.get(ctx, :process_spawn_opts, %{})