diff --git a/.changeset/brave-doors-kneel.md b/.changeset/brave-doors-kneel.md new file mode 100644 index 0000000000..d47fb70c0b --- /dev/null +++ b/.changeset/brave-doors-kneel.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Fix stuck flush tracker when storage flush notification arrives mid-transaction in Consumer diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index c4597e22cf..6bbb4b6b87 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -465,17 +465,14 @@ defmodule Electric.Replication.ShapeLogCollector do OpenTelemetry.add_span_attributes("txn.is_dropped": true) - {:ok, - %{ - state - | flush_tracker: - FlushTracker.handle_txn_fragment( - state.flush_tracker, - txn_fragment, - [], - MapSet.new() - ) - }} + flush_tracker = + if txn_fragment.commit do + FlushTracker.handle_txn_fragment(state.flush_tracker, txn_fragment, []) + else + state.flush_tracker + end + + {:ok, %{state | flush_tracker: flush_tracker}} end defp handle_txn_fragment( @@ -578,22 +575,9 @@ defmodule Electric.Replication.ShapeLogCollector do flush_tracker = case event do - %TransactionFragment{} -> - shapes_with_changes = - for {id, frag} <- events_by_handle, - frag.change_count > 0, - not MapSet.member?(undeliverable_set, id), - do: id, - into: MapSet.new() - - if event.commit, do: LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn) - - FlushTracker.handle_txn_fragment( - flush_tracker, - event, - delivered_shapes, - shapes_with_changes - ) + %TransactionFragment{commit: commit} when not is_nil(commit) -> + LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn) + FlushTracker.handle_txn_fragment(flush_tracker, event, delivered_shapes) _ -> flush_tracker diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex b/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex index 8ea1144a38..f377a3f162 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex @@ -95,41 +95,15 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do last_flushed == %{} and :gb_trees.is_empty(tree) end - @spec handle_txn_fragment( - t(), - TransactionFragment.t(), - Enumerable.t(shape_id()), - MapSet.t(shape_id()) - ) :: t() - - # Non-commit fragment: track affected shapes but don't update last_seen_offset - # or notify. This ensures shapes are registered early so flush notifications - # from Consumers aren't lost when storage flushes before the commit arrives. - def handle_txn_fragment( - %__MODULE__{} = state, - %TransactionFragment{commit: nil, last_log_offset: last_log_offset}, - affected_shapes, - _shapes_with_changes - ) do - track_shapes(state, last_log_offset, affected_shapes) - end + @spec handle_txn_fragment(t(), TransactionFragment.t(), Enumerable.t(shape_id())) :: t() - # Commit fragment: track shapes that have actual changes in this fragment - # or are already being tracked (need last_sent updated to commit offset). - # Skip shapes that only have a commit marker and already flushed from - # earlier non-commit fragments — there's nothing new to flush for them. + # Commit fragment: track all shapes affected by all fragments of the transaction and update last_seen_offset. def handle_txn_fragment( %__MODULE__{} = state, %TransactionFragment{commit: %Commit{}, last_log_offset: last_log_offset}, - affected_shapes, - shapes_with_changes + affected_shapes ) do - shapes_to_track = - Enum.filter(affected_shapes, fn shape -> - shape in shapes_with_changes or is_map_key(state.last_flushed, shape) - end) - - state = track_shapes(state, last_log_offset, shapes_to_track) + state = track_shapes(state, last_log_offset, affected_shapes) state = %{state | last_seen_offset: last_log_offset} @@ -211,14 +185,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do min_incomplete_flush_tree: min_incomplete_flush_tree } - # Only update global offset if we've seen at least one commit. - # Before any commit, last_seen_offset is before_all and there's - # nothing meaningful to report. - if state.last_seen_offset == LogOffset.before_all() do - state - else - update_global_offset(state) - end + update_global_offset(state) end # If the shape is not in the mapping, then we're processing a flush notification for a shape that was removed diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index abe734a678..2917bd57c0 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -270,10 +270,20 @@ defmodule Electric.Shapes.Consumer do end end - def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do - {state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in) + def handle_info({ShapeCache.Storage, :flushed, flushed_offset}, state) do + state = + if is_write_unit_txn(state.write_unit) or is_nil(state.pending_txn) do + # We're not currently in the middle of processing a transaction. This flushed offset is either + # from a previously processed transaction or a non-commit fragment of the most recently + # seen transaction. Notify ShapeLogCollector about it immediately. + confirm_flushed_and_notify(state, flushed_offset) + else + # Storage has signaled latest flushed offset in the middle of processing a multi-fragment + # transaction. Save it for later, to be handled when the commit fragment arrives. + updated_offset = more_recent_offset(state.pending_flush_offset, flushed_offset) + %{state | pending_flush_offset: updated_offset} + end - ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn) {:noreply, state, state.hibernate_after} end @@ -567,7 +577,7 @@ defmodule Electric.Shapes.Consumer do # With write_unit=txn all fragments are buffered until the Commit change is seen. At that # point, a transaction struct is produced from the buffered fragments and is written to # storage. - state.write_unit == State.write_unit_txn() -> + is_write_unit_txn(state.write_unit) -> {txns, transaction_builder} = TransactionBuilder.build(txn_fragment, state.transaction_builder) @@ -597,6 +607,7 @@ defmodule Electric.Shapes.Consumer do defp skip_txn_fragment(state, %TransactionFragment{} = txn_fragment) do %{state | pending_txn: nil} |> consider_flushed(txn_fragment.last_log_offset) + |> clear_pending_flush_offset() end # This function does similar things to do_handle_txn/2 but with the following simplifications: @@ -747,9 +758,10 @@ defmodule Electric.Shapes.Consumer do "No relevant changes written in transaction xid=#{txn.xid}" end) - state = %{state | pending_txn: nil} - consider_flushed(state, txn_fragment.last_log_offset) + %{state | pending_txn: nil} + |> consider_flushed(txn_fragment.last_log_offset) end + |> clear_pending_flush_offset() end def process_buffered_txn_fragments(%State{buffer: buffer} = state) do @@ -1006,6 +1018,31 @@ defmodule Electric.Shapes.Consumer do end end + defp confirm_flushed_and_notify(state, flushed_offset) do + {state, txn_offset} = State.align_offset_to_txn_boundary(state, flushed_offset) + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, txn_offset) + state + end + + # After a pending transaction completes and txn_offset_mapping is populated, + # process the deferred flushed offset (if any). + # + # Even if the most recent transaction is skipped or no changes from it end up satisfying the + # shape's `where` condition, Storage may have signaled a flush offset from the previous transaction + # while we were still processing fragments of the current one. Therefore this function must + # be called any time `state.pending_txn` is reset to nil in a multi-fragment transaction + # processing setting. + defp clear_pending_flush_offset(%{pending_flush_offset: nil} = state), do: state + + defp clear_pending_flush_offset(%{pending_flush_offset: flushed_offset} = state) do + %{state | pending_flush_offset: nil} + |> confirm_flushed_and_notify(flushed_offset) + end + + defp more_recent_offset(nil, offset), do: offset + defp more_recent_offset(offset, nil), do: offset + defp more_recent_offset(offset1, offset2), do: LogOffset.max(offset1, offset2) + defp subscribe(state, action) do case ShapeLogCollector.add_shape(state.stack_id, state.shape_handle, state.shape, action) do :ok -> diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index b32204d2a9..52c70eaff9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -42,7 +42,11 @@ defmodule Electric.Shapes.Consumer.State do # Tracks in-progress transaction, initialized when a txn fragment with has_begin?=true is seen. # It is used to check whether the entire txn is visible in the snapshot and to mark it # as flushed in order to handle its remaining fragments appropriately. - pending_txn: nil + pending_txn: nil, + # When a {Storage, :flushed, offset} message arrives during a pending + # transaction, we defer the notification and store the max flushed offset + # here. Multiple deferred notifications are collapsed into a single most recent offset. + pending_flush_offset: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -393,6 +397,6 @@ defmodule Electric.Shapes.Consumer.State do ] end - def write_unit_txn, do: @write_unit_txn - def write_unit_txn_fragment, do: @write_unit_txn_fragment + defguard is_write_unit_txn(write_unit) when write_unit == @write_unit_txn + defguard is_write_unit_txn_fragment(write_unit) when write_unit == @write_unit_txn_fragment end diff --git a/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs index 6fabe24447..5b2f45655e 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector/flush_tracker_test.exs @@ -27,23 +27,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do refute_receive {:flush_confirmed, _} end - test "non-commit fragment tracks shapes but does not notify or update last_seen", %{ - tracker: tracker - } do - fragment = %TransactionFragment{ - xid: 1, - lsn: 1, - last_log_offset: LogOffset.new(1, 0), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - refute_receive {:flush_confirmed, _} - # Shape is tracked in last_flushed - refute FlushTracker.empty?(tracker) - end - - test "non-commit fragment with no affected shapes is a no-op", %{tracker: tracker} do + test "non-commit fragment raises FunctionClauseError", %{tracker: tracker} do fragment = %TransactionFragment{ xid: 1, lsn: 1, @@ -51,192 +35,20 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do commit: nil } - tracker = handle_txn(tracker, fragment, []) - refute_receive {:flush_confirmed, _} - assert FlushTracker.empty?(tracker) - end - - test "shape tracked by non-commit fragment can be flushed before commit arrives", %{ - tracker: tracker - } do - # Non-commit fragment registers shape - fragment = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 4), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Flush notification catches up the shape in last_flushed - tracker = FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 4)) - - # No notification yet — no commit seen - refute_receive {:flush_confirmed, _} - assert FlushTracker.empty?(tracker) - - # Commit arrives with shape1 in affected_shapes (via EventRouter's shapes_in_txn) - # but shape1 has no new changes — only a commit marker. It was already flushed, - # so it should not be re-registered. - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new() - ) - - # Shape was skipped, tracker is empty, global offset notified - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end - - test "shape tracked by non-commit and still pending is updated by commit", %{ - tracker: tracker - } do - # Non-commit fragment registers shape - fragment = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 2), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Commit arrives — shape is still in last_flushed, so last_sent is updated - # (shapes_with_changes doesn't matter here since shape is already tracked) - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new() - ) - - refute FlushTracker.empty?(tracker) - - # Flush at the commit's offset catches up the shape - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end - - test "shape only in commit (not in non-commit fragments) is tracked normally", %{ - tracker: tracker - } do - # Non-commit fragment for shape1 - fragment = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 2), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Commit has both shapes — shape2 has actual changes in the commit fragment - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1", "shape2"], - MapSet.new(["shape2"]) - ) - - refute FlushTracker.empty?(tracker) - - # Both shapes need to be flushed - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - tracker = - FlushTracker.handle_flush_notification(tracker, "shape2", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) + assert_raise FunctionClauseError, fn -> handle_txn(tracker, fragment, ["shape1"]) end end - test "already-flushed shape with new changes in commit is re-tracked", %{ + test "non-commit fragment with no affected shapes raises FunctionClauseError", %{ tracker: tracker } do - # Non-commit fragment registers shape fragment = %TransactionFragment{ xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 4), - commit: nil - } - - tracker = handle_txn(tracker, fragment, ["shape1"]) - - # Flush notification catches up the shape in last_flushed - tracker = FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 4)) - assert FlushTracker.empty?(tracker) - - # Commit arrives — shape1 has NEW changes in the commit fragment - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new(["shape1"]) - ) - - # Shape must be re-tracked to ensure commit-fragment writes are flushed - refute FlushTracker.empty?(tracker) - - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 10)) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) - end - - test "multiple non-commit fragments update last_sent progressively", %{ - tracker: tracker - } do - frag1 = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 2), - commit: nil - } - - frag2 = %TransactionFragment{ - xid: 1, - lsn: 5, - last_log_offset: LogOffset.new(5, 5), + lsn: 1, + last_log_offset: LogOffset.new(1, 0), commit: nil } - tracker = - tracker - |> handle_txn(frag1, ["shape1"]) - |> handle_txn(frag2, ["shape1"]) - - # Flushing to the latest non-commit offset catches up the shape - tracker = - FlushTracker.handle_flush_notification(tracker, "shape1", LogOffset.new(5, 5)) - - # No notification — no commit seen - refute_receive {:flush_confirmed, _} - assert FlushTracker.empty?(tracker) - - # Commit with no new changes — shape was flushed, skipped - tracker = - FlushTracker.handle_txn_fragment( - tracker, - batch(xid: 1, lsn: 5, last_offset: 10), - ["shape1"], - MapSet.new() - ) - - assert_receive {:flush_confirmed, 5} - assert FlushTracker.empty?(tracker) + assert_raise FunctionClauseError, fn -> handle_txn(tracker, fragment, []) end end end @@ -460,12 +272,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do # Helper: calls handle_txn_fragment with shapes_with_changes defaulting to # all affected shapes (the common case for single-fragment transactions). defp handle_txn(tracker, fragment, affected_shapes) do - FlushTracker.handle_txn_fragment( - tracker, - fragment, - affected_shapes, - MapSet.new(affected_shapes) - ) + FlushTracker.handle_txn_fragment(tracker, fragment, affected_shapes) end defp batch(opts) do diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 6d0fe38cbe..a0abc0b37e 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -1677,10 +1677,11 @@ defmodule Electric.Shapes.ConsumerTest do test "flush notification for multi-fragment txn is not lost when storage flushes before commit fragment", %{stack_id: stack_id} = ctx do # Regression test for https://github.com/electric-sql/electric/issues/3985 + # Updated for deferred flush notification fix (#4063). # # When a multi-fragment transaction's non-commit fragments are flushed to disk # before the commit fragment is processed by ShapeLogCollector, the flush - # notification was lost because FlushTracker hadn't registered the shape yet. + # notification was lost because FlushTracker wasn't tracking the shape's offsets. # This caused the shape to be stuck in the FlushTracker, blocking # the global flush offset from advancing. {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, stack_id) @@ -1738,19 +1739,12 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(fragment1, stack_id) assert :ok = ShapeLogCollector.handle_event(fragment2, stack_id) - flushed_log_offset = fragment2.last_log_offset - - # Matching on a traced call inline to avoid any timing issues that - # Trace.collect_traced_calls() is susceptible to in this case. - assert_receive {:trace, _, :call, - {ShapeLogCollector, :notify_flushed, - [^stack_id, ^shape_handle, ^flushed_log_offset]}}, - @receive_timeout + # With deferred flush notifications, notify_flushed is NOT called + # after non-commit fragments. The flush is deferred until the commit. + assert [] == Support.Trace.collect_traced_calls() # Now send the commit fragment. The commit fragment itself has NO matching # changes for the shape — all changes were in earlier fragments. - # After this, FlushTracker registers the shape but the data was already - # flushed, so no new :flushed message will arrive. commit_fragment = txn_fragment( xid, @@ -1768,8 +1762,120 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(commit_fragment, ctx.stack_id) assert_receive {^ref, :new_changes, _}, @receive_timeout - # Assert that the flush boundary has advanced which wasn't the case before due to the - # aforementioned bug, + # The deferred flush notification is sent after the commit, aligned + # to the commit fragment's last_log_offset. + commit_offset = commit_fragment.last_log_offset + + assert_receive {:trace, _, :call, + {ShapeLogCollector, :notify_flushed, + [^stack_id, ^shape_handle, ^commit_offset]}}, + @receive_timeout + + # Flush boundary advances. + tx_offset = commit_fragment.last_log_offset.tx_offset + assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout + end + + @tag allow_subqueries: false, with_pure_file_storage_opts: [flush_period: 10_000] + test "flush notification offset is aligned when storage flushes before commit arrives at consumer", + %{stack_id: stack_id} do + # Regression test for https://github.com/electric-sql/electric/issues/4063 + # + # When a non-commit fragment has enough data to trigger a buffer-size + # flush (>= 64KB), the :flushed message is placed in the consumer's + # mailbox during processing. The consumer process ends up handling the :flushed message + # before receiving the commit fragment. But since the offset it sends to FlushTracker + # predates the commit fragment's offset, the FlushTracker keeps the shape in the + # "pending" state and there's no follow-up notification from the consumer that would + # unblock it. + # + # A high flush_period prevents timer-based flushes so the only flush + # comes from the buffer-size trigger, making the test deterministic. + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, stack_id) + + ref = Shapes.Consumer.register_for_changes(stack_id, shape_handle) + register_as_replication_client(stack_id) + + xid = 11 + lsn = Lsn.from_integer(10) + relevant_change_offset = LogOffset.new(lsn, 0) + + # The fragment has a large shape-relevant record (>64KB) that triggers a + # buffer-size flush during write, PLUS a non-matching record at a higher + # offset. This means the source fragment's last_log_offset is higher than + # the shape's last written offset — just like in production where + # transactions touch multiple tables. + padding = String.duplicate("x", 70_000) + + non_commit_fragment = + txn_fragment( + xid, + lsn, + [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => padding}, + log_offset: relevant_change_offset + }, + # This change does NOT match shape1 (test_table) but raises the + # fragment's last_log_offset above the shape's written offset. + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 50) + } + ], + has_begin?: true + ) + + # Commit fragment has only a change for a different table. The consumer + # writes nothing for it but still finalises the pending transaction, + # populating txn_offset_mapping. + commit_fragment = + txn_fragment( + xid, + lsn, + [ + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "99"}, + log_offset: LogOffset.new(lsn, 100) + } + ], + has_commit?: true + ) + + # Send non-commit fragment. The large record triggers a buffer flush, + # placing {Storage, :flushed, offset} in the consumer's mailbox. + Support.Trace.trace_shape_log_collector_calls( + pid: Shapes.Consumer.whereis(stack_id, shape_handle), + functions: [:notify_flushed] + ) + + assert :ok = ShapeLogCollector.handle_event(non_commit_fragment, stack_id) + + # With deferred flush notifications, the consumer does NOT call notify_flushed + # after the non-commit fragment. The :flushed message is saved for later. + assert [] == Support.Trace.collect_traced_calls() + + # Send the commit fragment to finalize the transaction. + assert :ok = ShapeLogCollector.handle_event(commit_fragment, stack_id) + + # Consumer has processed the relevant change... + assert_receive {^ref, :new_changes, ^relevant_change_offset}, @receive_timeout + + # The deferred flush notification is sent after the commit with the + # aligned offset (the commit fragment's last_log_offset). + commit_last_log_offset = commit_fragment.last_log_offset + + assert [ + {ShapeLogCollector, :notify_flushed, + [^stack_id, ^shape_handle, ^commit_last_log_offset]} + ] = Support.Trace.collect_traced_calls() + + # Flush boundary advances correctly. tx_offset = commit_fragment.last_log_offset.tx_offset assert_receive {:flush_boundary_updated, ^tx_offset}, @receive_timeout end diff --git a/packages/sync-service/test/electric/shapes/event_router_test.exs b/packages/sync-service/test/electric/shapes/event_router_test.exs index eae7c2b59f..e670348b47 100644 --- a/packages/sync-service/test/electric/shapes/event_router_test.exs +++ b/packages/sync-service/test/electric/shapes/event_router_test.exs @@ -8,6 +8,7 @@ defmodule Electric.Shapes.EventRouterTest do alias Electric.Replication.Changes.TruncatedRelation alias Electric.Replication.Changes.UpdatedRecord alias Electric.Replication.Changes.TransactionFragment + alias Electric.Replication.LogOffset alias Electric.Shapes.EventRouter alias Electric.Shapes.Shape alias Support.StubInspector @@ -96,6 +97,8 @@ defmodule Electric.Shapes.EventRouterTest do {result, _router} = EventRouter.event_by_shape_handle(router, batch) + assert ["s1", "s2"] == result |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -275,6 +278,8 @@ defmodule Electric.Shapes.EventRouterTest do {result, _router} = EventRouter.event_by_shape_handle(router, batch) + assert ["s1", "s2", "s3", "s4"] == result |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -318,6 +323,8 @@ defmodule Electric.Shapes.EventRouterTest do {result, _router} = EventRouter.event_by_shape_handle(router, batch) + assert ["s1", "s2"] == result |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{affected_relations: s1_relations}, "s2" => %TransactionFragment{affected_relations: s2_relations} @@ -374,9 +381,8 @@ defmodule Electric.Shapes.EventRouterTest do {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) - assert %{ - "s1" => %TransactionFragment{has_begin?: true, changes: [^insert]} - } = result2 + assert ["s1"] == Map.keys(result2) + assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert]}} = result2 end test "Begin seen once per shape even across multiple batches" do @@ -423,13 +429,14 @@ defmodule Electric.Shapes.EventRouterTest do batch1 = %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1]} {result1, router} = EventRouter.event_by_shape_handle(router, batch1) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1]}} = result1 insert2 = %NewRecord{relation: {"public", "t1"}, record: %{"id" => "2"}} batch2 = %TransactionFragment{xid: 100, has_begin?: false, changes: [insert2]} {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) - + assert ["s2"] == Map.keys(result2) assert %{"s2" => %TransactionFragment{has_begin?: true, changes: [^insert2]}} = result2 end @@ -445,6 +452,7 @@ defmodule Electric.Shapes.EventRouterTest do {result1, router} = EventRouter.event_by_shape_handle(router, batch1) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1]}} = result1 insert2 = %NewRecord{relation: {"public", "t1"}, record: %{"id" => "2"}} @@ -459,6 +467,8 @@ defmodule Electric.Shapes.EventRouterTest do {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) + assert ["s1", "s2"] == result2 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{has_begin?: false, commit: ^commit_op, changes: []}, "s2" => %TransactionFragment{ @@ -496,6 +506,100 @@ defmodule Electric.Shapes.EventRouterTest do assert result2 == %{"s1" => batch2} end + test "commit-only fragment for shape when commit has changes only for a different table" do + router = + EventRouter.new() + |> EventRouter.add_shape("s1", Shape.new!("t1", inspector: @inspector)) + |> EventRouter.add_shape("s2", Shape.new!("t2", inspector: @inspector)) + + insert_t1 = %NewRecord{ + relation: {"public", "t1"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(10, 0) + } + + insert_t2_first = %NewRecord{ + relation: {"public", "t2"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(10, 2) + } + + batch1 = %TransactionFragment{ + xid: 100, + has_begin?: true, + last_log_offset: LogOffset.new(10, 2), + changes: [insert_t1, insert_t2_first], + change_count: 2 + } + + {result1, router} = EventRouter.event_by_shape_handle(router, batch1) + + # Shape s1 only gets the t1 change but last_log_offset is copied from the txn fragment + batch1_last_offset = LogOffset.new(10, 2) + + assert ["s1", "s2"] == Map.keys(result1) + + assert %{ + "s1" => %TransactionFragment{ + has_begin?: true, + commit: nil, + changes: [^insert_t1], + change_count: 1, + last_log_offset: ^batch1_last_offset + }, + "s2" => %TransactionFragment{ + has_begin?: true, + commit: nil, + changes: [^insert_t2_first], + change_count: 1, + last_log_offset: ^batch1_last_offset + } + } = result1 + + # Commit fragment has only a t2 change + insert_t2_last = %NewRecord{ + relation: {"public", "t2"}, + record: %{"id" => "99"}, + log_offset: LogOffset.new(10, 4) + } + + commit_op = %Commit{commit_timestamp: ~U[2024-01-01 00:00:00Z]} + + batch2 = %TransactionFragment{ + xid: 100, + has_begin?: false, + commit: commit_op, + last_log_offset: LogOffset.new(10, 4), + changes: [insert_t2_last], + change_count: 1 + } + + {result2, _router} = EventRouter.event_by_shape_handle(router, batch2) + + # s1 gets a commit-only fragment: change_count=0, + # and last_log_offset from the commit fragment + batch2_last_offset = LogOffset.new(10, 4) + + assert ["s1", "s2"] == Map.keys(result2) + + assert %{ + "s1" => %TransactionFragment{ + has_begin?: false, + commit: ^commit_op, + changes: [], + change_count: 0, + last_log_offset: ^batch2_last_offset + }, + "s2" => %TransactionFragment{ + has_begin?: false, + commit: ^commit_op, + changes: [^insert_t2_last], + change_count: 1, + last_log_offset: ^batch2_last_offset + } + } = result2 + end + test "transaction state is reset after Commit" do router = EventRouter.new() @@ -594,6 +698,7 @@ defmodule Electric.Shapes.EventRouterTest do batch1 = %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1]} {result1, router} = EventRouter.event_by_shape_handle(router, batch1) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1]}} = result1 insert2 = %NewRecord{relation: {"public", "t1"}, record: %{"id" => "2"}} @@ -602,6 +707,8 @@ defmodule Electric.Shapes.EventRouterTest do {result2, router} = EventRouter.event_by_shape_handle(router, batch2) + assert ["s2", "s3"] == result2 |> Map.keys() |> Enum.sort() + assert %{ "s2" => %TransactionFragment{has_begin?: true, changes: [^insert2]}, "s3" => %TransactionFragment{has_begin?: true, changes: [^insert3]} @@ -619,6 +726,8 @@ defmodule Electric.Shapes.EventRouterTest do {result3, _router} = EventRouter.event_by_shape_handle(router, batch3) + assert ["s1", "s2", "s3"] == result3 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -646,6 +755,7 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1a]}} = result1 # Shape s2 added mid-transaction - should not receive any events from this transaction @@ -674,6 +784,8 @@ defmodule Electric.Shapes.EventRouterTest do ) # s1 gets its changes + commit, s2 gets nothing (added mid-transaction) + assert ["s1"] == Map.keys(result2) + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -698,6 +810,8 @@ defmodule Electric.Shapes.EventRouterTest do } ) + assert ["s1", "s2"] == result3 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -726,6 +840,7 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1a]}} = result1 # Shapes s2 and s3 added mid-transaction - neither should receive any events from this @@ -753,6 +868,8 @@ defmodule Electric.Shapes.EventRouterTest do ) # s1 gets its changes + commit, s2 and s3 get nothing (added mid-transaction) + assert ["s1"] == Map.keys(result2) + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -778,6 +895,8 @@ defmodule Electric.Shapes.EventRouterTest do } ) + assert ["s1", "s2", "s3"] == result3 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{ has_begin?: true, @@ -812,6 +931,8 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1", "s2"] == result1 |> Map.keys() |> Enum.sort() + assert %{ "s1" => %TransactionFragment{has_begin?: true, changes: [^insert1a]}, "s2" => %TransactionFragment{has_begin?: true, changes: [^insert2a]} @@ -836,6 +957,8 @@ defmodule Electric.Shapes.EventRouterTest do ) # s1 gets nothing (removed), s2 gets its changes + commit + assert ["s2"] == Map.keys(result2) + assert %{ "s2" => %TransactionFragment{ has_begin?: false, @@ -859,6 +982,7 @@ defmodule Electric.Shapes.EventRouterTest do %TransactionFragment{xid: 100, has_begin?: true, changes: [insert1a, insert2a]} ) + assert ["s1"] == Map.keys(result1) assert %{"s1" => %TransactionFragment{has_begin?: true, changes: [^insert1a]}} = result1 # s2 added mid-transaction @@ -879,6 +1003,7 @@ defmodule Electric.Shapes.EventRouterTest do ) # s2 skipped because added mid-transaction + assert ["s1"] == Map.keys(result2) assert %{"s1" => %TransactionFragment{has_begin?: false, changes: [^insert1b]}} = result2 # s2 removed before transaction ends @@ -900,6 +1025,8 @@ defmodule Electric.Shapes.EventRouterTest do ) # s2 removed, so only s1 gets events + assert ["s1"] == Map.keys(result3) + assert %{ "s1" => %TransactionFragment{ has_begin?: false, @@ -924,6 +1051,8 @@ defmodule Electric.Shapes.EventRouterTest do } ) + assert ["s1"] == Map.keys(result4) + assert %{ "s1" => %TransactionFragment{ has_begin?: true,