diff --git a/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/discarded-threads.md b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/discarded-threads.md new file mode 100644 index 0000000000..f468f9b495 --- /dev/null +++ b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/discarded-threads.md @@ -0,0 +1,3 @@ +# Discarded threads + +(none yet) diff --git a/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/open-questions.md b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/open-questions.md new file mode 100644 index 0000000000..266f6451f8 --- /dev/null +++ b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/open-questions.md @@ -0,0 +1,3 @@ +# Open questions + +(none yet) diff --git a/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/progress.md b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/progress.md new file mode 100644 index 0000000000..c002ab7ee2 --- /dev/null +++ b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/progress.md @@ -0,0 +1,31 @@ +# Progress log + +## 2026-03-20 + +### Initial analysis +- Read FlushTracker, ShapeLogCollector, Consumer, ConsumerRegistry, ShapeCleaner source +- Read existing test files for both FlushTracker and ShapeLogCollector +- Identified the exact code path: Consumer.handle_materializer_down → {:stop, :shutdown, state} → terminate → handle_writer_termination clause 3 (:ok, no cleanup) +- Existing tests cover: consumer crash during broadcast (detected by ConsumerRegistry), multi-fragment crash between fragments +- Missing: consumer dying out-of-band AFTER successful broadcast delivery, with no subsequent transactions to that shape + +### Implementation +- Wrote 3 FlushTracker unit tests showing stale entries blocking advancement indefinitely +- Wrote 3 SLC integration tests: + - Main bug test: kill consumer with :kill (skips terminate/remove_shape), send txns only to other shapes, verify flush stuck + - Contrast test: graceful termination (runs terminate → remove_shape) allows flush to advance + - Recovery test: when a txn finally touches the dead shape's table, undeliverable detection cleans up +- Key design decisions: + - Used two separate tables (table_a, table_b) so transactions can selectively target one shape + - Used :kill to simulate the end state of handle_materializer_down path (dead process, no cleanup) + - The @describetag is needed (not @tag) to propagate inspector config to setup functions + +### Operational issues +- Initially edited files in ~/code/electric-sql/electric instead of ~/agents/github/erik/repos/electric + - Had to create a patch and git apply it to the correct repo +- Used @tag instead of @describetag, which doesn't propagate to setup functions +- First version of tests used `refute_receive` for the initial flush notification, but FlushTracker does partially advance when one shape catches up (to one below the stuck shape's offset). Fixed to `assert_receive` the partial advance, then `refute_receive` further advances. + +### PR +- Created PR #4035: https://github.com/electric-sql/electric/pull/4035 +- Added "claude" label for automated review diff --git a/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/prompt.md b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/prompt.md new file mode 100644 index 0000000000..6b86366c08 --- /dev/null +++ b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/prompt.md @@ -0,0 +1,11 @@ +# Task prompt + +Issue #8 from electric-sql/alco-agent-tasks: "FlushTracker stalling when tracked consumer dies out-of-band" + +Write unit tests that exercise the edge case where a consumer process is tracked by FlushTracker but then dies independently (via handle_materializer_down with :shutdown reason), leaving a stale shape entry that permanently blocks flush advancement. + +Key constraints: +- Use different approaches for each test +- Avoid mocking too many components +- Avoid inventing calls or messages inside the test body +- Try setting conditions so the app hits the edge case naturally diff --git a/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/task.md b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/task.md new file mode 100644 index 0000000000..fe311e8a0f --- /dev/null +++ b/.agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/task.md @@ -0,0 +1,20 @@ +# Task: FlushTracker stale consumer edge case tests + +## Problem + +When a consumer dies via `handle_materializer_down` with `:shutdown` reason: +1. Consumer stops with `{:stop, :shutdown, state}` (skipping `stop_and_clean`) +2. `terminate/2` calls `handle_writer_termination` clause 3 which returns `:ok` without cleanup +3. ConsumerRegistry ETS entry is NOT removed (unregister_name is a no-op) +4. ShapeLogCollector is NOT notified to remove the shape from FlushTracker +5. If no future transactions affect that shape, the stale FlushTracker entry persists indefinitely +6. This blocks `last_global_flushed_offset` advancement → unbounded WAL growth + +This only affects `allow_subqueries` stacks since `handle_materializer_down` requires materializers. + +## Goal + +Write tests demonstrating this edge case at different levels: +1. FlushTracker unit level: a shape tracked but never flushed/removed blocks advancement +2. ShapeLogCollector integration level: a consumer that dies out-of-band after receiving a transaction leaves FlushTracker stuck +3. Full integration: demonstrate the materializer-death path that triggers handle_materializer_down → consumer death → stale FlushTracker diff --git a/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs index 6fabe24447..af6aa1df51 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs @@ -457,6 +457,83 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do end end + describe "out-of-band consumer death leaves stale FlushTracker entry" do + test "shape tracked across many transactions never advances global offset", %{ + tracker: tracker + } do + # Simulate: two shapes receive txn at lsn 10. One shape's consumer dies + # out-of-band (no handle_shape_removed ever called). Subsequent txns only + # affect the alive shape. The dead shape's entry pins the global offset. + tracker = handle_txn(tracker, batch(lsn: 10, last_offset: 10), ["alive", "dead"]) + + # alive flushes lsn 10 + tracker = FlushTracker.handle_flush_notification(tracker, "alive", LogOffset.new(10, 10)) + # Global offset is pinned one below dead's initial position (tx_offset=9) + assert_receive {:flush_confirmed, 8} + + # Many subsequent transactions only affect the alive shape + tracker = + Enum.reduce(11..20, tracker, fn lsn, tracker -> + tracker + |> handle_txn(batch(lsn: lsn, last_offset: 10), ["alive"]) + |> FlushTracker.handle_flush_notification("alive", LogOffset.new(lsn, 10)) + end) + + # Despite 10 more transactions fully flushed by the alive shape, + # global offset has not moved past the dead shape's stuck position + refute_receive {:flush_confirmed, 10} + refute FlushTracker.empty?(tracker) + end + + test "removing the stale shape after prolonged stall unblocks to latest seen offset", %{ + tracker: tracker + } do + # Same setup: dead shape pins the offset + tracker = handle_txn(tracker, batch(lsn: 10, last_offset: 10), ["alive", "dead"]) + + tracker = FlushTracker.handle_flush_notification(tracker, "alive", LogOffset.new(10, 10)) + assert_receive {:flush_confirmed, 8} + + # More txns flow through alive shape only + tracker = + Enum.reduce(11..15, tracker, fn lsn, tracker -> + tracker + |> handle_txn(batch(lsn: lsn, last_offset: 10), ["alive"]) + |> FlushTracker.handle_flush_notification("alive", LogOffset.new(lsn, 10)) + end) + + # Now simulate detection of the dead consumer (e.g. a sweep or monitor) + tracker = FlushTracker.handle_shape_removed(tracker, "dead") + + # Global offset jumps all the way to the latest seen offset (lsn 15) + assert_receive {:flush_confirmed, 15} + assert FlushTracker.empty?(tracker) + end + + test "stale shape blocks advancement even when its tracked offset is much older than current", + %{tracker: tracker} do + # Dead shape gets tracked at a very early offset + tracker = handle_txn(tracker, batch(lsn: 1, last_offset: 10), ["dead"]) + + # Alive shape joins later and processes many transactions + tracker = + Enum.reduce(2..50, tracker, fn lsn, tracker -> + tracker + |> handle_txn(batch(lsn: lsn, last_offset: 10), ["alive"]) + |> FlushTracker.handle_flush_notification("alive", LogOffset.new(lsn, 10)) + end) + + # The global offset is still stuck at lsn 1's prev_log_offset position + # because the dead shape was never removed + refute_receive {:flush_confirmed, 2} + refute FlushTracker.empty?(tracker) + + # Clean it up + _tracker = FlushTracker.handle_shape_removed(tracker, "dead") + assert_receive {:flush_confirmed, 50} + end + end + # Helper: calls handle_txn_fragment with shapes_with_changes defaulting to # all affected shapes (the common case for single-fragment transactions). defp handle_txn(tracker, fragment, affected_shapes) do diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index a90f4b57c5..54492dfd9f 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -1280,6 +1280,235 @@ defmodule Electric.Replication.ShapeLogCollectorTest do end end + describe "FlushTracker stalling when tracked consumer dies out-of-band" do + @two_table_inspector Support.StubInspector.new(%{ + {1234, {"public", "table_a"}} => [ + %{name: "id", type: "int8", pk_position: 0} + ], + {5678, {"public", "table_b"}} => [ + %{name: "id", type: "int8", pk_position: 0} + ] + }) + + @shape_a Shape.new!("table_a", inspector: @two_table_inspector) + @shape_b Shape.new!("table_b", inspector: @two_table_inspector) + + @describetag inspector: @two_table_inspector + setup :setup_log_collector + + setup ctx do + parent = self() + + consumers = [ + {:alive, + start_supervised!( + {Support.TransactionConsumer, + id: :alive, + stack_id: ctx.stack_id, + parent: parent, + shape: @shape_a, + shape_handle: "shape-alive"}, + id: {:consumer, :alive} + )}, + {:doomed, + start_supervised!( + {Support.TransactionConsumer, + id: :doomed, + stack_id: ctx.stack_id, + parent: parent, + shape: @shape_b, + shape_handle: "shape-doomed"}, + id: {:consumer, :doomed} + )} + ] + + %{consumers: consumers} + end + + # This test documents a known bug (electric-sql/electric#3980): + # when a consumer dies without calling remove_shape (e.g. via handle_materializer_down + # with :shutdown reason), and no subsequent transactions touch that shape's table, + # FlushTracker stays stuck indefinitely. When a fix is implemented (e.g. PID monitoring + # or periodic liveness sweep), this test should be updated to assert the fix instead. + test "consumer killed out-of-band after delivery permanently blocks flush advancement", ctx do + register_as_replication_client(ctx.stack_id) + + lsn = Lsn.from_integer(10) + + # Transaction that inserts into both tables → both shapes get tracked in FlushTracker + txn = + transaction(100, lsn, [ + %Changes.NewRecord{ + relation: {"public", "table_a"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "table_b"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 1) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Both consumers receive the transaction + assert_receive {Support.TransactionConsumer, {:alive, _}, [_]} + assert_receive {Support.TransactionConsumer, {:doomed, _}, [_]} + + # Kill the doomed consumer with :kill to prevent terminate from running. + # This simulates a consumer dying via handle_materializer_down → {:stop, :shutdown} + # where handle_writer_termination clause 3 returns :ok without cleanup. + # Using :kill here achieves the same end state: dead process, no remove_shape call. + {_, doomed_pid} = List.keyfind!(ctx.consumers, :doomed, 0) + kill_consumer(doomed_pid, :kill) + + # The alive consumer flushes its data + ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn, 1)) + + # Flush boundary partially advances: the alive shape caught up, but the dead + # shape pins the global offset at its prev_log_offset (tx_offset=9) minus 1 = 8. + assert_receive {:flush_boundary_updated, 8}, 100 + + # Send more transactions that ONLY affect table_a (the alive shape). + # Since no transaction touches table_b, SLC never discovers the dead consumer. + for i <- 11..15 do + lsn_i = Lsn.from_integer(i) + + txn_i = + transaction(100 + i, lsn_i, [ + %Changes.NewRecord{ + relation: {"public", "table_a"}, + record: %{"id" => "#{i}"}, + log_offset: LogOffset.new(lsn_i, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn_i, ctx.stack_id) + assert_receive {Support.TransactionConsumer, {:alive, _}, [_]} + ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn_i, 0)) + end + + # Despite 5 more fully-flushed transactions through lsn 15, the flush boundary + # is permanently stuck at 8. It never reaches the current lsn (15). + # This is the bug: the dead consumer's stale FlushTracker entry blocks WAL flush. + refute_receive {:flush_boundary_updated, _}, 100 + end + + # NOTE: This test uses Support.TransactionConsumer whose terminate/2 always + # calls ShapeLogCollector.remove_shape. The real Consumer does NOT do this + # when dying with :shutdown (handle_writer_termination clause 3 returns :ok). + # This test demonstrates that the cleanup mechanism works when invoked, + # serving as a contrast to the "killed out-of-band" test above where it isn't. + test "consumer that calls remove_shape on termination allows flush to advance", ctx do + register_as_replication_client(ctx.stack_id) + + lsn = Lsn.from_integer(10) + + # Transaction affecting both tables + txn = + transaction(100, lsn, [ + %Changes.NewRecord{ + relation: {"public", "table_a"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "table_b"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 1) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {Support.TransactionConsumer, {:alive, _}, [_]} + assert_receive {Support.TransactionConsumer, {:doomed, _}, [_]} + + # Alive consumer flushes — partial advance to 8 (doomed still pending) + ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn, 1)) + assert_receive {:flush_boundary_updated, 8}, 100 + + # Stop the doomed consumer gracefully — terminate runs, calls remove_shape, + # which triggers FlushTracker.handle_shape_removed. This is the CORRECT path. + {_, doomed_pid} = List.keyfind!(ctx.consumers, :doomed, 0) + Process.unlink(doomed_pid) + stop_supervised!({:consumer, :doomed}) + + # Now the flush boundary advances all the way to lsn because the stale shape + # was cleaned up by the graceful termination path + expected_lsn = Lsn.to_integer(lsn) + assert_receive {:flush_boundary_updated, ^expected_lsn}, 200 + end + + test "transaction to dead shape's table eventually unblocks flush via undeliverable detection", + ctx do + register_as_replication_client(ctx.stack_id) + + lsn = Lsn.from_integer(10) + + txn = + transaction(100, lsn, [ + %Changes.NewRecord{ + relation: {"public", "table_a"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "table_b"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 1) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {Support.TransactionConsumer, {:alive, _}, [_]} + assert_receive {Support.TransactionConsumer, {:doomed, _}, [_]} + + # Kill the doomed consumer out-of-band + {_, doomed_pid} = List.keyfind!(ctx.consumers, :doomed, 0) + kill_consumer(doomed_pid, :kill) + + # Alive flushes — partial advance to 8 + ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn, 1)) + assert_receive {:flush_boundary_updated, 8}, 100 + + # Now a NEW transaction comes that affects BOTH tables. + # ConsumerRegistry.broadcast detects the dead doomed consumer → undeliverable. + # SLC calls FlushTracker.handle_shape_removed for undeliverable shapes. + lsn2 = Lsn.from_integer(20) + + txn2 = + transaction(200, lsn2, [ + %Changes.NewRecord{ + relation: {"public", "table_a"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn2, 0) + }, + %Changes.NewRecord{ + relation: {"public", "table_b"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn2, 1) + } + ]) + + log = + ExUnit.CaptureLog.capture_log(fn -> + assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) + end) + + assert log =~ "Consumer processes crashed or missing during broadcast" + + assert_receive {Support.TransactionConsumer, {:alive, _}, [_]} + + # The stale entry was removed by the undeliverable detection in publish/1. + # The alive consumer flushes the second transaction. + ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn2, 1)) + + expected_lsn = Lsn.to_integer(lsn2) + assert_receive {:flush_boundary_updated, ^expected_lsn}, 100 + end + end + defp transaction(xid, lsn, changes) do last_log_offset = case Enum.reverse(changes) do