From 0afe471a5944f34c4be6b3e552793d231e7d8b54 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 11:24:18 +0100 Subject: [PATCH 01/10] New regression test for the stuck flush tracker issue --- .../test/electric/shapes/consumer_test.exs | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 6d0fe38cbe..bc0b899ad7 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -1774,6 +1774,105 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout end + @tag allow_subqueries: false, with_pure_file_storage_opts: [flush_period: 10_000] + test "flush notification offset is aligned when storage flushes before commit arrives at consumer", + %{stack_id: stack_id} do + # Regression test for https://github.com/electric-sql/electric/issues/4058 + # + # When a non-commit fragment has enough data to trigger a buffer-size + # flush (>= 64KB), the :flushed message is placed in the consumer's + # mailbox during processing. The consumer process ends up handling the :flushed message + # before receiving the commit fragment. But since the offset it sends to FlushTracker + # predates the commit fragment's offset, the FlushTracker keeps the shape in the + # "pending" state and there's no follow-up notification from the consumer that would + # unblock it. + # + # A high flush_period prevents timer-based flushes so the only flush + # comes from the buffer-size trigger, making the test deterministic. 
+ + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, stack_id) + + ref = Shapes.Consumer.register_for_changes(stack_id, shape_handle) + register_as_replication_client(stack_id) + + xid = 11 + lsn = Lsn.from_integer(10) + relevant_change_offset = LogOffset.new(lsn, 0) + + # The fragment has a large shape-relevant record (>64KB) that triggers a + # buffer-size flush during write, PLUS a non-matching record at a higher + # offset. This means the source fragment's last_log_offset is higher than + # the shape's last written offset — just like in production where + # transactions touch multiple tables. + padding = String.duplicate("x", 70_000) + + non_commit_fragment = + txn_fragment( + xid, + lsn, + [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => padding}, + log_offset: relevant_change_offset + }, + # This change does NOT match shape1 (test_table) but raises the + # fragment's last_log_offset above the shape's written offset. + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 50) + } + ], + has_begin?: true + ) + + # Commit fragment has only a change for a different table. The consumer + # writes nothing for it but still finalises the pending transaction, + # populating txn_offset_mapping. + commit_fragment = + txn_fragment( + xid, + lsn, + [ + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "99"}, + log_offset: LogOffset.new(lsn, 100) + } + ], + has_commit?: true + ) + + # Send non-commit fragment. The large record triggers a buffer flush, + # placing {Storage, :flushed, offset} in the consumer's mailbox. 
+ Support.Trace.trace_shape_log_collector_calls( + pid: Shapes.Consumer.whereis(stack_id, shape_handle), + functions: [:notify_flushed] + ) + + assert :ok = ShapeLogCollector.handle_event(non_commit_fragment, stack_id) + + assert [ + {ShapeLogCollector, :notify_flushed, + [stack_id, shape_handle, relevant_change_offset]} + ] == Support.Trace.collect_traced_calls() + + # Send the commit fragment to finalize the transaction in shape consumer. + # FlushTracker still tracks the transaction but the consumer won't send another flush + # notification, so the shape remains stuck and blocks all further advancement of flushed + # offset. + assert :ok = ShapeLogCollector.handle_event(commit_fragment, stack_id) + + # Consumer has processed the relevant change... + assert_receive {^ref, :new_changes, ^relevant_change_offset}, @receive_timeout + + # ...but the replication client never gets a flush boundary advancement + tx_offset = commit_fragment.last_log_offset.tx_offset + assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout + end + @tag allow_subqueries: false, with_pure_file_storage_opts: [flush_period: 1] test "dead consumer doesn't block flush notifications from advancing as live consumers flush to storage", ctx do From 6a9f34a549705c62d7ce08d57cb3cb83825e1506 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 12:59:41 +0100 Subject: [PATCH 02/10] Stricter assertions in EventRouterTest Assert that the pattern matches on maps cover all map keys --- .../electric/shapes/event_router_test.exs | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/event_router_test.exs b/packages/sync-service/test/electric/shapes/event_router_test.exs index eae7c2b59f..f11f823f33 100644 --- a/packages/sync-service/test/electric/shapes/event_router_test.exs +++ b/packages/sync-service/test/electric/shapes/event_router_test.exs @@ -96,6 +96,8 @@ defmodule Electric.Shapes.EventRouterTest 
do {result, _router} = EventRouter.event_by_shape_handle(router, batch) + assert ["s1", "s2"] == result |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -275,6 +277,8 @@ defmodule Electric.Shapes.EventRouterTest do {result, _router} = EventRouter.event_by_shape_handle(router, batch) + assert ["s1", "s2", "s3", "s4"] == result |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -318,6 +322,8 @@ defmodule Electric.Shapes.EventRouterTest do {result, _router} = EventRouter.event_by_shape_handle(router, batch) + assert ["s1", "s2"] == result |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{affected_relations: s1_relations}, "s2" => %TransactionFragment{affected_relations: s2_relations} @@ -374,9 +380,8 @@ defmodule Electric.Shapes.EventRouterTest do {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) - assert %{ - "s1" => %TransactionFragment{has_begin?: true, changes: [^insert]} - } = result2 + assert ["s1"] == Map.keys(result2) + assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert]}} = result2 end test "Begin seen once per shape even across multiple batches" do @@ -423,13 +428,14 @@ defmodule Electric.Shapes.EventRouterTest do batch1 = %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1]} {result1, router} = EventRouter.event_by_shape_handle(router, batch1) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1]}} = result1 insert2 = %NewRecord{relation: {"public", "t1"}, record: %{"id" => "2"}} batch2 = %TransactionFragment{xid: 100, has_begin?: false, changes: [insert2]} {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) - + assert ["s2"] == Map.keys(result2) assert %{"s2" => %TransactionFragment{has_begin?: true, changes: [^insert2]}} = result2 end @@ -445,6 +451,7 @@ defmodule Electric.Shapes.EventRouterTest do {result1, router} = 
EventRouter.event_by_shape_handle(router, batch1) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1]}} = result1 insert2 = %NewRecord{relation: {"public", "t1"}, record: %{"id" => "2"}} @@ -459,6 +466,8 @@ defmodule Electric.Shapes.EventRouterTest do {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) + assert ["s1", "s2"] == result2 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{has_begin?: false, commit: ^commit_op, changes: []}, "s2" => %TransactionFragment{ @@ -594,6 +603,7 @@ defmodule Electric.Shapes.EventRouterTest do batch1 = %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1]} {result1, router} = EventRouter.event_by_shape_handle(router, batch1) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1]}} = result1 insert2 = %NewRecord{relation: {"public", "t1"}, record: %{"id" => "2"}} @@ -602,6 +612,8 @@ defmodule Electric.Shapes.EventRouterTest do {result2, router} = EventRouter.event_by_shape_handle(router, batch2) + assert ["s2", "s3"] == result2 |> Map.keys() |> Enum.sort() + assert %{ "s2" => %TransactionFragment{has_begin?: true, changes: [^insert2]}, "s3" => %TransactionFragment{has_begin?: true, changes: [^insert3]} @@ -619,6 +631,8 @@ defmodule Electric.Shapes.EventRouterTest do {result3, _router} = EventRouter.event_by_shape_handle(router, batch3) + assert ["s1", "s2", "s3"] == result3 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -646,6 +660,7 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1a]}} = result1 # Shape s2 added mid-transaction - should not receive any events from this transaction @@ -674,6 +689,8 @@ defmodule 
Electric.Shapes.EventRouterTest do ) # s1 gets its changes + commit, s2 gets nothing (added mid-transaction) + assert ["s1"] == Map.keys(result2) + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -698,6 +715,8 @@ defmodule Electric.Shapes.EventRouterTest do } ) + assert ["s1", "s2"] == result3 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -726,6 +745,7 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1a]}} = result1 # Shapes s2 and s3 added mid-transaction - neither should receive any events from this @@ -753,6 +773,8 @@ defmodule Electric.Shapes.EventRouterTest do ) # s1 gets its changes + commit, s2 and s3 get nothing (added mid-transaction) + assert ["s1"] == Map.keys(result2) + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -778,6 +800,8 @@ defmodule Electric.Shapes.EventRouterTest do } ) + assert ["s1", "s2", "s3"] == result3 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -812,6 +836,8 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1", "s2"] == result1 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{has_begin?: true, changes: [^insert1a]}, "s2" => %TransactionFragment{has_begin?: true, changes: [^insert2a]} @@ -836,6 +862,8 @@ defmodule Electric.Shapes.EventRouterTest do ) # s1 gets nothing (removed), s2 gets its changes + commit + assert ["s2"] == Map.keys(result2) + assert %{ "s2" => %TransactionFragment{ has_begin?: false, @@ -859,6 +887,7 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1"] == Map.keys(result1) assert %{"s1" => 
%TransactionFragment{has_begin?: true, changes: [^insert1a]}} = result1 # s2 added mid-transaction @@ -879,6 +908,7 @@ defmodule Electric.Shapes.EventRouterTest do ) # s2 skipped because added mid-transaction + assert ["s1"] == Map.keys(result2) assert %{"s1" => %TransactionFragment{has_begin?: false, changes: [^insert1b]}} = result2 # s2 removed before transaction ends @@ -900,6 +930,8 @@ defmodule Electric.Shapes.EventRouterTest do ) # s2 removed, so only s1 gets events + assert ["s1"] == Map.keys(result3) + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -924,6 +956,8 @@ defmodule Electric.Shapes.EventRouterTest do } ) + assert ["s1"] == Map.keys(result4) + assert %{ "s1" => %TransactionFragment{ has_begin?: true, From 3ca8aa6141416561241f4dc023e62e0dcf85d9e8 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 12:59:10 +0100 Subject: [PATCH 03/10] Verify txn fragment reslicing in EventRouter when unrelated changes are involved --- .../electric/shapes/event_router_test.exs | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/packages/sync-service/test/electric/shapes/event_router_test.exs b/packages/sync-service/test/electric/shapes/event_router_test.exs index f11f823f33..e670348b47 100644 --- a/packages/sync-service/test/electric/shapes/event_router_test.exs +++ b/packages/sync-service/test/electric/shapes/event_router_test.exs @@ -8,6 +8,7 @@ defmodule Electric.Shapes.EventRouterTest do alias Electric.Replication.Changes.TruncatedRelation alias Electric.Replication.Changes.UpdatedRecord alias Electric.Replication.Changes.TransactionFragment + alias Electric.Replication.LogOffset alias Electric.Shapes.EventRouter alias Electric.Shapes.Shape alias Support.StubInspector @@ -505,6 +506,100 @@ defmodule Electric.Shapes.EventRouterTest do assert result2 == %{"s1" => batch2} end + test "commit-only fragment for shape when commit has changes only for a different table" do + router = + EventRouter.new() + |> 
EventRouter.add_shape("s1", Shape.new!("t1", inspector: @inspector)) + |> EventRouter.add_shape("s2", Shape.new!("t2", inspector: @inspector)) + + insert_t1 = %NewRecord{ + relation: {"public", "t1"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(10, 0) + } + + insert_t2_first = %NewRecord{ + relation: {"public", "t2"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(10, 2) + } + + batch1 = %TransactionFragment{ + xid: 100, + has_begin?: true, + last_log_offset: LogOffset.new(10, 2), + changes: [insert_t1, insert_t2_first], + change_count: 2 + } + + {result1, router} = EventRouter.event_by_shape_handle(router, batch1) + + # Shape s1 only gets the t1 change but last_log_offset is copied from the txn fragment + batch1_last_offset = LogOffset.new(10, 2) + + assert ["s1", "s2"] == Map.keys(result1) + + assert %{ + "s1" => %TransactionFragment{ + has_begin?: true, + commit: nil, + changes: [^insert_t1], + change_count: 1, + last_log_offset: ^batch1_last_offset + }, + "s2" => %TransactionFragment{ + has_begin?: true, + commit: nil, + changes: [^insert_t2_first], + change_count: 1, + last_log_offset: ^batch1_last_offset + } + } = result1 + + # Commit fragment has only a t2 change + insert_t2_last = %NewRecord{ + relation: {"public", "t2"}, + record: %{"id" => "99"}, + log_offset: LogOffset.new(10, 4) + } + + commit_op = %Commit{commit_timestamp: ~U[2024-01-01 00:00:00Z]} + + batch2 = %TransactionFragment{ + xid: 100, + has_begin?: false, + commit: commit_op, + last_log_offset: LogOffset.new(10, 4), + changes: [insert_t2_last], + change_count: 1 + } + + {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) + + # s1 gets a commit-only fragment: change_count=0, + # and last_log_offset from the commit fragment + batch2_last_offset = LogOffset.new(10, 4) + + assert ["s1", "s2"] == Map.keys(result2) + + assert %{ + "s1" => %TransactionFragment{ + has_begin?: false, + commit: ^commit_op, + changes: [], + change_count: 0, + last_log_offset: 
^batch2_last_offset + }, + "s2" => %TransactionFragment{ + has_begin?: false, + commit: ^commit_op, + changes: [^insert_t2_last], + change_count: 1, + last_log_offset: ^batch2_last_offset + } + } = result2 + end + test "transaction state is reset after Commit" do router = EventRouter.new() From 6165115c25c00e62d5e77e1027e45e8d43020d69 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 15:02:36 +0100 Subject: [PATCH 04/10] Implementation plan --- .../plans/2026-03-26-issue-4063/plan.md | 542 ++++++++++++++++++ 1 file changed, 542 insertions(+) create mode 100644 packages/sync-service/plans/2026-03-26-issue-4063/plan.md diff --git a/packages/sync-service/plans/2026-03-26-issue-4063/plan.md b/packages/sync-service/plans/2026-03-26-issue-4063/plan.md new file mode 100644 index 0000000000..d08b27a602 --- /dev/null +++ b/packages/sync-service/plans/2026-03-26-issue-4063/plan.md @@ -0,0 +1,542 @@ +# Deferred Flush Notification for Multi-Fragment Transactions + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Fix the race condition where a consumer sends an unaligned flush offset to FlushTracker when storage flushes data from a non-commit fragment before the commit fragment populates `txn_offset_mapping`. + +**Architecture:** Two coordinated changes: (1) Revert FlushTracker to only track shapes from commit fragments (undoing the early-tracking approach from PR #3986). (2) In the Consumer, defer `{Storage, :flushed, offset}` notifications that arrive during a pending transaction, then process the deferred notification after the commit fragment has been handled and `txn_offset_mapping` is populated. 
+ +**Tech Stack:** Elixir, GenServer, ExUnit + +**Ref:** https://github.com/electric-sql/electric/issues/4063 + +--- + +## File Map + +| File | Action | Responsibility | +|------|--------|----------------| +| `lib/electric/replication/shape_log_collector/flush_tracker.ex` | Modify | Revert non-commit fragment tracking to no-op; commit tracks ALL delivered shapes | +| `lib/electric/shapes/consumer/state.ex` | Modify | Add `pending_flush_offset` field to struct | +| `lib/electric/shapes/consumer.ex` | Modify | Defer `:flushed` messages during pending txn; process deferred in `maybe_complete_pending_txn` and `skip_txn_fragment` | +| `test/electric/replication/shape_log_collector/flush_tracker_test.exs` | Modify | Update tests: non-commit fragment is now a no-op, commit tracks all affected shapes | +| `test/electric/shapes/consumer_test.exs` | Modify | Update regression test to verify deferred flush behavior | + +--- + +### Task 1: Revert FlushTracker non-commit fragment tracking + +**Files:** +- Modify: `lib/electric/replication/shape_log_collector/flush_tracker.ex:105-141` +- Modify: `test/electric/replication/shape_log_collector/flush_tracker_test.exs` + +The non-commit clause currently calls `track_shapes()`. It needs to be removed so that only the clause that matches on %Commit{} remains. The commit clause currently filters `affected_shapes` to only those in `shapes_with_changes` or already tracked. Since non-commit fragments no longer track anything, `is_map_key(state.last_flushed, shape)` is always false for shapes not in the commit. Revert to tracking ALL `affected_shapes` delivered for the commit. + +- [ ] **Step 1: Remove FlushTracker non-commit clause** + +In `flush_tracker.ex`, replace lines 105-115: + +```diff +- # Non-commit fragment: track affected shapes but don't update last_seen_offset +- # or notify. This ensures shapes are registered early so flush notifications +- # from Consumers aren't lost when storage flushes before the commit arrives. 
+- def handle_txn_fragment( +- %__MODULE__{} = state, +- %TransactionFragment{commit: nil, last_log_offset: last_log_offset}, +- affected_shapes, +- _shapes_with_changes +- ) do +- track_shapes(state, last_log_offset, affected_shapes) +- end +``` + +- [ ] **Step 2: Update FlushTracker commit clause to track all affected shapes** + +In `flush_tracker.ex`, replace lines 117-141: + +```elixir + # Commit fragment: track all shapes affected by all fragments of the transaction and update last_seen_offset. + def handle_txn_fragment( + %__MODULE__{} = state, + %TransactionFragment{commit: %Commit{}, last_log_offset: last_log_offset}, + affected_shapes, + _shapes_with_changes + ) do + state = track_shapes(state, last_log_offset, affected_shapes) + + state = %{state | last_seen_offset: last_log_offset} + + if state.last_flushed == %{} do + update_global_offset(state) + else + state + end + end +``` + +- [ ] **Step 3: Remove `last_seen_offset == before_all` guard from `handle_flush_notification`** + +In `flush_tracker.ex`, lines 214-221, replace: + +```elixir + # Only update global offset if we've seen at least one commit. + # Before any commit, last_seen_offset is before_all and there's + # nothing meaningful to report. + if state.last_seen_offset == LogOffset.before_all() do + state + else + update_global_offset(state) + end +``` + +with just: + +```elixir + update_global_offset(state) +``` + +This guard was added in PR #3986 because non-commit fragments could trigger flush notifications before any commit. With the deferred approach in Consumer, flush notifications only reach FlushTracker after a commit has been processed. + +- [ ] **Step 4: Update FlushTracker tests** + +Several tests exercise non-commit fragment tracking that no longer applies. Update: + +**Test "non-commit fragment tracks shapes but does not notify or update last_seen"** (line 30): +Non-commit fragments are now raising FunctionClauseError. 
Replace the test with one that asserts the raise: + +```elixir + test "non-commit fragment raises FunctionClauseError", %{tracker: tracker} do + fragment = %TransactionFragment{ + xid: 1, + lsn: 1, + last_log_offset: LogOffset.new(1, 0), + commit: nil + } + + assert_raise FunctionClauseError, fn -> handle_txn(tracker, fragment, ["shape1"]) end + end +``` + +**Test "shape tracked by non-commit fragment can be flushed before commit arrives"** (line 59): +This test's premise no longer holds (non-commit fragments are not tracked). Replace it with a test that verifies the commit tracks all affected shapes regardless of whether they had changes in the commit fragment: + +```elixir + test "commit tracks all affected shapes even those without changes in the commit fragment", + %{tracker: tracker} do + # shape1 had changes in a non-commit fragment (not tracked by FlushTracker). + # The commit fragment lists it in affected_shapes (via shapes_in_txn). + # shapes_with_changes is empty (no data changes in commit). + tracker = + FlushTracker.handle_txn_fragment( + tracker, + batch(xid: 1, lsn: 5, last_offset: 10), + ["shape1"], + MapSet.new() + ) + + refute FlushTracker.empty?(tracker) + + # Flush at the commit offset catches up + tracker = + FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) + + assert_receive {:flush_confirmed, 5} + assert FlushTracker.empty?(tracker) + end +``` + +**Test "shape tracked by non-commit and still pending is updated by commit"** (line 95): +Remove this test — its premise (non-commit tracking + commit updating last_sent) no longer applies. + +**Test "already-flushed shape with new changes in commit is re-tracked"** (line 163): +Remove this test — non-commit fragments no longer track, so there's nothing to "re-track". + +**Test "multiple non-commit fragments update last_sent progressively"** (line 199): +Remove this test — non-commit fragments are no longer handled by FlushTracker. 
+ +**Test "shape only in commit (not in non-commit fragments) is tracked normally"** (line 128): +Simplify: just verify the commit tracks both shapes: + +```elixir + test "commit tracks all shapes including those that had changes only in earlier fragments", + %{tracker: tracker} do + # shape1 only had changes in non-commit fragment (not visible to FlushTracker). + # shape2 has changes in the commit fragment. Both appear in affected_shapes. + tracker = + FlushTracker.handle_txn_fragment( + tracker, + batch(xid: 1, lsn: 5, last_offset: 10), + ["shape1", "shape2"], + MapSet.new(["shape2"]) + ) + + refute FlushTracker.empty?(tracker) + + # Both shapes need to be flushed + tracker = + FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) + + tracker = + FlushTracker.handle_flush_notification(tracker, "shape2", LogOffset.new(5, 10)) + + assert_receive {:flush_confirmed, 5} + assert FlushTracker.empty?(tracker) + end +``` + +- [ ] **Step 5: Run FlushTracker tests** + +Run: `mix test test/electric/replication/shape_log_collector/flush_tracker_test.exs` +Expected: ALL PASS + +- [ ] **Step 6: Format the code** + +``` +mix format +``` + +- [ ] **Step 7: Commit** + +``` +git add packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex \ + packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs +git commit -m "Revert FlushTracker to commit-only tracking + +Non-commit fragments no longer register shapes in FlushTracker. +The Consumer will defer flush notifications until the commit fragment +is processed, so early registration is no longer needed. 
+ +Refs: #4063" +``` + +--- + +### Task 2: Add `pending_flush_offset` to Consumer state + +**Files:** +- Modify: `lib/electric/shapes/consumer/state.ex:19-46` + +- [ ] **Step 1: Add `pending_flush_offset` field to the State struct** + +In `state.ex`, add the field to the struct definition (after `pending_txn: nil`): + +```elixir + # When a {Storage, :flushed, offset} message arrives during a pending + # transaction, we defer the notification and store the max flushed offset + # here. It is processed in maybe_complete_pending_txn after txn_offset_mapping + # is populated. Multiple deferred notifications are collapsed into the max offset. + pending_flush_offset: nil +``` + +- [ ] **Step 2: Commit** + +``` +git add packages/sync-service/lib/electric/shapes/consumer/state.ex +git commit -m "Add pending_flush_offset field to Consumer.State" +``` + +--- + +### Task 3: Defer flush notifications in Consumer during pending transactions + +**Files:** +- Modify: `lib/electric/shapes/consumer.ex:273-278,594-599,696-753` + +This is the core fix. Three changes in `consumer.ex`: + +1. Split the `:flushed` handler to defer during pending transactions +2. Add `process_pending_flush/1` helper +3. Call it from `maybe_complete_pending_txn` and `skip_txn_fragment` + +- [ ] **Step 1: Split the `:flushed` handler into two clauses** + +Replace lines 273-278: + +```elixir + def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do + {state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in) + + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn) + {:noreply, state, state.hibernate_after} + end +``` + +with: + +```elixir + # When a flush arrives during a pending transaction: + # 1. Immediately notify SLC with the highest completed-txn boundary from + # txn_offset_mapping (if any entries are covered by this flush). + # 2. Save the flushed offset for the current pending txn whose + # txn_offset_mapping entry doesn't exist yet. 
+ def handle_info( + {ShapeCache.Storage, :flushed, offset_in}, + %{write_unit: State.write_unit_txn_fragment(), pending_txn: pending_txn} = state + ) + when not is_nil(pending_txn) do + state = notify_flushed_mappings(state, offset_in) + + # Save the flushed offset for the current pending txn. + updated_offset = LogOffset.max(state.pending_flush_offset || offset_in, offset_in) + {:noreply, %{state | pending_flush_offset: updated_offset}, state.hibernate_after} + end + + def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do + {state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in) + + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn) + {:noreply, state, state.hibernate_after} + end +``` + +- [ ] **Step 2: Add `consume_flushed_mappings/2`, `notify_flushed_mappings/2`, and `process_pending_flush/1`** + +Add these private functions near the end of the module (e.g. after `consider_flushed`): + +```elixir + # Walk txn_offset_mapping, dropping entries whose key <= offset_in, + # keeping only the last seen boundary. Stops at the first key > offset_in. + # Returns {nil, list} if nothing matched, {boundary, remaining} otherwise. + defp consume_flushed_mappings([{key, boundary} | rest], offset_in) + when LogOffset.is_log_offset_lte(key, offset_in) do + consume_flushed_mappings(rest, offset_in, boundary) + end + + defp consume_flushed_mappings(remaining, _offset_in), do: {nil, remaining} + + defp consume_flushed_mappings([{key, boundary} | rest], offset_in, _prev_boundary) + when LogOffset.is_log_offset_lte(key, offset_in) do + consume_flushed_mappings(rest, offset_in, boundary) + end + + defp consume_flushed_mappings(remaining, _offset_in, boundary), do: {boundary, remaining} + + # Consume completed entries from txn_offset_mapping and send a single + # flush notification with the highest boundary. FlushTracker keeps one + # {last_sent, last_flushed} entry per shape, so one notification suffices. 
+ defp notify_flushed_mappings(state, offset_in) do + case consume_flushed_mappings(state.txn_offset_mapping, offset_in) do + {nil, _} -> + state + + {boundary, remaining} -> + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, boundary) + %{state | txn_offset_mapping: remaining} + end + end + + # After a pending transaction completes and txn_offset_mapping is populated, + # process the deferred flushed offset (if any). + defp process_pending_flush(%{pending_flush_offset: nil} = state), do: state + + defp process_pending_flush(%{pending_flush_offset: flushed_offset} = state) do + state = %{state | pending_flush_offset: nil} + notify_flushed_mappings(state, flushed_offset) + end +``` + +- [ ] **Step 3: Call `process_pending_flush` in `maybe_complete_pending_txn` (num_changes > 0 branch)** + +In the `txn.num_changes > 0` branch of `maybe_complete_pending_txn` (around line 738), change: + +```elixir + %{ + state + | writer: writer, + pending_txn: nil, + txn_offset_mapping: + state.txn_offset_mapping ++ [{state.latest_offset, txn_fragment.last_log_offset}] + } +``` + +to: + +```elixir + state = %{ + state + | writer: writer, + pending_txn: nil, + txn_offset_mapping: + state.txn_offset_mapping ++ [{state.latest_offset, txn_fragment.last_log_offset}] + } + + process_pending_flush(state) +``` + +- [ ] **Step 4: Run the full consumer test suite** + +Run: `mix test test/electric/shapes/consumer_test.exs` +Expected: ALL PASS (including the existing regression test for #4058 on the branch) + +- [ ] **Step 5: Commit** + +``` +git add packages/sync-service/lib/electric/shapes/consumer.ex \ + packages/sync-service/lib/electric/shapes/consumer/state.ex +git commit -m "Defer flush notifications in Consumer during pending transactions + +When a {Storage, :flushed, offset} message arrives while a multi-fragment +transaction is pending, the Consumer now saves the offset instead of +immediately notifying the ShapeLogCollector. 
After the commit fragment +populates txn_offset_mapping, the deferred offset is aligned and sent +as a single notification. + +This fixes the race condition where the consumer sent an unaligned +flush offset to FlushTracker because txn_offset_mapping was empty +at the time of the storage flush. + +Refs: #4063" +``` + +--- + +### Task 4: Update the regression test for the new behavior + +**Files:** +- Modify: `test/electric/shapes/consumer_test.exs` (the test added on this branch starting at line 1777) + +The existing regression test for #4058 traces `notify_flushed` calls after the non-commit fragment. With the deferred approach, the consumer should NOT call `notify_flushed` after the non-commit fragment. Instead, it should call it after the commit fragment. + +- [ ] **Step 1: Update the regression test assertions** + +The test currently asserts that `notify_flushed` is called right after the non-commit fragment with `relevant_change_offset`. With the fix, the consumer defers this notification. The assertion should change: after the non-commit fragment, `notify_flushed` should NOT have been called. After the commit fragment, `notify_flushed` should have been called with the **aligned** offset (the commit fragment's `last_log_offset`). + +Replace the test's assertion section (from the `Support.Trace.trace_shape_log_collector_calls` call through the end of the test) with: + +```elixir + Support.Trace.trace_shape_log_collector_calls( + pid: Shapes.Consumer.whereis(stack_id, shape_handle), + functions: [:notify_flushed] + ) + + assert :ok = ShapeLogCollector.handle_event(non_commit_fragment, stack_id) + + # With deferred flush notifications, the consumer does NOT call notify_flushed + # after the non-commit fragment. The :flushed message is saved for later. + assert [] == Support.Trace.collect_traced_calls() + + # Send the commit fragment to finalize the transaction. 
+ assert :ok = ShapeLogCollector.handle_event(commit_fragment, stack_id) + + # Consumer has processed the relevant change... + assert_receive {^ref, :new_changes, ^relevant_change_offset}, @receive_timeout + + # The deferred flush notification is sent after the commit with the + # aligned offset (the commit fragment's last_log_offset). + commit_last_log_offset = commit_fragment.last_log_offset + + assert [ + {ShapeLogCollector, :notify_flushed, + [^stack_id, ^shape_handle, ^commit_last_log_offset]} + ] = Support.Trace.collect_traced_calls() + + # Flush boundary advances correctly. + tx_offset = commit_fragment.last_log_offset.tx_offset + assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout +``` + +- [ ] **Step 2: Also update the #3985 regression test if needed** + +The test at line 1676 ("flush notification for multi-fragment txn is not lost when storage flushes before commit fragment") sends two non-commit fragments with `flush_period: 1` (timer-based flush). With the deferred approach, the `notify_flushed` call traced after fragments 1+2 should now be deferred. Let me check: this test uses `:trace` messages from the tracing module. It matches on `{:trace, _, :call, {ShapeLogCollector, :notify_flushed, ...}}`. + +With deferred flush, the consumer no longer calls `notify_flushed` after the non-commit fragments. The traced call would appear only after the commit fragment. Update the test accordingly: + +Replace the section after `ShapeLogCollector.handle_event(fragment2, stack_id)`: + +```elixir + assert :ok = ShapeLogCollector.handle_event(fragment1, stack_id) + assert :ok = ShapeLogCollector.handle_event(fragment2, stack_id) + + # With deferred flush notifications, notify_flushed is NOT called + # after non-commit fragments. The flush is deferred until the commit. + refute_receive {:trace, _, :call, {ShapeLogCollector, :notify_flushed, _}}, 100 + + # Now send the commit fragment. 
+ commit_fragment = + txn_fragment( + xid, + lsn, + [ + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "99"}, + log_offset: LogOffset.new(lsn, 6) + } + ], + has_commit?: true + ) + + assert :ok = ShapeLogCollector.handle_event(commit_fragment, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + # The deferred flush notification is sent after the commit, aligned + # to the commit fragment's last_log_offset. + commit_offset = commit_fragment.last_log_offset + + assert_receive {:trace, _, :call, + {ShapeLogCollector, :notify_flushed, + [^stack_id, ^shape_handle, ^commit_offset]}}, + @receive_timeout + + # Flush boundary advances. + tx_offset = commit_fragment.last_log_offset.tx_offset + assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout +``` + +Note: the exact shape of this change depends on how the existing test is structured. The key changes are: +1. After non-commit fragments: `refute_receive` for notify_flushed (was `assert_receive`) +2. After commit: `assert_receive` for notify_flushed with the commit's aligned offset +3. The flush boundary assertion stays the same + +- [ ] **Step 3: Run consumer tests** + +Run: `mix test test/electric/shapes/consumer_test.exs` +Expected: ALL PASS + +- [ ] **Step 4: Commit** + +``` +git add packages/sync-service/test/electric/shapes/consumer_test.exs +git commit -m "Update regression tests for deferred flush notification behavior + +Tests now verify that flush notifications are deferred during pending +transactions and sent only after the commit fragment is processed. 
+ +Refs: #4063" +``` + +--- + +### Task 5: Run the full test suite + +- [ ] **Step 1: Run the sync-service test suite** + +Run: `mix test` +Expected: ALL PASS + +- [ ] **Step 2: If any failures, investigate and fix** + +Pay attention to: +- Tests that depend on flush notification timing +- Tests that trace `notify_flushed` calls +- Tests with `write_unit: :txn_fragment` behavior + +--- + +## Design Notes + +### Why revert FlushTracker instead of keeping early tracking? + +The early tracking approach (from PR #3986) tried to solve the problem at the FlushTracker level: register shapes early so flush notifications aren't lost. But this created a new problem: the non-commit fragment's `last_log_offset` could be higher than the consumer's written offset (due to unrelated changes), causing FlushTracker to see a `last_sent` that's higher than any flush notification the consumer would send. + +The deferred approach solves the root cause at the Consumer level: don't send flush notifications until `txn_offset_mapping` is populated and the offset can be correctly aligned to the transaction boundary. + +### What happens when data is only partially flushed? + +If the deferred `flushed_offset < state.latest_offset`, the consumer does NOT send a notification. After the commit, `pending_txn` is nil and `txn_offset_mapping` is populated. The next `{Storage, :flushed, _}` message (from a timer or subsequent write) is handled by the normal (non-deferred) clause, which calls `align_offset_to_txn_boundary` with the correct mapping. + +### What about cross-transaction flushes? + +A `:flushed` message may cover data from previously committed transactions whose entries are already in `txn_offset_mapping`. The deferred handler splits `txn_offset_mapping` at the flushed offset and sends a single `notify_flushed` with the highest completed boundary. 
This is sufficient because FlushTracker keeps one `{last_sent, last_flushed}` entry per shape — it will either store the boundary as `last_flushed` (if a newer commit already updated `last_sent`) or remove the shape entirely (if `last_sent` matches), in which case the next commit re-adds it as a new entry. + +Only the current pending transaction's portion of the flush is deferred (saved as `pending_flush_offset`), because its `txn_offset_mapping` entry doesn't exist yet. After the commit populates the entry, `process_pending_flush` uses the same split-and-notify pattern. From ac75611ba31c6e858c4d5c2bbb45cdf10d5a3718 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 16:01:54 +0100 Subject: [PATCH 05/10] Revert FlushTracker to commit-only tracking Non-commit fragments no longer register shapes in FlushTracker. The Consumer will defer flush notifications until the commit fragment is processed, so early registration is no longer needed. - FlushTracker.handle_txn_fragment now takes 3 args (removed unused shapes_with_changes parameter) - SLC's dropped-fragment path only calls FlushTracker for commit fragments - Remove misleading FlushTracker tests that claimed to test cross-fragment tracking (that's EventRouter's responsibility, already tested there) - Update handle_txn test helper for arity 3 Refs: #4063 --- .../replication/shape_log_collector.ex | 38 +--- .../shape_log_collector/flush_tracker.ex | 43 +--- .../flush_tracker_test.exs | 207 +----------------- 3 files changed, 23 insertions(+), 265 deletions(-) diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index c4597e22cf..6bbb4b6b87 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -465,17 +465,14 @@ defmodule Electric.Replication.ShapeLogCollector do 
OpenTelemetry.add_span_attributes("txn.is_dropped": true) - {:ok, - %{ - state - | flush_tracker: - FlushTracker.handle_txn_fragment( - state.flush_tracker, - txn_fragment, - [], - MapSet.new() - ) - }} + flush_tracker = + if txn_fragment.commit do + FlushTracker.handle_txn_fragment(state.flush_tracker, txn_fragment, []) + else + state.flush_tracker + end + + {:ok, %{state | flush_tracker: flush_tracker}} end defp handle_txn_fragment( @@ -578,22 +575,9 @@ defmodule Electric.Replication.ShapeLogCollector do flush_tracker = case event do - %TransactionFragment{} -> - shapes_with_changes = - for {id, frag} <- events_by_handle, - frag.change_count > 0, - not MapSet.member?(undeliverable_set, id), - do: id, - into: MapSet.new() - - if event.commit, do: LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn) - - FlushTracker.handle_txn_fragment( - flush_tracker, - event, - delivered_shapes, - shapes_with_changes - ) + %TransactionFragment{commit: commit} when not is_nil(commit) -> + LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn) + FlushTracker.handle_txn_fragment(flush_tracker, event, delivered_shapes) _ -> flush_tracker diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex b/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex index 8ea1144a38..f377a3f162 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex @@ -95,41 +95,15 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do last_flushed == %{} and :gb_trees.is_empty(tree) end - @spec handle_txn_fragment( - t(), - TransactionFragment.t(), - Enumerable.t(shape_id()), - MapSet.t(shape_id()) - ) :: t() - - # Non-commit fragment: track affected shapes but don't update last_seen_offset - # or notify. 
This ensures shapes are registered early so flush notifications - # from Consumers aren't lost when storage flushes before the commit arrives. - def handle_txn_fragment( - %__MODULE__{} = state, - %TransactionFragment{commit: nil, last_log_offset: last_log_offset}, - affected_shapes, - _shapes_with_changes - ) do - track_shapes(state, last_log_offset, affected_shapes) - end + @spec handle_txn_fragment(t(), TransactionFragment.t(), Enumerable.t(shape_id())) :: t() - # Commit fragment: track shapes that have actual changes in this fragment - # or are already being tracked (need last_sent updated to commit offset). - # Skip shapes that only have a commit marker and already flushed from - # earlier non-commit fragments — there's nothing new to flush for them. + # Commit fragment: track all shapes affected by all fragments of the transaction and update last_seen_offset. def handle_txn_fragment( %__MODULE__{} = state, %TransactionFragment{commit: %Commit{}, last_log_offset: last_log_offset}, - affected_shapes, - shapes_with_changes + affected_shapes ) do - shapes_to_track = - Enum.filter(affected_shapes, fn shape -> - shape in shapes_with_changes or is_map_key(state.last_flushed, shape) - end) - - state = track_shapes(state, last_log_offset, shapes_to_track) + state = track_shapes(state, last_log_offset, affected_shapes) state = %{state | last_seen_offset: last_log_offset} @@ -211,14 +185,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do min_incomplete_flush_tree: min_incomplete_flush_tree } - # Only update global offset if we've seen at least one commit. - # Before any commit, last_seen_offset is before_all and there's - # nothing meaningful to report. 
- if state.last_seen_offset == LogOffset.before_all() do - state - else - update_global_offset(state) - end + update_global_offset(state) end # If the shape is not in the mapping, then we're processing a flush notification for a shape that was removed diff --git a/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs index 6fabe24447..5b2f45655e 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs @@ -27,23 +27,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do refute_receive {:flush_confirmed, _} end - test "non-commit fragment tracks shapes but does not notify or update last_seen", %{ - tracker: tracker - } do - fragment = %TransactionFragment{ - xid: 1, - lsn: 1, - last_log_offset: LogOffset.new(1, 0), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - refute_receive {:flush_confirmed, _} - # Shape is tracked in last_flushed - refute FlushTracker.empty?(tracker) - end - - test "non-commit fragment with no affected shapes is a no-op", %{tracker: tracker} do + test "non-commit fragment raises FunctionClauseError", %{tracker: tracker} do fragment = %TransactionFragment{ xid: 1, lsn: 1, @@ -51,192 +35,20 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do commit: nil } - tracker = handle_txn(tracker, fragment, []) - refute_receive {:flush_confirmed, _} - assert FlushTracker.empty?(tracker) - end - - test "shape tracked by non-commit fragment can be flushed before commit arrives", %{ - tracker: tracker - } do - # Non-commit fragment registers shape - fragment = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 4), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Flush 
notification catches up the shape in last_flushed - tracker = FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 4)) - - # No notification yet — no commit seen - refute_receive {:flush_confirmed, _} - assert FlushTracker.empty?(tracker) - - # Commit arrives with shape1 in affected_shapes (via EventRouter's shapes_in_txn) - # but shape1 has no new changes — only a commit marker. It was already flushed, - # so it should not be re-registered. - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new() - ) - - # Shape was skipped, tracker is empty, global offset notified - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end - - test "shape tracked by non-commit and still pending is updated by commit", %{ - tracker: tracker - } do - # Non-commit fragment registers shape - fragment = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 2), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Commit arrives — shape is still in last_flushed, so last_sent is updated - # (shapes_with_changes doesn't matter here since shape is already tracked) - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new() - ) - - refute FlushTracker.empty?(tracker) - - # Flush at the commit's offset catches up the shape - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end - - test "shape only in commit (not in non-commit fragments) is tracked normally", %{ - tracker: tracker - } do - # Non-commit fragment for shape1 - fragment = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 2), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Commit has both shapes — shape2 has actual 
changes in the commit fragment - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1", "shape2"], - MapSet.new(["shape2"]) - ) - - refute FlushTracker.empty?(tracker) - - # Both shapes need to be flushed - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - tracker = - FlushTracker.handle_flush_notification(tracker, "shape2", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) + assert_raise FunctionClauseError, fn -> handle_txn(tracker, fragment, ["shape1"]) end end - test "already-flushed shape with new changes in commit is re-tracked", %{ + test "non-commit fragment with no affected shapes raises FunctionClauseError", %{ tracker: tracker } do - # Non-commit fragment registers shape fragment = %TransactionFragment{ xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 4), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Flush notification catches up the shape in last_flushed - tracker = FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 4)) - assert FlushTracker.empty?(tracker) - - # Commit arrives — shape1 has NEW changes in the commit fragment - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new(["shape1"]) - ) - - # Shape must be re-tracked to ensure commit-fragment writes are flushed - refute FlushTracker.empty?(tracker) - - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end - - test "multiple non-commit fragments update last_sent progressively", %{ - tracker: tracker - } do - frag1 = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 2), - commit: nil - } - - frag2 = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: 
LogOffset.new(5, 5), + lsn: 1, + last_log_offset: LogOffset.new(1, 0), commit: nil } - tracker = - tracker - |> handle_txn(frag1, ["shape1"]) - |> handle_txn(frag2, ["shape1"]) - - # Flushing to the latest non-commit offset catches up the shape - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 5)) - - # No notification — no commit seen - refute_receive {:flush_confirmed, _} - assert FlushTracker.empty?(tracker) - - # Commit with no new changes — shape was flushed, skipped - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new() - ) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) + assert_raise FunctionClauseError, fn -> handle_txn(tracker, fragment, []) end end end @@ -460,12 +272,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do # Helper: calls handle_txn_fragment with shapes_with_changes defaulting to # all affected shapes (the common case for single-fragment transactions). defp handle_txn(tracker, fragment, affected_shapes) do - FlushTracker.handle_txn_fragment( - tracker, - fragment, - affected_shapes, - MapSet.new(affected_shapes) - ) + FlushTracker.handle_txn_fragment(tracker, fragment, affected_shapes) end defp batch(opts) do From c8bf32ee3e95337152a97331fa37e212c0bbb3c2 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 16:04:32 +0100 Subject: [PATCH 06/10] Defer flush notifications in Consumer during pending transactions When a {Storage, :flushed, offset} message arrives while a multi-fragment transaction is pending, the Consumer now saves the offset instead of immediately notifying the ShapeLogCollector. After the commit fragment populates txn_offset_mapping, the deferred offset is aligned and sent as a single notification. 
This fixes the race condition where the consumer sent an unaligned flush offset to FlushTracker because txn_offset_mapping was empty at the time of the storage flush. --- .../lib/electric/shapes/consumer.ex | 49 ++++++++++++++++--- .../lib/electric/shapes/consumer/state.ex | 10 ++-- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index abe734a678..2917bd57c0 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -270,10 +270,20 @@ defmodule Electric.Shapes.Consumer do end end - def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do - {state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in) + def handle_info({ShapeCache.Storage, :flushed, flushed_offset}, state) do + state = + if is_write_unit_txn(state.write_unit) or is_nil(state.pending_txn) do + # We're not currently in the middle of processing a transaction. This flushed offset is either + # from a previously processed transaction or a non-commit fragment of the most recently + # seen transaction. Notify ShapeLogCollector about it immediately. + confirm_flushed_and_notify(state, flushed_offset) + else + # Storage has signaled latest flushed offset in the middle of processing a multi-fragment + # transaction. Save it for later, to be handled when the commit fragment arrives. + updated_offset = more_recent_offset(state.pending_flush_offset, flushed_offset) + %{state | pending_flush_offset: updated_offset} + end - ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn) {:noreply, state, state.hibernate_after} end @@ -567,7 +577,7 @@ defmodule Electric.Shapes.Consumer do # With write_unit=txn all fragments are buffered until the Commit change is seen. At that # point, a transaction struct is produced from the buffered fragments and is written to # storage. 
- state.write_unit == State.write_unit_txn() -> + is_write_unit_txn(state.write_unit) -> {txns, transaction_builder} = TransactionBuilder.build(txn_fragment, state.transaction_builder) @@ -597,6 +607,7 @@ defmodule Electric.Shapes.Consumer do defp skip_txn_fragment(state, %TransactionFragment{} = txn_fragment) do %{state | pending_txn: nil} |> consider_flushed(txn_fragment.last_log_offset) + |> clear_pending_flush_offset() end # This function does similar things to do_handle_txn/2 but with the following simplifications: @@ -747,9 +758,10 @@ defmodule Electric.Shapes.Consumer do "No relevant changes written in transaction xid=#{txn.xid}" end) - state = %{state | pending_txn: nil} - consider_flushed(state, txn_fragment.last_log_offset) + %{state | pending_txn: nil} + |> consider_flushed(txn_fragment.last_log_offset) end + |> clear_pending_flush_offset() end def process_buffered_txn_fragments(%State{buffer: buffer} = state) do @@ -1006,6 +1018,31 @@ defmodule Electric.Shapes.Consumer do end end + defp confirm_flushed_and_notify(state, flushed_offset) do + {state, txn_offset} = State.align_offset_to_txn_boundary(state, flushed_offset) + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, txn_offset) + state + end + + # After a pending transaction completes and txn_offset_mapping is populated, + # process the deferred flushed offset (if any). + # + # Even if the most recent transaction is skipped or no changes from it end up satisfying the + # shape's `where` condition, Storage may have signaled a flush offset from the previous transaction + # while we were still processing fragments of the current one. Therefore this function must + # be called any time `state.pending_txn` is reset to nil in a multi-fragment transaction + # processing setting. 
+ defp clear_pending_flush_offset(%{pending_flush_offset: nil} = state), do: state + + defp clear_pending_flush_offset(%{pending_flush_offset: flushed_offset} = state) do + %{state | pending_flush_offset: nil} + |> confirm_flushed_and_notify(flushed_offset) + end + + defp more_recent_offset(nil, offset), do: offset + defp more_recent_offset(offset, nil), do: offset + defp more_recent_offset(offset1, offset2), do: LogOffset.max(offset1, offset2) + defp subscribe(state, action) do case ShapeLogCollector.add_shape(state.stack_id, state.shape_handle, state.shape, action) do :ok -> diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index b32204d2a9..b2402ae742 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -42,7 +42,11 @@ defmodule Electric.Shapes.Consumer.State do # Tracks in-progress transaction, initialized when a txn fragment with has_begin?=true is seen. # It is used to check whether the entire txn is visible in the snapshot and to mark it # as flushed in order to handle its remaining fragments appropriately. - pending_txn: nil + pending_txn: nil, + # When a {Storage, :flushed, offset} message arrives during a pending + # transaction, we defer the notification and store the max flushed offset + # here. Multiple deferred notifications are collapsed into a single most recent offset. 
+ pending_flush_offset: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -393,6 +397,6 @@ defmodule Electric.Shapes.Consumer.State do ] end - def write_unit_txn, do: @write_unit_txn - def write_unit_txn_fragment, do: @write_unit_txn_fragment + defguard is_write_unit_txn(write_unit) when write_unit == @write_unit_txn + defguard is_write_unit_txn_fragment(write_unit) when write_unit == @write_unit_txn_fragment end From 7b56ab082560a9a0ec47a5f2b0bcf19144435302 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 16:08:37 +0100 Subject: [PATCH 07/10] Update regression tests for deferred flush notification behavior Tests now verify that flush notifications are deferred during pending transactions and sent only after the commit fragment is processed. Also fix a compilation error in Consumer (cannot call remote function in pattern match) and add a non-commit fragment clause to FlushTracker's handle_txn_fragment/4. Refs: #4063 --- .../test/electric/shapes/consumer_test.exs | 53 +++++++++++-------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index bc0b899ad7..a0abc0b37e 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -1677,10 +1677,11 @@ defmodule Electric.Shapes.ConsumerTest do test "flush notification for multi-fragment txn is not lost when storage flushes before commit fragment", %{stack_id: stack_id} = ctx do # Regression test for https://github.com/electric-sql/electric/issues/3985 + # Updated for deferred flush notification fix (#4063). # # When a multi-fragment transaction's non-commit fragments are flushed to disk # before the commit fragment is processed by ShapeLogCollector, the flush - # notification was lost because FlushTracker hadn't registered the shape yet. 
+ # notification was lost because FlushTracker wasn't tracking the shape's offsets. # This caused the shape to be stuck in the FlushTracker, blocking # the global flush offset from advancing. {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, stack_id) @@ -1738,19 +1739,12 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(fragment1, stack_id) assert :ok = ShapeLogCollector.handle_event(fragment2, stack_id) - flushed_log_offset = fragment2.last_log_offset - - # Matching on a traced call inline to avoid any timing issues that - # Trace.collect_traced_calls() is susceptible to in this case. - assert_receive {:trace, _, :call, - {ShapeLogCollector, :notify_flushed, - [^stack_id, ^shape_handle, ^flushed_log_offset]}}, - @receive_timeout + # With deferred flush notifications, notify_flushed is NOT called + # after non-commit fragments. The flush is deferred until the commit. + assert [] == Support.Trace.collect_traced_calls() # Now send the commit fragment. The commit fragment itself has NO matching # changes for the shape — all changes were in earlier fragments. - # After this, FlushTracker registers the shape but the data was already - # flushed, so no new :flushed message will arrive. commit_fragment = txn_fragment( xid, @@ -1768,8 +1762,16 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(commit_fragment, ctx.stack_id) assert_receive {^ref, :new_changes, _}, @receive_timeout - # Assert that the flush boundary has advanced which wasn't the case before due to the - # aforementioned bug, + # The deferred flush notification is sent after the commit, aligned + # to the commit fragment's last_log_offset. + commit_offset = commit_fragment.last_log_offset + + assert_receive {:trace, _, :call, + {ShapeLogCollector, :notify_flushed, + [^stack_id, ^shape_handle, ^commit_offset]}}, + @receive_timeout + + # Flush boundary advances. 
tx_offset = commit_fragment.last_log_offset.tx_offset assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout end @@ -1777,7 +1779,7 @@ defmodule Electric.Shapes.ConsumerTest do @tag allow_subqueries: false, with_pure_file_storage_opts: [flush_period: 10_000] test "flush notification offset is aligned when storage flushes before commit arrives at consumer", %{stack_id: stack_id} do - # Regression test for https://github.com/electric-sql/electric/issues/4058 + # Regression test for https://github.com/electric-sql/electric/issues/4063 # # When a non-commit fragment has enough data to trigger a buffer-size # flush (>= 64KB), the :flushed message is placed in the consumer's @@ -1854,21 +1856,26 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(non_commit_fragment, stack_id) - assert [ - {ShapeLogCollector, :notify_flushed, - [stack_id, shape_handle, relevant_change_offset]} - ] == Support.Trace.collect_traced_calls() + # With deferred flush notifications, the consumer does NOT call notify_flushed + # after the non-commit fragment. The :flushed message is saved for later. + assert [] == Support.Trace.collect_traced_calls() - # Send the commit fragment to finalize the transaction in shape consumer. - # FlushTracker still tracks the transaction but the consumer won't send another flush - # notification, so the shape remains stuck and blocks all further advancement of flushed - # offset. + # Send the commit fragment to finalize the transaction. assert :ok = ShapeLogCollector.handle_event(commit_fragment, stack_id) # Consumer has processed the relevant change... assert_receive {^ref, :new_changes, ^relevant_change_offset}, @receive_timeout - # ...but the replication client never gets a flush boundary advancement + # The deferred flush notification is sent after the commit with the + # aligned offset (the commit fragment's last_log_offset). 
+ commit_last_log_offset = commit_fragment.last_log_offset + + assert [ + {ShapeLogCollector, :notify_flushed, + [^stack_id, ^shape_handle, ^commit_last_log_offset]} + ] = Support.Trace.collect_traced_calls() + + # Flush boundary advances correctly. tx_offset = commit_fragment.last_log_offset.tx_offset assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout end From e4ca03704f8ad00f1d91b28f4678d5583b298174 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 22:25:49 +0100 Subject: [PATCH 08/10] Remove implementation plan Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plans/2026-03-26-issue-4063/plan.md | 542 ------------------ 1 file changed, 542 deletions(-) delete mode 100644 packages/sync-service/plans/2026-03-26-issue-4063/plan.md diff --git a/packages/sync-service/plans/2026-03-26-issue-4063/plan.md b/packages/sync-service/plans/2026-03-26-issue-4063/plan.md deleted file mode 100644 index d08b27a602..0000000000 --- a/packages/sync-service/plans/2026-03-26-issue-4063/plan.md +++ /dev/null @@ -1,542 +0,0 @@ -# Deferred Flush Notification for Multi-Fragment Transactions - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Fix the race condition where a consumer sends an unaligned flush offset to FlushTracker when storage flushes data from a non-commit fragment before the commit fragment populates `txn_offset_mapping`. - -**Architecture:** Two coordinated changes: (1) Revert FlushTracker to only track shapes from commit fragments (undoing the early-tracking approach from PR #3986). (2) In the Consumer, defer `{Storage, :flushed, offset}` notifications that arrive during a pending transaction, then process the deferred notification after the commit fragment has been handled and `txn_offset_mapping` is populated. 
- -**Tech Stack:** Elixir, GenServer, ExUnit - -**Ref:** https://github.com/electric-sql/electric/issues/4063 - ---- - -## File Map - -| File | Action | Responsibility | -|------|--------|----------------| -| `lib/electric/replication/shape_log_collector/flush_tracker.ex` | Modify | Revert non-commit fragment tracking to no-op; commit tracks ALL delivered shapes | -| `lib/electric/shapes/consumer/state.ex` | Modify | Add `pending_flush_offset` field to struct | -| `lib/electric/shapes/consumer.ex` | Modify | Defer `:flushed` messages during pending txn; process deferred in `maybe_complete_pending_txn` and `skip_txn_fragment` | -| `test/electric/replication/shape_log_collector/flush_tracker_test.exs` | Modify | Update tests: non-commit fragment is now a no-op, commit tracks all affected shapes | -| `test/electric/shapes/consumer_test.exs` | Modify | Update regression test to verify deferred flush behavior | - ---- - -### Task 1: Revert FlushTracker non-commit fragment tracking - -**Files:** -- Modify: `lib/electric/replication/shape_log_collector/flush_tracker.ex:105-141` -- Modify: `test/electric/replication/shape_log_collector/flush_tracker_test.exs` - -The non-commit clause currently calls `track_shapes()`. It needs to be removed so that only the clause that matches on %Commit{} remains. The commit clause currently filters `affected_shapes` to only those in `shapes_with_changes` or already tracked. Since non-commit fragments no longer track anything, `is_map_key(state.last_flushed, shape)` is always false for shapes not in the commit. Revert to tracking ALL `affected_shapes` delivered for the commit. - -- [ ] **Step 1: Remove FlushTracker non-commit clause** - -In `flush_tracker.ex`, replace lines 105-115: - -```diff -- # Non-commit fragment: track affected shapes but don't update last_seen_offset -- # or notify. This ensures shapes are registered early so flush notifications -- # from Consumers aren't lost when storage flushes before the commit arrives. 
-- def handle_txn_fragment( -- %__MODULE__{} = state, -- %TransactionFragment{commit: nil, last_log_offset: last_log_offset}, -- affected_shapes, -- _shapes_with_changes -- ) do -- track_shapes(state, last_log_offset, affected_shapes) -- end -``` - -- [ ] **Step 2: Update FlushTracker commit clause to track all affected shapes** - -In `flush_tracker.ex`, replace lines 117-141: - -```elixir - # Commit fragment: track all shapes affected by all fragments of the transaction and update last_seen_offset. - def handle_txn_fragment( - %__MODULE__{} = state, - %TransactionFragment{commit: %Commit{}, last_log_offset: last_log_offset}, - affected_shapes, - _shapes_with_changes - ) do - state = track_shapes(state, last_log_offset, affected_shapes) - - state = %{state | last_seen_offset: last_log_offset} - - if state.last_flushed == %{} do - update_global_offset(state) - else - state - end - end -``` - -- [ ] **Step 3: Remove `last_seen_offset == before_all` guard from `handle_flush_notification`** - -In `flush_tracker.ex`, lines 214-221, replace: - -```elixir - # Only update global offset if we've seen at least one commit. - # Before any commit, last_seen_offset is before_all and there's - # nothing meaningful to report. - if state.last_seen_offset == LogOffset.before_all() do - state - else - update_global_offset(state) - end -``` - -with just: - -```elixir - update_global_offset(state) -``` - -This guard was added in PR #3986 because non-commit fragments could trigger flush notifications before any commit. With the deferred approach in Consumer, flush notifications only reach FlushTracker after a commit has been processed. - -- [ ] **Step 4: Update FlushTracker tests** - -Several tests exercise non-commit fragment tracking that no longer applies. Update: - -**Test "non-commit fragment tracks shapes but does not notify or update last_seen"** (line 30): -Non-commit fragments are now raising FunctionClauseError. 
Add an assert_raise to the test - -```elixir - test "non-commit fragment is a no-op", %{tracker: tracker} do - fragment = %TransactionFragment{ - xid: 1, - lsn: 1, - last_log_offset: LogOffset.new(1, 0), - commit: nil - } - - assert_raise FunctionClauseError, fn -> handle_txn(tracker, fragment, ["shape1"]) end - end -``` - -**Test "shape tracked by non-commit fragment can be flushed before commit arrives"** (line 59): -This test's premise no longer holds (non-commit doesn't track). Replace with a test that verifies the commit tracks all affected shapes regardless of whether they had changes in the commit fragment: - -```elixir - test "commit tracks all affected shapes even those without changes in the commit fragment", - %{tracker: tracker} do - # shape1 had changes in a non-commit fragment (not tracked by FlushTracker). - # The commit fragment lists it in affected_shapes (via shapes_in_txn). - # shapes_with_changes is empty (no data changes in commit). - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new() - ) - - refute FlushTracker.empty?(tracker) - - # Flush at the commit offset catches up - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end -``` - -**Test "shape tracked by non-commit and still pending is updated by commit"** (line 95): -Remove this test — its premise (non-commit tracking + commit updating last_sent) no longer applies. - -**Test "already-flushed shape with new changes in commit is re-tracked"** (line 163): -Remove this test — non-commit fragments no longer track, so there's nothing to "re-track". - -**Test "multiple non-commit fragments update last_sent progressively"** (line 199): -Remove this test — non-commit fragments are no-ops. 
- -**Test "shape only in commit (not in non-commit fragments) is tracked normally"** (line 128): -Simplify: just verify the commit tracks both shapes: - -```elixir - test "commit tracks all shapes including those that had changes only in earlier fragments", - %{tracker: tracker} do - # shape1 only had changes in non-commit fragment (not visible to FlushTracker). - # shape2 has changes in the commit fragment. Both appear in affected_shapes. - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1", "shape2"], - MapSet.new(["shape2"]) - ) - - refute FlushTracker.empty?(tracker) - - # Both shapes need to be flushed - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - tracker = - FlushTracker.handle_flush_notification(tracker, "shape2", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end -``` - -- [ ] **Step 5: Run FlushTracker tests** - -Run: `mix test test/electric/replication/shape_log_collector/flush_tracker_test.exs` -Expected: ALL PASS - -- [ ] **Step 6: Format the code** - -``` -mix format -``` - -- [ ] **Step 7: Commit** - -``` -git add packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex \ - packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs -git commit -m "Revert FlushTracker to commit-only tracking - -Non-commit fragments no longer register shapes in FlushTracker. -The Consumer will defer flush notifications until the commit fragment -is processed, so early registration is no longer needed. 
- -Refs: #4063" -``` - ---- - -### Task 2: Add `pending_flush_offset` to Consumer state - -**Files:** -- Modify: `lib/electric/shapes/consumer/state.ex:19-46` - -- [ ] **Step 1: Add `pending_flush_offset` field to the State struct** - -In `state.ex`, add the field to the struct definition (after `pending_txn: nil`): - -```elixir - # When a {Storage, :flushed, offset} message arrives during a pending - # transaction, we defer the notification and store the max flushed offset - # here. It is processed in maybe_complete_pending_txn after txn_offset_mapping - # is populated. Multiple deferred notifications are collapsed into the max offset. - pending_flush_offset: nil -``` - -- [ ] **Step 2: Commit** - -``` -git add packages/sync-service/lib/electric/shapes/consumer/state.ex -git commit -m "Add pending_flush_offset field to Consumer.State" -``` - ---- - -### Task 3: Defer flush notifications in Consumer during pending transactions - -**Files:** -- Modify: `lib/electric/shapes/consumer.ex:273-278,594-599,696-753` - -This is the core fix. Three changes in `consumer.ex`: - -1. Split the `:flushed` handler to defer during pending transactions -2. Add `process_pending_flush/1` helper -3. Call it from `maybe_complete_pending_txn` and `skip_txn_fragment` - -- [ ] **Step 1: Split the `:flushed` handler into two clauses** - -Replace lines 273-278: - -```elixir - def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do - {state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in) - - ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn) - {:noreply, state, state.hibernate_after} - end -``` - -with: - -```elixir - # When a flush arrives during a pending transaction: - # 1. Immediately notify SLC with the highest completed-txn boundary from - # txn_offset_mapping (if any entries are covered by this flush). - # 2. Save the flushed offset for the current pending txn whose - # txn_offset_mapping entry doesn't exist yet. 
- def handle_info( - {ShapeCache.Storage, :flushed, offset_in}, - %{write_unit: State.write_unit_txn_fragment(), pending_txn: pending_txn} = state - ) - when not is_nil(pending_txn) do - state = notify_flushed_mappings(state, offset_in) - - # Save the flushed offset for the current pending txn. - updated_offset = LogOffset.max(state.pending_flush_offset || offset_in, offset_in) - {:noreply, %{state | pending_flush_offset: updated_offset}, state.hibernate_after} - end - - def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do - {state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in) - - ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn) - {:noreply, state, state.hibernate_after} - end -``` - -- [ ] **Step 2: Add `consume_flushed_mappings/2`, `notify_flushed_mappings/2`, and `process_pending_flush/1`** - -Add these private functions near the end of the module (e.g. after `consider_flushed`): - -```elixir - # Walk txn_offset_mapping, dropping entries whose key <= offset_in, - # keeping only the last seen boundary. Stops at the first key > offset_in. - # Returns {nil, list} if nothing matched, {boundary, remaining} otherwise. - defp consume_flushed_mappings([{key, boundary} | rest], offset_in) - when LogOffset.is_log_offset_lte(key, offset_in) do - consume_flushed_mappings(rest, offset_in, boundary) - end - - defp consume_flushed_mappings(remaining, _offset_in), do: {nil, remaining} - - defp consume_flushed_mappings([{key, boundary} | rest], offset_in, _prev_boundary) - when LogOffset.is_log_offset_lte(key, offset_in) do - consume_flushed_mappings(rest, offset_in, boundary) - end - - defp consume_flushed_mappings(remaining, _offset_in, boundary), do: {boundary, remaining} - - # Consume completed entries from txn_offset_mapping and send a single - # flush notification with the highest boundary. FlushTracker keeps one - # {last_sent, last_flushed} entry per shape, so one notification suffices. 
- defp notify_flushed_mappings(state, offset_in) do - case consume_flushed_mappings(state.txn_offset_mapping, offset_in) do - {nil, _} -> - state - - {boundary, remaining} -> - ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, boundary) - %{state | txn_offset_mapping: remaining} - end - end - - # After a pending transaction completes and txn_offset_mapping is populated, - # process the deferred flushed offset (if any). - defp process_pending_flush(%{pending_flush_offset: nil} = state), do: state - - defp process_pending_flush(%{pending_flush_offset: flushed_offset} = state) do - state = %{state | pending_flush_offset: nil} - notify_flushed_mappings(state, flushed_offset) - end -``` - -- [ ] **Step 3: Call `process_pending_flush` in `maybe_complete_pending_txn` (num_changes > 0 branch)** - -In the `txn.num_changes > 0` branch of `maybe_complete_pending_txn` (around line 738), change: - -```elixir - %{ - state - | writer: writer, - pending_txn: nil, - txn_offset_mapping: - state.txn_offset_mapping ++ [{state.latest_offset, txn_fragment.last_log_offset}] - } -``` - -to: - -```elixir - state = %{ - state - | writer: writer, - pending_txn: nil, - txn_offset_mapping: - state.txn_offset_mapping ++ [{state.latest_offset, txn_fragment.last_log_offset}] - } - - process_pending_flush(state) -``` - -- [ ] **Step 4: Run the full consumer test suite** - -Run: `mix test test/electric/shapes/consumer_test.exs` -Expected: ALL PASS (including the existing regression test for #4058 on the branch) - -- [ ] **Step 5: Commit** - -``` -git add packages/sync-service/lib/electric/shapes/consumer.ex \ - packages/sync-service/lib/electric/shapes/consumer/state.ex -git commit -m "Defer flush notifications in Consumer during pending transactions - -When a {Storage, :flushed, offset} message arrives while a multi-fragment -transaction is pending, the Consumer now saves the offset instead of -immediately notifying the ShapeLogCollector. 
After the commit fragment -populates txn_offset_mapping, the deferred offset is aligned and sent -as a single notification. - -This fixes the race condition where the consumer sent an unaligned -flush offset to FlushTracker because txn_offset_mapping was empty -at the time of the storage flush. - -Refs: #4063" -``` - ---- - -### Task 4: Update the regression test for the new behavior - -**Files:** -- Modify: `test/electric/shapes/consumer_test.exs` (the test added on this branch starting at line 1777) - -The existing regression test for #4058 traces `notify_flushed` calls after the non-commit fragment. With the deferred approach, the consumer should NOT call `notify_flushed` after the non-commit fragment. Instead, it should call it after the commit fragment. - -- [ ] **Step 1: Update the regression test assertions** - -The test currently asserts that `notify_flushed` is called right after the non-commit fragment with `relevant_change_offset`. With the fix, the consumer defers this notification. The assertion should change: after the non-commit fragment, `notify_flushed` should NOT have been called. After the commit fragment, `notify_flushed` should have been called with the **aligned** offset (the commit fragment's `last_log_offset`). - -Replace the test's assertion section (from the `Support.Trace.trace_shape_log_collector_calls` call through the end of the test) with: - -```elixir - Support.Trace.trace_shape_log_collector_calls( - pid: Shapes.Consumer.whereis(stack_id, shape_handle), - functions: [:notify_flushed] - ) - - assert :ok = ShapeLogCollector.handle_event(non_commit_fragment, stack_id) - - # With deferred flush notifications, the consumer does NOT call notify_flushed - # after the non-commit fragment. The :flushed message is saved for later. - assert [] == Support.Trace.collect_traced_calls() - - # Send the commit fragment to finalize the transaction. 
- assert :ok = ShapeLogCollector.handle_event(commit_fragment, stack_id) - - # Consumer has processed the relevant change... - assert_receive {^ref, :new_changes, ^relevant_change_offset}, @receive_timeout - - # The deferred flush notification is sent after the commit with the - # aligned offset (the commit fragment's last_log_offset). - commit_last_log_offset = commit_fragment.last_log_offset - - assert [ - {ShapeLogCollector, :notify_flushed, - [^stack_id, ^shape_handle, ^commit_last_log_offset]} - ] = Support.Trace.collect_traced_calls() - - # Flush boundary advances correctly. - tx_offset = commit_fragment.last_log_offset.tx_offset - assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout -``` - -- [ ] **Step 2: Also update the #3985 regression test if needed** - -The test at line 1676 ("flush notification for multi-fragment txn is not lost when storage flushes before commit fragment") sends two non-commit fragments with `flush_period: 1` (timer-based flush). With the deferred approach, the `notify_flushed` call traced after fragments 1+2 should now be deferred. Let me check: this test uses `:trace` messages from the tracing module. It matches on `{:trace, _, :call, {ShapeLogCollector, :notify_flushed, ...}}`. - -With deferred flush, the consumer no longer calls `notify_flushed` after the non-commit fragments. The traced call would appear only after the commit fragment. Update the test accordingly: - -Replace the section after `ShapeLogCollector.handle_event(fragment2, stack_id)`: - -```elixir - assert :ok = ShapeLogCollector.handle_event(fragment1, stack_id) - assert :ok = ShapeLogCollector.handle_event(fragment2, stack_id) - - # With deferred flush notifications, notify_flushed is NOT called - # after non-commit fragments. The flush is deferred until the commit. - refute_receive {:trace, _, :call, {ShapeLogCollector, :notify_flushed, _}}, 100 - - # Now send the commit fragment. 
- commit_fragment = - txn_fragment( - xid, - lsn, - [ - %Changes.NewRecord{ - relation: {"public", "other_table"}, - record: %{"id" => "99"}, - log_offset: LogOffset.new(lsn, 6) - } - ], - has_commit?: true - ) - - assert :ok = ShapeLogCollector.handle_event(commit_fragment, ctx.stack_id) - assert_receive {^ref, :new_changes, _}, @receive_timeout - - # The deferred flush notification is sent after the commit, aligned - # to the commit fragment's last_log_offset. - commit_offset = commit_fragment.last_log_offset - - assert_receive {:trace, _, :call, - {ShapeLogCollector, :notify_flushed, - [^stack_id, ^shape_handle, ^commit_offset]}}, - @receive_timeout - - # Flush boundary advances. - tx_offset = commit_fragment.last_log_offset.tx_offset - assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout -``` - -Note: the exact shape of this change depends on how the existing test is structured. The key changes are: -1. After non-commit fragments: `refute_receive` for notify_flushed (was `assert_receive`) -2. After commit: `assert_receive` for notify_flushed with the commit's aligned offset -3. The flush boundary assertion stays the same - -- [ ] **Step 3: Run consumer tests** - -Run: `mix test test/electric/shapes/consumer_test.exs` -Expected: ALL PASS - -- [ ] **Step 4: Commit** - -``` -git add packages/sync-service/test/electric/shapes/consumer_test.exs -git commit -m "Update regression tests for deferred flush notification behavior - -Tests now verify that flush notifications are deferred during pending -transactions and sent only after the commit fragment is processed. 
- -Refs: #4063" -``` - ---- - -### Task 5: Run the full test suite - -- [ ] **Step 1: Run the sync-service test suite** - -Run: `mix test` -Expected: ALL PASS - -- [ ] **Step 2: If any failures, investigate and fix** - -Pay attention to: -- Tests that depend on flush notification timing -- Tests that trace `notify_flushed` calls -- Tests with `write_unit: :txn_fragment` behavior - ---- - -## Design Notes - -### Why revert FlushTracker instead of keeping early tracking? - -The early tracking approach (from PR #3986) tried to solve the problem at the FlushTracker level: register shapes early so flush notifications aren't lost. But this created a new problem: the non-commit fragment's `last_log_offset` could be higher than the consumer's written offset (due to unrelated changes), causing FlushTracker to see a `last_sent` that's higher than any flush notification the consumer would send. - -The deferred approach solves the root cause at the Consumer level: don't send flush notifications until `txn_offset_mapping` is populated and the offset can be correctly aligned to the transaction boundary. - -### What happens when data is only partially flushed? - -If the deferred `flushed_offset < state.latest_offset`, the consumer does NOT send a notification. After the commit, `pending_txn` is nil and `txn_offset_mapping` is populated. The next `{Storage, :flushed, _}` message (from a timer or subsequent write) is handled by the normal (non-deferred) clause, which calls `align_offset_to_txn_boundary` with the correct mapping. - -### What about cross-transaction flushes? - -A `:flushed` message may cover data from previously committed transactions whose entries are already in `txn_offset_mapping`. The deferred handler splits `txn_offset_mapping` at the flushed offset and sends a single `notify_flushed` with the highest completed boundary. 
This is sufficient because FlushTracker keeps one `{last_sent, last_flushed}` entry per shape — it will either store the boundary as `last_flushed` (if a newer commit already updated `last_sent`) or remove the shape entirely (if `last_sent` matches), in which case the next commit re-adds it as a new entry. - -Only the current pending transaction's portion of the flush is deferred (saved as `pending_flush_offset`), because its `txn_offset_mapping` entry doesn't exist yet. After the commit populates the entry, `process_pending_flush` uses the same split-and-notify pattern. From 7cd24ee190ceacc9a21464ca32ec33de6b374cde Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 22:25:51 +0100 Subject: [PATCH 09/10] Add changeset Co-Authored-By: Claude Opus 4.6 (1M context) --- .changeset/brave-doors-kneel.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/brave-doors-kneel.md diff --git a/.changeset/brave-doors-kneel.md b/.changeset/brave-doors-kneel.md new file mode 100644 index 0000000000..d47fb70c0b --- /dev/null +++ b/.changeset/brave-doors-kneel.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Fix stuck flush tracker when storage flush notification arrives mid-transaction in Consumer From b76d0d31d730baf26221d3b5fc4e81d547d368ef Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 26 Mar 2026 22:45:10 +0100 Subject: [PATCH 10/10] Fix typo --- packages/sync-service/lib/electric/shapes/consumer/state.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index b2402ae742..52c70eaff9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -45,7 +45,7 @@ defmodule Electric.Shapes.Consumer.State do pending_txn: nil, # When a {Storage, :flushed, offset} message arrives during a pending # transaction, we 
defer the notification and store the max flushed offset - # here. ultiple deferred notifications are collapsed into a single most recent offset. + # here. Multiple deferred notifications are collapsed into a single most recent offset. pending_flush_offset: nil ]