From c19c68b375ea26e0e2f2904f0ff3d78f79c7fe1e Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 13:24:25 +0100 Subject: [PATCH 01/19] Clarify the explanatory comment on Xid.after_snapshot?() --- packages/sync-service/lib/electric/postgres/xid.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/postgres/xid.ex b/packages/sync-service/lib/electric/postgres/xid.ex index d4b0558e1f..d764ba2e0c 100644 --- a/packages/sync-service/lib/electric/postgres/xid.ex +++ b/packages/sync-service/lib/electric/postgres/xid.ex @@ -269,7 +269,8 @@ defmodule Electric.Postgres.Xid do @type pg_snapshot() :: {anyxid, anyxid, [anyxid]} @doc """ - Check if a transaction is after the end of a snapshot - if it's xid is over xmax + Check if a transaction is in the future from the POV of the snapshot. In other words, if its + xid is >= xmax, its changes are definitely *not* visible and *won't become* visible in this snapshot. """ @spec after_snapshot?(anyxid, pg_snapshot()) :: boolean() def after_snapshot?(xid, {_, xmax, _}) when not is_lt(xid, xmax), do: true From 92f4c31cc404dc3d17de5465667c748ec2c0d534 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 13:33:08 +0100 Subject: [PATCH 02/19] WIP Consumer handle_txn_fragment --- .../lib/electric/shapes/consumer.ex | 107 +++++++++++++----- .../shapes/consumer/initial_snapshot.ex | 19 +++- .../electric/shapes/consumer/pending_txn.ex | 58 ++++++++++ .../lib/electric/shapes/consumer/state.ex | 13 ++- 4 files changed, 159 insertions(+), 38 deletions(-) create mode 100644 packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 8bc0109adf..fa95a8eacc 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -2,9 +2,10 @@ defmodule Electric.Shapes.Consumer 
do use GenServer, restart: :temporary alias Electric.Shapes.Consumer.ChangeHandling - alias Electric.Shapes.Consumer.MoveIns alias Electric.Shapes.Consumer.InitialSnapshot alias Electric.Shapes.Consumer.MoveHandling + alias Electric.Shapes.Consumer.MoveIns + alias Electric.Shapes.Consumer.PendingTxn alias Electric.Shapes.Consumer.State import Electric.Shapes.Consumer.State, only: :macros @@ -452,33 +453,81 @@ defmodule Electric.Shapes.Consumer do end defp handle_event(%TransactionFragment{} = txn_fragment, state) do - {txns, transaction_builder} = - TransactionBuilder.build(txn_fragment, state.transaction_builder) + handle_txn_fragment(txn_fragment, state) + end + + # The first fragment of a new transaction: initialize pending txn metadata and check if the + # transaction xid is already part of the snapshot. If it is, all subsequent fragments of this + # transaction will be ignored. + defp handle_txn_fragment( + %TransactionFragment{has_begin?: true, xid: xid} = txn_fragment, + %State{pending_txn: nil} = state + ) do + txn = PendingTxn.new(xid) + state = %{state | pending_txn: txn} + handle_txn_fragment(txn_fragment, state) + end + + # We can use the initial filtering to our advantage here and avoid accumulating fragments for + # a transaction that is going to be skipped anyway. This works both in the regular mode and + # in the fragment-direct mode. + defp handle_txn_fragment( + %TransactionFragment{has_begin?: true, xid: xid} = txn_fragment, + %State{} = state + ) + when needs_initial_filtering(state) do + case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, xid) do + {:consider_flushed, initial_snapshot_state} -> + # This transaction is already included in the snapshot, so just ignore all of its fragments. + # TODO: it could be the case the multiple txns are seen while InitialSnapshot needs + # filtering. And the outcomes for those txns could be different. 
+        %{
+          state
+          | pending_txn: PendingTxn.consider_flushed(state.pending_txn),
+            initial_snapshot_state: initial_snapshot_state
+        }

-    state = %{state | transaction_builder: transaction_builder}
-    handle_txns(txns, state)
+      {:continue, new_initial_snapshot_state} ->
+        # The transaction is not part of the initial snapshot.
+        state = %{state | initial_snapshot_state: new_initial_snapshot_state}
+        process_txn_fragment(txn_fragment, state)
+    end
   end

-  defp handle_txns(txns, %State{} = state), do: Enum.reduce_while(txns, state, &handle_txn/2)
+  defp handle_txn_fragment(
+         %TransactionFragment{last_log_offset: last_log_offset} = txn_fragment,
+         %State{pending_txn: txn} = state
+       ) do
+    if txn.consider_flushed? or fragment_already_processed?(txn_fragment, state) do
+      # TODO: Do not mark the fragment itself flushed. When the connection restarts, we want to
+      # receive all fragments starting from the one with has_begin? so as not to overcomplicate
+      # the consumer logic.
+      consider_flushed(state, last_log_offset)
+    else
+      process_txn_fragment(txn_fragment, state)
+    end
+  end

-  # Keep buffering for initial snapshot
-  defp handle_txn(txn, %State{buffering?: true} = state),
-    do: {:cont, State.add_to_buffer(state, txn)}
+  # In the regular mode all fragments are buffered until the Commit change is seen. At that
+  # point, a transaction struct is produced from the buffered fragments and is written to
+  # storage.
+ defp process_txn_fragment(txn_fragment, %State{fragment_direct?: false} = state) do + {txns, transaction_builder} = + TransactionBuilder.build(txn_fragment, state.transaction_builder) - defp handle_txn(txn, state) when needs_initial_filtering(state) do - case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, txn) do - {:consider_flushed, initial_snapshot_state} -> - {:cont, consider_flushed(%{state | initial_snapshot_state: initial_snapshot_state}, txn)} + state = %{state | transaction_builder: transaction_builder} - {:continue, new_initial_snapshot_state} -> - handle_txn_in_span(txn, %{state | initial_snapshot_state: new_initial_snapshot_state}) + case txns do + [] -> state + [txn] -> handle_txn(txn, state) end end - # Remove the move-in buffering check - just process immediately - defp handle_txn(txn, state), do: handle_txn_in_span(txn, state) + # In the fragment-direct mode, each transaction fragment is written to storage individually. + defp process_txn_fragment(txn_fragment, state) do + end - defp handle_txn_in_span(txn, %State{} = state) do + defp handle_txn(txn, %State{} = state) do ot_attrs = [xid: txn.xid, total_num_changes: txn.num_changes] ++ shape_attrs(state.shape_handle, state.shape) @@ -487,19 +536,10 @@ defmodule Electric.Shapes.Consumer do "shape_write.consumer.handle_txn", ot_attrs, state.stack_id, - fn -> - do_handle_txn(txn, state) - end + fn -> do_handle_txn(txn, state) end ) end - defp do_handle_txn(%Transaction{} = txn, state) - when LogOffset.is_log_offset_lte(txn.last_log_offset, state.latest_offset) do - Logger.debug(fn -> "Skipping already processed txn #{txn.xid}" end) - - {:cont, consider_flushed(state, txn)} - end - defp do_handle_txn(%Transaction{xid: xid, changes: changes} = txn, state) do %{ shape: shape, @@ -546,7 +586,7 @@ defmodule Electric.Shapes.Consumer do "No relevant changes found for #{inspect(shape)} in txn #{txn.xid}" end) - {:cont, consider_flushed(state, txn)} + {:cont, consider_flushed(state, 
txn.last_log_offset)} {changes, state, num_changes, last_log_offset} -> timestamp = System.monotonic_time() @@ -685,14 +725,19 @@ defmodule Electric.Shapes.Consumer do Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond)) end - defp consider_flushed(%State{} = state, %Transaction{last_log_offset: new_boundary}) do + defp fragment_already_processed?(%TransactionFragment{last_log_offset: offset}, state) do + LogOffset.is_log_offset_lte(offset, state.latest_offset) + end + + defp consider_flushed(%State{} = state, log_offset) do if state.txn_offset_mapping == [] do # No relevant txns have been observed and unflushed, we can notify immediately - ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, new_boundary) + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, log_offset) state else # We're looking to "relabel" the next flush to include this txn, so we're looking for the # boundary that has a highest boundary less than this offset + new_boundary = log_offset {head, tail} = Enum.split_while( diff --git a/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex b/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex index c654d63d2e..ff4f1a4d1d 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex @@ -43,11 +43,16 @@ defmodule Electric.Shapes.Consumer.InitialSnapshot do def needs_buffering?(%__MODULE__{pg_snapshot: snapshot}), do: is_nil(snapshot) + def maybe_stop_initial_filtering(%__MODULE__{} = state, storage, %Transaction{xid: xid}) do + maybe_stop_initial_filtering(state, storage, xid) + end + def maybe_stop_initial_filtering( %__MODULE__{pg_snapshot: {xmin, xmax, xip_list} = snapshot} = state, storage, - %Transaction{xid: xid} - ) do + xid + ) + when is_integer(xid) and xid > 0 do if Xid.after_snapshot?(xid, snapshot) do Storage.set_pg_snapshot( %{xmin: xmin, xmax: xmax, xip_list: 
xip_list, filter_txns?: false},
@@ -89,11 +94,15 @@ defmodule Electric.Shapes.Consumer.InitialSnapshot do
     %{state | snapshot_started?: true}
   end

-  def filter(state, storage, %Transaction{} = txn) do
-    if Transaction.visible_in_snapshot?(txn, state.pg_snapshot) do
+  def filter(state, storage, %Transaction{xid: xid}) do
+    filter(state, storage, xid)
+  end
+
+  def filter(state, storage, xid) when is_integer(xid) and xid > 0 do
+    if Transaction.visible_in_snapshot?(xid, state.pg_snapshot) do
       {:consider_flushed, state}
     else
-      state = maybe_stop_initial_filtering(state, storage, txn)
+      state = maybe_stop_initial_filtering(state, storage, xid)
       {:continue, state}
     end
   end
diff --git a/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex b/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex
new file mode 100644
index 0000000000..fefa0b1d5c
--- /dev/null
+++ b/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex
@@ -0,0 +1,58 @@
+defmodule Electric.Shapes.Consumer.PendingTxn do
+  @moduledoc """
+  Tracks metadata for an in-progress transaction during fragment-direct streaming.
+
+  When a consumer streams transaction fragments directly to storage (for shapes
+  without subquery dependencies), this struct tracks the transaction metadata
+  until commit is received.
+
+  This is a counterpart module to Electric.Replication.TransactionBuilder. This module only tracks
+  metadata related to the current transaction for which txn fragments are processed, as the
+  fragments themselves are written to storage and are discarded from memory immediately, while
+  the TransactionBuilder module accumulates all changes in memory and returns a complete
+  transaction after seeing a Commit.
+ """ + + alias Electric.Replication.LogOffset + + defstruct [ + :xid, + :last_log_offset, + consider_flushed?: false, + num_changes: 0, + total_bytes: 0 + ] + + @type t :: %__MODULE__{ + xid: pos_integer(), + last_log_offset: LogOffset.t() | nil, + consider_flushed?: boolean(), + num_changes: non_neg_integer(), + total_bytes: non_neg_integer() + } + + @doc """ + Create a new pending transaction tracker. + """ + @spec new(pos_integer()) :: t() + def new(xid) do + %__MODULE__{xid: xid} + end + + @doc """ + Update the pending transaction with changes that were written to storage. + """ + @spec update_with_changes(t(), LogOffset.t(), non_neg_integer(), non_neg_integer()) :: t() + def update_with_changes(%__MODULE__{} = pending_txn, log_offset, count, bytes) do + %{ + pending_txn + | last_log_offset: log_offset, + num_changes: pending_txn.num_changes + count, + total_bytes: pending_txn.total_bytes + bytes + } + end + + def consider_flushed(%__MODULE__{} = pending_txn) do + %{pending_txn | consider_flushed?: true} + end +end diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index ee3f517fb5..d7fb8046da 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -30,7 +30,14 @@ defmodule Electric.Shapes.Consumer.State do terminating?: false, buffering?: false, or_with_subquery?: false, - not_with_subquery?: false + not_with_subquery?: false, + # Fragment-direct streaming fields + # When true, stream fragments directly to storage without buffering + fragment_direct?: false, + # Tracks in-progress transaction, initialized when a txn fragment with has_begin?=true is seen. + # It is used to check whether the entire txn is visible in the snapshot and to mark it + # as flushed in order to handle its remaining fragments appropriately. 
+ pending_txn: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -143,7 +150,9 @@ defmodule Electric.Shapes.Consumer.State do ), buffering?: true, or_with_subquery?: has_or_with_subquery?(shape), - not_with_subquery?: has_not_with_subquery?(shape) + not_with_subquery?: has_not_with_subquery?(shape), + # Enable fragment-direct streaming for shapes without subquery dependencies + fragment_direct?: shape.shape_dependencies == [] } end From 678a1543f27b1fe07530662ba05ae407691c989e Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 13:36:45 +0100 Subject: [PATCH 03/19] Buffer transaction fragments directly, not waiting for a complete transaction --- .../lib/electric/shapes/consumer.ex | 35 ++++++++++++++----- .../lib/electric/shapes/consumer/state.ex | 8 +++-- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index fa95a8eacc..b47c1b62a1 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -167,16 +167,13 @@ defmodule Electric.Shapes.Consumer do stop_and_clean(state) end - def handle_continue(:consume_buffer, %State{buffer: buffer} = state) do - Logger.debug(fn -> "Consumer catching up on #{length(buffer)} transactions" end) - state = %{state | buffer: [], buffering?: false} + def handle_continue(:consume_buffer, state) do + state = process_buffered_txn_fragments(state) - case handle_txns(Enum.reverse(buffer), state) do - %State{terminating?: true} = state -> - {:noreply, state, {:continue, :stop_and_clean}} - - state -> - {:noreply, state, state.hibernate_after} + if state.terminating? 
do + {:noreply, state, {:continue, :stop_and_clean}} + else + {:noreply, state, state.hibernate_after} end end @@ -456,6 +453,19 @@ defmodule Electric.Shapes.Consumer do handle_txn_fragment(txn_fragment, state) end + # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). + # In this phase we have to buffer incoming txn fragments because we can't yet decide what to + # do with the transaction: skip it or write it to the shape log. + # + # When snapshot info arrives, `process_buffered_txn_fragments/1` will be called to process + # buffered fragments in order. + defp handle_txn_fragment( + %TransactionFragment{} = txn_fragment, + %State{buffering?: true} = state + ) do + State.add_to_buffer(state, txn_fragment) + end + # The first fragment of a new transaction: initialize pending txn metadata and check if the # transaction xid is already part of the snapshot. If it is, all subsequent fragments of this # transaction will be ignored. @@ -527,6 +537,13 @@ defmodule Electric.Shapes.Consumer do defp process_txn_fragment(txn_fragment, state) do end + def process_buffered_txn_fragments(%State{buffer: buffer} = state) do + Logger.debug(fn -> "Consumer catching up on #{length(buffer)} transaction fragments" end) + {txn_fragments, state} = State.pop_buffered(state) + # TODO: verify the "while" condition here, when will it terminate early? 
+ Enum.reduce_while(txn_fragments, state, &handle_txn_fragment/2) + end + defp handle_txn(txn, %State{} = state) do ot_attrs = [xid: txn.xid, total_num_changes: txn.num_changes] ++ diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index d7fb8046da..46badfb46b 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -3,7 +3,6 @@ defmodule Electric.Shapes.Consumer.State do alias Electric.Shapes.Consumer.MoveIns alias Electric.Shapes.Consumer.InitialSnapshot alias Electric.Shapes.Shape - alias Electric.Replication.Changes.Transaction alias Electric.Replication.Eval.Parser alias Electric.Replication.Eval.Walker alias Electric.Replication.TransactionBuilder @@ -250,11 +249,16 @@ defmodule Electric.Shapes.Consumer.State do end end - @spec add_to_buffer(t(), Transaction.t()) :: t() + @spec add_to_buffer(t(), TransactionFragment.t()) :: t() def add_to_buffer(%__MODULE__{buffer: buffer} = state, txn) do %{state | buffer: [txn | buffer]} end + @spec pop_buffered(t()) :: {[TransactionFragment.t()], t()} + def pop_buffered(%__MODULE__{buffer: buffer} = state) do + {Enum.reverse(buffer), %{state | buffer: [], buffering?: false}} + end + @spec add_waiter(t(), GenServer.from()) :: t() def add_waiter(%__MODULE__{initial_snapshot_state: initial_snapshot_state} = state, from) do %{ From 6828bbea07bacff4b8268c26727f3373c8328b90 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 13:42:00 +0100 Subject: [PATCH 04/19] wip Truly write invididual txn fragments to storage --- .../lib/electric/shapes/consumer.ex | 185 ++++++++++++++++-- 1 file changed, 170 insertions(+), 15 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index b47c1b62a1..94cc2b1142 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ 
b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -535,6 +535,163 @@ defmodule Electric.Shapes.Consumer do # In the fragment-direct mode, each transaction fragment is written to storage individually. defp process_txn_fragment(txn_fragment, state) do + state + |> write_txn_fragment_to_storage(txn_fragment) + |> maybe_complete_pending_txn(txn_fragment) + end + + # This function does similar things to do_handle_txn/2 but with the following simplifications: + # - it doesn't account for move-ins or move-outs or converting update operations into insert/delete + # - the fragment is written directly to storage if it has changes matching this shape + # - if the fragment has a commit message, the ShapeLogCollector is informed about the new flush boundary + defp write_txn_fragment_to_storage(state, %TransactionFragment{changes: []}), do: state + + defp write_txn_fragment_to_storage( + state, + %TransactionFragment{changes: changes, commit: commit, xid: xid} = fragment + ) do + %{ + shape: shape, + writer: writer, + pending_txn: txn, + stack_id: stack_id, + shape_handle: shape_handle + } = state + + case convert_fragment_changes(changes, stack_id, shape_handle, shape) do + :includes_truncate -> + handle_txn_with_truncate(xid, state) + + {[], 0} -> + Logger.debug(fn -> + "No relevant changes found for #{inspect(shape)} in txn fragment of txn #{xid}" + end) + + # TODO: this needs to signal something to the subsequent maybe_complete_pending_txn call + consider_flushed(state, fragment.last_log_offset) + + {reversed_changes, num_changes, last_log_offset} -> + converted_changes = + reversed_changes + |> maybe_mark_last_change(commit) + |> Enum.reverse() + + {lines, total_size} = prepare_log_entries(converted_changes, xid, shape) + writer = ShapeCache.Storage.append_fragment_to_log!(lines, writer) + + txn = PendingTxn.update_with_changes(txn, last_log_offset, num_changes, total_size) + + Logger.debug(fn -> + "Wrote #{num_changes} changes for fragment xid=#{xid}, 
total_bytes=#{total_size}" + end) + + %{ + state + | writer: writer, + latest_offset: last_log_offset, + pending_txn: txn, + txn_offset_mapping: + state.txn_offset_mapping ++ [{last_log_offset, fragment.last_log_offset}] + } + end + end + + defp convert_fragment_changes(changes, stack_id, shape_handle, shape) do + Enum.reduce_while(changes, {[], 0}, fn + %Changes.TruncatedRelation{}, _acc -> + {:halt, :includes_truncate} + + change, {changes, count} = acc -> + # Apply Shape.convert_change to each change to: + # 1. Filter out changes not matching the shape's table + # 2. Apply WHERE clause filtering + case Shape.convert_change(shape, change, stack_id: stack_id, shape_handle: shape_handle) do + [] -> + {:cont, acc} + + [change] -> + {:cont, {[change | changes], count + 1}} + end + end) + |> case do + {[change | _] = changes, num_changes} -> + {changes, num_changes, LogItems.expected_offset_after_split(change)} + + acc -> + acc + end + end + + # Mark the last change in the list as last? when this is a commit fragment + # This is needed for clients to know when a transaction is complete + # The changes passed to this function are in reversed order, i.e. the last change is the head of the list. 
+ defp maybe_mark_last_change([], _commit), do: [] + defp maybe_mark_last_change(changes, nil), do: changes + + defp maybe_mark_last_change(changes, _commit) do + [head | tail] = changes + [%{head | last?: true} | tail] + end + + defp maybe_complete_pending_txn(state, %TransactionFragment{commit: nil}), do: state + + defp maybe_complete_pending_txn(%{terminating?: true} = state, _fragment) do + # If we're terminating (e.g., due to truncate), don't complete the transaction + state + end + + defp maybe_complete_pending_txn(state, %TransactionFragment{commit: commit} = fragment) do + %{pending_txn: txn, writer: writer, shape: shape, shape_handle: shape_handle} = state + + # Signal commit to storage for potential chunk boundary calculation + writer = ShapeCache.Storage.signal_txn_commit!(txn.xid, writer) + + # Only notify if we actually wrote changes + if txn.num_changes > 0 do + # TODO: Only notify upon writing the full txn to storage + notify_new_changes(state, fragment.changes, txn.last_log_offset) + + lag = calculate_replication_lag_from_commit(commit) + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :storage, :transaction_stored], + %{ + bytes: txn.total_bytes, + count: 1, + operations: txn.num_changes, + replication_lag: lag + }, + Map.new(shape_attrs(shape_handle, shape)) + ) + + Logger.debug(fn -> + "Completed fragment-direct transaction xid=#{txn.xid}, changes=#{txn.num_changes}" + end) + + %{ + state + | writer: writer, + latest_offset: txn.last_log_offset, + pending_txn: nil, + txn_offset_mapping: + state.txn_offset_mapping ++ [{txn.last_log_offset, fragment.last_log_offset}] + } + else + # No changes were written - notify flush boundary like consider_flushed does + Logger.debug(fn -> + "No relevant changes in fragment-direct transaction xid=#{txn.xid}" + end) + + state = %{state | writer: writer, pending_txn: nil} + consider_flushed(state, fragment.last_log_offset) + end + end + + defp 
calculate_replication_lag_from_commit(%Changes.Commit{commit_timestamp: nil}), do: 0 + + defp calculate_replication_lag_from_commit(%Changes.Commit{commit_timestamp: commit_timestamp}) do + now = DateTime.utc_now() + Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond)) end def process_buffered_txn_fragments(%State{buffer: buffer} = state) do @@ -558,11 +715,7 @@ defmodule Electric.Shapes.Consumer do end defp do_handle_txn(%Transaction{xid: xid, changes: changes} = txn, state) do - %{ - shape: shape, - shape_handle: shape_handle, - writer: writer - } = state + %{shape: shape, writer: writer} = state state = State.remove_completed_move_ins(state, txn) @@ -587,16 +740,7 @@ defmodule Electric.Shapes.Consumer do %{xid: xid, extra_refs: {extra_refs_before_move_ins, extra_refs_full}} ) do :includes_truncate -> - # TODO: This is a very naive way to handle truncations: if ANY relevant truncates are - # present in the transaction, we're considering the whole transaction empty, and - # just rotate the shape handle. "Correct" way to handle truncates is to be designed. - Logger.warning( - "Truncate operation encountered while processing txn #{txn.xid} for #{shape_handle}" - ) - - state = mark_for_removal(state) - - {:halt, state} + {:halt, handle_txn_with_truncate(txn.xid, state)} {_, state, 0, _} -> Logger.debug(fn -> @@ -644,6 +788,17 @@ defmodule Electric.Shapes.Consumer do end end + defp handle_txn_with_truncate(xid, state) do + # TODO: This is a very naive way to handle truncations: if ANY relevant truncates are + # present in the transaction, we're considering the whole transaction empty, and + # just rotate the shape handle. "Correct" way to handle truncates is to be designed. 
+ Logger.warning( + "Truncate operation encountered while processing txn #{xid} for #{state.shape_handle}" + ) + + mark_for_removal(state) + end + defp notify_new_changes(state, nil), do: state defp notify_new_changes(state, {changes, upper_bound}) do From d9708df2e2266e8fd2692f5c26b1466e4b3aa5d1 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 13:42:59 +0100 Subject: [PATCH 05/19] Augment Storage API with the ability to write txn fragments --- .../lib/electric/shape_cache/storage.ex | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index 4ca33422dc..de5e93e2b9 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -151,6 +151,29 @@ defmodule Electric.ShapeCache.Storage do @callback append_to_log!(Enumerable.t(log_item()), writer_state()) :: writer_state() | no_return() + @doc """ + Append log items from a transaction fragment. + + Called potentially multiple times per transaction for shapes that stream + fragments directly to storage without waiting for the complete transaction. + Unlike `append_to_log!/2`, this does not assume transaction completion. + + Transaction commits should be signaled separately via `signal_txn_commit!/2` + to allow storage to calculate chunk boundaries at transaction boundaries. + """ + @callback append_fragment_to_log!(Enumerable.t(log_item()), writer_state()) :: + writer_state() | no_return() + + @doc """ + Signal that a transaction has committed. + + Used by storage to calculate chunk boundaries at transaction boundaries. + Called after all fragments for a transaction have been written via + `append_fragment_to_log!/2`. 
+ """ + @callback signal_txn_commit!(xid :: pos_integer(), writer_state()) :: + writer_state() | no_return() + @doc "Get stream of the log for a shape since a given offset" @callback get_log_stream(offset :: LogOffset.t(), max_offset :: LogOffset.t(), shape_opts()) :: log() @@ -211,6 +234,29 @@ defmodule Electric.ShapeCache.Storage do Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) end + # This function seamlessly unwraps TestStorage to give the storage implementation direct access to its options. + if Mix.env() != :test do + def opts_for_stack(stack_id) do + {_module, opts} = Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) + opts + end + else + def opts_for_stack(stack_id) do + case Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) do + {Support.TestStorage, {_parent_pid, _test_storage_init, {_storage_mod, storage_opts}}} -> + storage_opts + + {_storage_mod, storage_opts} -> + storage_opts + end + end + end + + def opt_for_stack(stack_id, opt_name) do + opts = opts_for_stack(stack_id) + Map.fetch!(opts, opt_name) + end + @spec child_spec(shape_storage()) :: Supervisor.child_spec() def child_spec({module, shape_opts}) do %{ @@ -355,6 +401,16 @@ defmodule Electric.ShapeCache.Storage do {mod, mod.append_to_log!(log_items, shape_opts)} end + @impl __MODULE__ + def append_fragment_to_log!(log_items, {mod, shape_opts}) do + {mod, mod.append_fragment_to_log!(log_items, shape_opts)} + end + + @impl __MODULE__ + def signal_txn_commit!(xid, {mod, shape_opts}) do + {mod, mod.signal_txn_commit!(xid, shape_opts)} + end + @impl __MODULE__ def get_log_stream(offset, max_offset \\ @last_log_offset, storage) From 958c4a6e4a0de5ae4cdac627157973cec2c43b70 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 13:44:16 +0100 Subject: [PATCH 06/19] Implement txn fragment handling in PureFileStorage --- .../electric/shape_cache/pure_file_storage.ex | 56 +++++++++++++---- .../pure_file_storage/write_loop.ex | 60 
+++++++++++++++++++ 2 files changed, 106 insertions(+), 10 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex b/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex index 90ca32e7a7..82e03bc514 100644 --- a/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex @@ -149,8 +149,8 @@ defmodule Electric.ShapeCache.PureFileStorage do end defp shape_data_dir(%__MODULE__{stack_id: stack_id, shape_handle: shape_handle}, suffix) do - {__MODULE__, stack_opts} = Storage.for_stack(stack_id) - shape_data_dir(stack_opts.base_path, shape_handle, suffix) + base_path = Storage.opt_for_stack(stack_id, :base_path) + shape_data_dir(base_path, shape_handle, suffix) end defp shape_data_dir(base_path, shape_handle, suffix \\ []) @@ -167,10 +167,7 @@ defmodule Electric.ShapeCache.PureFileStorage do defp shape_snapshot_dir(opts), do: shape_data_dir(opts, [@snapshot_dir]) defp shape_snapshot_path(opts, filename), do: shape_data_dir(opts, [@snapshot_dir, filename]) - defp tmp_dir(%__MODULE__{} = opts) do - {__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id) - stack_opts.tmp_dir - end + defp tmp_dir(%__MODULE__{} = opts), do: Storage.opt_for_stack(opts.stack_id, :tmp_dir) def stack_start_link(opts) do Supervisor.start_link( @@ -537,8 +534,9 @@ defmodule Electric.ShapeCache.PureFileStorage do ) if shape_definition.storage.compaction == :enabled do - {__MODULE__, stack_opts} = Storage.for_stack(shape_opts.stack_id) - schedule_compaction(stack_opts.compaction_config) + shape_opts.stack_id + |> Storage.opt_for_stack(:compaction_config) + |> schedule_compaction() end writer_state( @@ -1306,6 +1304,44 @@ defmodule Electric.ShapeCache.PureFileStorage do end end + @doc """ + Append log items from a transaction fragment. + + Unlike `append_to_log!/2`, this does NOT advance `last_seen_txn_offset` or + call `register_complete_txn`. 
Transaction completion should be signaled + separately via `signal_txn_commit!/2`. + + This ensures that on crash/recovery, `fetch_latest_offset` returns the + last committed transaction offset, not a mid-transaction offset. + """ + def append_fragment_to_log!(fragment_lines, writer_state(writer_acc: acc) = state) do + fragment_lines + |> normalize_log_stream() + |> WriteLoop.append_fragment_to_log!(acc, state) + |> case do + {acc, cancel_flush_timer: true} -> + timer_ref = writer_state(state, :write_timer) + if not is_nil(timer_ref), do: Process.cancel_timer(timer_ref) + writer_state(state, writer_acc: acc, write_timer: nil) + + {acc, schedule_flush: times_flushed} -> + writer_state(state, writer_acc: acc) + |> schedule_flush(times_flushed) + end + end + + @doc """ + Signal that a transaction has committed. + + Updates `last_seen_txn_offset` and persists metadata to mark the transaction + as complete. Should be called after all fragments have been written via + `append_fragment_to_log!/2`. 
+ """ + def signal_txn_commit!(_xid, writer_state(writer_acc: acc) = state) do + acc = WriteLoop.signal_txn_commit(acc, state) + writer_state(state, writer_acc: acc) + end + def update_chunk_boundaries_cache(opts, boundaries) do :ets.update_element( opts.stack_ets, @@ -1320,13 +1356,13 @@ defmodule Electric.ShapeCache.PureFileStorage do if WriteLoop.has_flushed_since?(acc, old) or is_nil(timer) do if not is_nil(timer), do: Process.cancel_timer(timer) - {__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id) + flush_period = Storage.opt_for_stack(opts.stack_id, :flush_period) ref = Process.send_after( self(), {Storage, {__MODULE__, :perform_scheduled_flush, [WriteLoop.times_flushed(acc)]}}, - stack_opts.flush_period + flush_period ) writer_state(state, write_timer: ref) diff --git a/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex b/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex index d80c88f4ac..ef93ff31e4 100644 --- a/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex +++ b/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex @@ -128,6 +128,55 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do end end + @doc """ + Append log lines from a transaction fragment. + + Unlike `append_to_log!/3`, this does NOT advance `last_seen_txn_offset` or + call `register_complete_txn`. Transaction completion should be signaled + separately via `register_complete_txn/2` after the commit is received. + + This ensures that on crash/recovery, `fetch_latest_offset` returns the + last committed transaction offset, not a mid-transaction offset. 
+ """ + def append_fragment_to_log!( + fragment_lines, + writer_acc(times_flushed: times_flushed) = acc, + state + ) do + acc = ensure_json_file_open(acc, state) + + fragment_lines + |> Enum.reduce(acc, fn + {offset, _, _, _, _, _, _}, writer_acc(last_seen_txn_offset: min_offset) = acc + when is_log_offset_lte(offset, min_offset) -> + # Line already persisted, no-op + acc + + {offset, _, _, _, _, _, _} = line, acc -> + acc + |> maybe_write_opening_chunk_boundary(state, offset) + |> add_to_buffer(line) + |> maybe_write_closing_chunk_boundary(state) + |> maybe_flush_buffer(state) + end) + |> case do + # If the buffer has been fully flushed, no need to schedule more flushes + # Note: We do NOT update last_seen_txn_offset or call register_complete_txn + # since the transaction is not yet complete + writer_acc(buffer_size: 0) = acc -> + acc = close_chunk_file(acc) + {acc, cancel_flush_timer: true} + + acc -> + acc = + acc + |> store_lines_in_ets(state) + |> close_chunk_file() + + {acc, schedule_flush: times_flushed} + end + end + ### Working with the buffer defp add_to_buffer( @@ -374,4 +423,15 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do end |> update_persistance_metadata(state, prev_persisted_txn) end + + @doc """ + Signal that a transaction has been committed. + + This updates `last_seen_txn_offset` and potentially `last_persisted_txn_offset` + to mark the transaction as complete. Should be called after all fragments + have been written via `append_fragment_to_log!/3`. 
+ """ + def signal_txn_commit(acc, state) do + register_complete_txn(acc, state) + end end From c0ae18a0cea8491fda4b1db466cc15d8d5ccf069 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 14:23:25 +0100 Subject: [PATCH 07/19] WIP Add fragment awareness to test storage --- packages/sync-service/test/support/test_storage.ex | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/packages/sync-service/test/support/test_storage.ex b/packages/sync-service/test/support/test_storage.ex index 206f6c8db2..a023adcd97 100644 --- a/packages/sync-service/test/support/test_storage.ex +++ b/packages/sync-service/test/support/test_storage.ex @@ -150,6 +150,20 @@ defmodule Support.TestStorage do {parent, shape_handle, data, storage} end + @impl Electric.ShapeCache.Storage + def append_fragment_to_log!(log_items, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :append_fragment_to_log!, shape_handle, log_items}) + storage = Storage.append_fragment_to_log!(log_items, storage) + {parent, shape_handle, data, storage} + end + + @impl Electric.ShapeCache.Storage + def signal_txn_commit!(xid, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :signal_txn_commit!, shape_handle, xid}) + storage = Storage.signal_txn_commit!(xid, storage) + {parent, shape_handle, data, storage} + end + @impl Electric.ShapeCache.Storage def append_move_in_snapshot_to_log!(name, {parent, shape_handle, data, storage}) do send(parent, {__MODULE__, :append_move_in_snapshot_to_log!, shape_handle, name}) From 5c4968a134fcb317f831e34a6cfe72bc531e98ce Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 14:25:27 +0100 Subject: [PATCH 08/19] WIP Txn fragments for inmemorystorage which is used in consumer tests --- .../electric/shape_cache/in_memory_storage.ex | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex 
b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index 70d1377279..f9b88451c3 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -318,6 +318,58 @@ defmodule Electric.ShapeCache.InMemoryStorage do opts end + @impl Electric.ShapeCache.Storage + def append_fragment_to_log!(log_items, %MS{} = opts) do + # For in-memory storage, fragment writes are similar to full transaction writes + # but we do NOT update @latest_offset_key until commit, and we do NOT send + # the :flushed message. This ensures fetch_latest_offset returns the last + # committed transaction offset, not a mid-transaction offset. + log_table = opts.log_table + chunk_checkpoint_table = opts.chunk_checkpoint_table + + {processed_log_items, _last_offset} = + Enum.map_reduce(log_items, nil, fn + {:chunk_boundary, offset}, curr -> + {{storage_offset(offset), :checkpoint}, curr} + + {offset, _key, _op_type, json_log_item}, _ -> + {{{:offset, storage_offset(offset)}, json_log_item}, offset} + end) + + processed_log_items + |> Enum.split_with(fn item -> match?({_, :checkpoint}, item) end) + |> then(fn {checkpoints, log_items} -> + :ets.insert(chunk_checkpoint_table, checkpoints) + :ets.insert(log_table, log_items) + # Note: NOT updating @latest_offset_key until signal_txn_commit! 
+ end) + + opts + end + + @impl Electric.ShapeCache.Storage + def signal_txn_commit!(_xid, %MS{} = opts) do + # For in-memory storage, the commit signal updates @latest_offset_key + # to the last written offset and sends the :flushed message + log_table = opts.log_table + + # Find the last written offset in the log table + case :ets.last(log_table) do + :"$end_of_table" -> + opts + + {:offset, offset_tuple} -> + last_offset = LogOffset.new(offset_tuple) + :ets.insert(opts.snapshot_table, {@latest_offset_key, last_offset}) + send(self(), {Storage, :flushed, last_offset}) + opts + + _ -> + # Other keys (like :movein) - no update needed + opts + end + end + @impl Electric.ShapeCache.Storage def write_move_in_snapshot!(stream, name, %MS{log_table: log_table}) do stream From 5d6514539bfb545c6a03f1d1f9a3a94c0e38221e Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 14:42:58 +0100 Subject: [PATCH 09/19] Forgot that a txn fragment with has_begin? has a list of changes --- .../lib/electric/shapes/consumer.ex | 80 ++++++++++--------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 94cc2b1142..3c3e1c3d23 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -486,58 +486,60 @@ defmodule Electric.Shapes.Consumer do %State{} = state ) when needs_initial_filtering(state) do - case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, xid) do - {:consider_flushed, initial_snapshot_state} -> - # This transaction is already included in the snapshot, so just ignore all of its fragments. - # TODO: it could be the case the multiple txns are seen while InitialSnapshot needs - # filtering. And the outcomes for those txns could be different. 
- %{ - state - | pending_txn: PendingTxn.consider_flushed(state.pending_txn), - initial_snapshot_state: initial_snapshot_state - } + state = + case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, xid) do + {:consider_flushed, initial_snapshot_state} -> + # This transaction is already included in the snapshot, so just ignore all of its fragments. + # TODO: it could be the case the multiple txns are seen while InitialSnapshot needs + # filtering. And the outcomes for those txns could be different. + %{ + state + | pending_txn: PendingTxn.consider_flushed(state.pending_txn), + initial_snapshot_state: initial_snapshot_state + } - {:continue, new_initial_snapshot_state} -> - # The transaction is not part of the initial snapshot. - state = %{state | initial_snapshot_state: new_initial_snapshot_state} - process_txn_fragment(txn_fragment, state) - end + {:continue, new_initial_snapshot_state} -> + # The transaction is not part of the initial snapshot. + %{state | initial_snapshot_state: new_initial_snapshot_state} + end + + process_txn_fragment(txn_fragment, state) end - defp handle_txn_fragment( + defp handle_txn_fragment(txn_fragment, state), do: process_txn_fragment(txn_fragment, state) + + # Fragments belonging to the same transaction can all be skipped either via xid-filtering or log offset filtering. + defp process_txn_fragment( %TransactionFragment{last_log_offset: last_log_offset} = txn_fragment, %State{pending_txn: txn} = state ) do - if txn.consider_flushed? or fragment_already_processed?(txn_fragment, state) do + cond do # TODO: Do not mark the fragment itself flushed. When the cnnection restarts, we want to # receive all fragments starting from the one with has_begin? so as not to overcomplicate # the consumer logic. - consider_flushed(state, last_log_offset) - else - process_txn_fragment(txn_fragment, state) - end - end + txn.consider_flushed? 
or fragment_already_processed?(txn_fragment, state) -> + consider_flushed(state, last_log_offset) - # In the regular mode all fragments are buffered until the Commit change is seen. At that - # point, a transaction struct is produced from the buffered fragments and is written to - # storage. - defp process_txn_fragment(txn_fragment, %State{fragment_direct?: false} = state) do - {txns, transaction_builder} = - TransactionBuilder.build(txn_fragment, state.transaction_builder) + # In the regular mode all fragments are buffered until the Commit change is seen. At that + # point, a transaction struct is produced from the buffered fragments and is written to + # storage. + not state.fragment_direct? -> + {txns, transaction_builder} = + TransactionBuilder.build(txn_fragment, state.transaction_builder) - state = %{state | transaction_builder: transaction_builder} + state = %{state | transaction_builder: transaction_builder} - case txns do - [] -> state - [txn] -> handle_txn(txn, state) - end - end + case txns do + [] -> state + [txn] -> handle_txn(txn, state) + end - # In the fragment-direct mode, each transaction fragment is written to storage individually. - defp process_txn_fragment(txn_fragment, state) do - state - |> write_txn_fragment_to_storage(txn_fragment) - |> maybe_complete_pending_txn(txn_fragment) + # In the fragment-direct mode, each transaction fragment is written to storage individually. 
+ true -> + state + |> write_txn_fragment_to_storage(txn_fragment) + |> maybe_complete_pending_txn(txn_fragment) + end end # This function does similar things to do_handle_txn/2 but with the following simplifications: From 75b68a8d56b89eefe7e37f84c50b6c3b6fa6fe8d Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 15:00:37 +0100 Subject: [PATCH 10/19] Do not forget to reset state.pending_txn when skipping the whole transaction by Xid --- .../lib/electric/shapes/consumer.ex | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 3c3e1c3d23..1b390854e1 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -510,7 +510,7 @@ defmodule Electric.Shapes.Consumer do # Fragments belonging to the same transaction can all be skipped either via xid-filtering or log offset filtering. defp process_txn_fragment( - %TransactionFragment{last_log_offset: last_log_offset} = txn_fragment, + %TransactionFragment{} = txn_fragment, %State{pending_txn: txn} = state ) do cond do @@ -518,7 +518,7 @@ defmodule Electric.Shapes.Consumer do # receive all fragments starting from the one with has_begin? so as not to overcomplicate # the consumer logic. txn.consider_flushed? or fragment_already_processed?(txn_fragment, state) -> - consider_flushed(state, last_log_offset) + skip_txn_fragment(state, txn_fragment) # In the regular mode all fragments are buffered until the Commit change is seen. 
At that # point, a transaction struct is produced from the buffered fragments and is written to @@ -635,6 +635,14 @@ defmodule Electric.Shapes.Consumer do [%{head | last?: true} | tail] end + defp skip_txn_fragment(state, %TransactionFragment{commit: nil}), do: state + + defp skip_txn_fragment(state, %TransactionFragment{} = txn_fragment) do + # Reset the pending_txn field to be ready for the first txn fragment of the next incoming transaction. + %{state | pending_txn: nil} + |> consider_flushed(txn_fragment.last_log_offset) + end + defp maybe_complete_pending_txn(state, %TransactionFragment{commit: nil}), do: state defp maybe_complete_pending_txn(%{terminating?: true} = state, _fragment) do @@ -699,8 +707,16 @@ defmodule Electric.Shapes.Consumer do def process_buffered_txn_fragments(%State{buffer: buffer} = state) do Logger.debug(fn -> "Consumer catching up on #{length(buffer)} transaction fragments" end) {txn_fragments, state} = State.pop_buffered(state) - # TODO: verify the "while" condition here, when will it terminate early? - Enum.reduce_while(txn_fragments, state, &handle_txn_fragment/2) + + Enum.reduce_while(txn_fragments, state, fn txn_fragment, state -> + state = handle_txn_fragment(txn_fragment, state) + + if state.terminating? 
do + {:halt, state} + else + {:cont, state} + end + end) end defp handle_txn(txn, %State{} = state) do From e4afc93ee26991a4fde06763a74fc7666c4ec2d6 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 15:17:53 +0100 Subject: [PATCH 11/19] WIP don't forget to reset pending_txn when fragment_direct?=false --- packages/sync-service/lib/electric/shapes/consumer.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 1b390854e1..1065ee1e14 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -529,6 +529,8 @@ defmodule Electric.Shapes.Consumer do state = %{state | transaction_builder: transaction_builder} + # TODO: need to reset pending_txn in this code path + case txns do [] -> state [txn] -> handle_txn(txn, state) From a67d863f238913d3ea64857e8af7757a903d20f5 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 27 Jan 2026 15:19:47 +0100 Subject: [PATCH 12/19] Remove continuation tags from handle_txn since it's no longer called inside Enum.reduce_while() --- .../lib/electric/shapes/consumer.ex | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 1065ee1e14..c7b417a8a9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -760,14 +760,14 @@ defmodule Electric.Shapes.Consumer do %{xid: xid, extra_refs: {extra_refs_before_move_ins, extra_refs_full}} ) do :includes_truncate -> - {:halt, handle_txn_with_truncate(txn.xid, state)} + handle_txn_with_truncate(txn.xid, state) {_, state, 0, _} -> Logger.debug(fn -> "No relevant changes found for #{inspect(shape)} in txn #{txn.xid}" end) - {:cont, consider_flushed(state, txn.last_log_offset)} + 
consider_flushed(state, txn.last_log_offset)
 
       {changes, state, num_changes, last_log_offset} ->
         timestamp = System.monotonic_time()
@@ -797,14 +797,13 @@ defmodule Electric.Shapes.Consumer do
           Map.new(shape_attrs(state.shape_handle, state.shape))
         )
 
-        {:cont,
-         %{
-           state
-           | writer: writer,
-             latest_offset: last_log_offset,
-             txn_offset_mapping:
-               state.txn_offset_mapping ++ [{last_log_offset, txn.last_log_offset}]
-         }}
+        %{
+          state
+          | writer: writer,
+            latest_offset: last_log_offset,
+            txn_offset_mapping:
+              state.txn_offset_mapping ++ [{last_log_offset, txn.last_log_offset}]
+        }
     end
   end
 
From a1d8cb779b426e4712c72d7c34aef4b3eec4682 Mon Sep 17 00:00:00 2001
From: Oleksii Sholik
Date: Tue, 27 Jan 2026 16:09:33 +0100
Subject: [PATCH 13/19] Pass persisted fragment changes to the pending txn
 completion function

This latter function needs to see converted changes to correctly notify
subscribed materializers
---
 .../lib/electric/shapes/consumer.ex           | 49 +++++++++++--------
 1 file changed, 28 insertions(+), 21 deletions(-)

diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex
index c7b417a8a9..0447e4d8c4 100644
--- a/packages/sync-service/lib/electric/shapes/consumer.ex
+++ b/packages/sync-service/lib/electric/shapes/consumer.ex
@@ -538,9 +538,8 @@ defmodule Electric.Shapes.Consumer do
 
       # In the fragment-direct mode, each transaction fragment is written to storage individually.
true -> - state - |> write_txn_fragment_to_storage(txn_fragment) - |> maybe_complete_pending_txn(txn_fragment) + {state, persisted_changes} = write_txn_fragment_to_storage(state, txn_fragment) + maybe_complete_pending_txn(state, txn_fragment, persisted_changes) end end @@ -548,7 +547,7 @@ defmodule Electric.Shapes.Consumer do # - it doesn't account for move-ins or move-outs or converting update operations into insert/delete # - the fragment is written directly to storage if it has changes matching this shape # - if the fragment has a commit message, the ShapeLogCollector is informed about the new flush boundary - defp write_txn_fragment_to_storage(state, %TransactionFragment{changes: []}), do: state + defp write_txn_fragment_to_storage(state, %TransactionFragment{changes: []}), do: {state, []} defp write_txn_fragment_to_storage( state, @@ -564,7 +563,7 @@ defmodule Electric.Shapes.Consumer do case convert_fragment_changes(changes, stack_id, shape_handle, shape) do :includes_truncate -> - handle_txn_with_truncate(xid, state) + {handle_txn_with_truncate(xid, state), []} {[], 0} -> Logger.debug(fn -> @@ -572,7 +571,7 @@ defmodule Electric.Shapes.Consumer do end) # TODO: this needs to signal something to the subsequent maybe_complete_pending_txn call - consider_flushed(state, fragment.last_log_offset) + {consider_flushed(state, fragment.last_log_offset), []} {reversed_changes, num_changes, last_log_offset} -> converted_changes = @@ -589,14 +588,14 @@ defmodule Electric.Shapes.Consumer do "Wrote #{num_changes} changes for fragment xid=#{xid}, total_bytes=#{total_size}" end) - %{ - state - | writer: writer, - latest_offset: last_log_offset, - pending_txn: txn, - txn_offset_mapping: - state.txn_offset_mapping ++ [{last_log_offset, fragment.last_log_offset}] - } + {%{ + state + | writer: writer, + latest_offset: last_log_offset, + pending_txn: txn, + txn_offset_mapping: + state.txn_offset_mapping ++ [{last_log_offset, fragment.last_log_offset}] + }, converted_changes} end 
end @@ -645,23 +644,28 @@ defmodule Electric.Shapes.Consumer do |> consider_flushed(txn_fragment.last_log_offset) end - defp maybe_complete_pending_txn(state, %TransactionFragment{commit: nil}), do: state + defp maybe_complete_pending_txn(%State{} = state, %TransactionFragment{commit: nil}, _changes), + do: state - defp maybe_complete_pending_txn(%{terminating?: true} = state, _fragment) do + defp maybe_complete_pending_txn(%State{terminating?: true} = state, _fragment, _changes) do # If we're terminating (e.g., due to truncate), don't complete the transaction state end - defp maybe_complete_pending_txn(state, %TransactionFragment{commit: commit} = fragment) do + defp maybe_complete_pending_txn( + %State{} = state, + %TransactionFragment{commit: commit} = txn_fragment, + persisted_changes + ) do %{pending_txn: txn, writer: writer, shape: shape, shape_handle: shape_handle} = state # Signal commit to storage for potential chunk boundary calculation writer = ShapeCache.Storage.signal_txn_commit!(txn.xid, writer) # Only notify if we actually wrote changes - if txn.num_changes > 0 do + if persisted_changes != [] do # TODO: Only notify upon writing the full txn to storage - notify_new_changes(state, fragment.changes, txn.last_log_offset) + notify_new_changes(state, persisted_changes, txn.last_log_offset) lag = calculate_replication_lag_from_commit(commit) @@ -686,16 +690,19 @@ defmodule Electric.Shapes.Consumer do latest_offset: txn.last_log_offset, pending_txn: nil, txn_offset_mapping: - state.txn_offset_mapping ++ [{txn.last_log_offset, fragment.last_log_offset}] + state.txn_offset_mapping ++ [{txn.last_log_offset, txn_fragment.last_log_offset}] } else + # TODO: is this even reachable??? 
+ # No changes were written - notify flush boundary like consider_flushed does Logger.debug(fn -> "No relevant changes in fragment-direct transaction xid=#{txn.xid}" end) state = %{state | writer: writer, pending_txn: nil} - consider_flushed(state, fragment.last_log_offset) + # OR pending_txn.last_log_offset ??? + consider_flushed(state, txn_fragment.last_log_offset) end end From dc4bb9b0bd0359275eb6d073ea9369d0df32a626 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 12 Jan 2026 23:53:41 +0100 Subject: [PATCH 14/19] Remove unused CrashingFileStorage module and related config --- packages/sync-service/config/runtime.exs | 7 -- .../shape_cache/crashing_file_storage.ex | 76 ------------------- 2 files changed, 83 deletions(-) delete mode 100644 packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 7f5437de42..7c26bcd5fe 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -129,13 +129,6 @@ storage_spec = "fast_file" -> {Electric.ShapeCache.PureFileStorage, storage_dir: shape_path} - "crashing_file" -> - num_calls_until_crash = - env!("CRASHING_FILE_ELECTRIC_STORAGE__NUM_CALLS_UNTIL_CRASH", :integer) - - {Electric.ShapeCache.CrashingFileStorage, - storage_dir: shape_path, num_calls_until_crash: num_calls_until_crash} - _ -> raise Dotenvy.Error, message: "storage must be one of: MEMORY, FAST_FILE, LEGACY_FILE" end diff --git a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex b/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex deleted file mode 100644 index d0dcb9bc57..0000000000 --- a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex +++ /dev/null @@ -1,76 +0,0 @@ -defmodule Electric.ShapeCache.CrashingFileStorage do - @moduledoc """ - A thing wrapper module around PureFileStorage that can be configured to raise an error after 
a - certain number of writes. - """ - alias Electric.ShapeCache.PureFileStorage - - @behaviour Electric.ShapeCache.Storage - - defdelegate for_shape(shape_handle, opts), to: PureFileStorage - defdelegate start_link(opts), to: PureFileStorage - defdelegate get_all_stored_shape_handles(opts), to: PureFileStorage - defdelegate get_stored_shapes(opts, shape_handles), to: PureFileStorage - defdelegate metadata_backup_dir(opts), to: PureFileStorage - defdelegate get_total_disk_usage(opts), to: PureFileStorage - defdelegate fetch_latest_offset(opts), to: PureFileStorage - defdelegate fetch_pg_snapshot(opts), to: PureFileStorage - defdelegate set_pg_snapshot(pg_snapshot, opts), to: PureFileStorage - defdelegate snapshot_started?(opts), to: PureFileStorage - defdelegate make_new_snapshot!(data_stream, opts), to: PureFileStorage - defdelegate mark_snapshot_as_started(opts), to: PureFileStorage - defdelegate get_log_stream(offset, max_offset, opts), to: PureFileStorage - defdelegate get_chunk_end_log_offset(offset, opts), to: PureFileStorage - defdelegate cleanup!(opts), to: PureFileStorage - defdelegate cleanup!(opts, shape_handle), to: PureFileStorage - defdelegate cleanup_all!(opts), to: PureFileStorage - defdelegate terminate(opts), to: PureFileStorage - defdelegate hibernate(opts), to: PureFileStorage - defdelegate compact(opts, keep_complete_chunks), to: PureFileStorage - defdelegate append_move_in_snapshot_to_log!(name, writer_state), to: PureFileStorage - - defdelegate append_move_in_snapshot_to_log_filtered!( - name, - writer_state, - touch_tracker, - snapshot, - tags_to_skip - ), - to: PureFileStorage - - defdelegate append_control_message!(control_message, writer_state), to: PureFileStorage - defdelegate write_move_in_snapshot!(stream, name, opts), to: PureFileStorage - - defp stack_agent_name(opts) do - Electric.ProcessRegistry.name(opts, __MODULE__, :agent) - end - - def stack_start_link(opts) do - {:ok, _agent} = Agent.start_link(fn -> 0 end, name: 
stack_agent_name(opts)) - PureFileStorage.stack_start_link(opts) - end - - def shared_opts(opts) do - opts - |> PureFileStorage.shared_opts() - |> Map.put(:extra_opts, %{num_calls_until_crash: Keyword.fetch!(opts, :num_calls_until_crash)}) - end - - def init_writer!(opts, shape_definition) do - Agent.update(stack_agent_name(opts), fn _ -> opts.extra_opts.num_calls_until_crash end) - PureFileStorage.init_writer!(opts, shape_definition) - end - - def append_to_log!(log_items, opts) do - num_calls_until_crash = Agent.get(stack_agent_name(opts), & &1) - - if num_calls_until_crash == 0 do - Agent.update(stack_agent_name(opts), fn _ -> opts.extra_opts.num_calls_until_crash end) - raise "Simulated storage failure" - end - - Agent.update(stack_agent_name(opts), fn n -> n - 1 end) - - PureFileStorage.append_to_log!(log_items, opts) - end -end From 376fbd49a54cb493327f024e875a87a43f445ce4 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 28 Jan 2026 00:45:59 +0100 Subject: [PATCH 15/19] Reset pending_txn when a full transaction is assembled in shape with deps --- .../lib/electric/shapes/consumer.ex | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 0447e4d8c4..ada90880ad 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -466,9 +466,8 @@ defmodule Electric.Shapes.Consumer do State.add_to_buffer(state, txn_fragment) end - # The first fragment of a new transaction: initialize pending txn metadata and check if the - # transaction xid is already part of the snapshot. If it is, all subsequent fragments of this - # transaction will be ignored. + # pending_txn struct is initialized to keep track of all fragments comprising this txn and + # store the "consider_flushed" state on it. 
defp handle_txn_fragment( %TransactionFragment{has_begin?: true, xid: xid} = txn_fragment, %State{pending_txn: nil} = state @@ -478,9 +477,12 @@ defmodule Electric.Shapes.Consumer do handle_txn_fragment(txn_fragment, state) end - # We can use the initial filtering to our advantage here and avoid accumulating fragments for - # a transaction that is going to be skipped anyway. This works both in the regular mode and - # in the fragment-direct mode. + # Upon seeing the first fragment of a new transaction, check if its xid is already included in the + # initial snapshot. If it is, all subsequent fragments of this transaction will be ignored. + # + # Initial filtering is giving us the advantage of not accumulating fragments for a + # transaction that is going to be skipped anyway. This works both in the regular mode and in + # the fragment-direct mode. defp handle_txn_fragment( %TransactionFragment{has_begin?: true, xid: xid} = txn_fragment, %State{} = state @@ -489,18 +491,17 @@ defmodule Electric.Shapes.Consumer do state = case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, xid) do {:consider_flushed, initial_snapshot_state} -> - # This transaction is already included in the snapshot, so just ignore all of its fragments. - # TODO: it could be the case the multiple txns are seen while InitialSnapshot needs - # filtering. And the outcomes for those txns could be different. + # This transaction is already included in the snapshot, so mark it as flushed to + # ignore any of its follow-up fragments. %{ state | pending_txn: PendingTxn.consider_flushed(state.pending_txn), initial_snapshot_state: initial_snapshot_state } - {:continue, new_initial_snapshot_state} -> + {:continue, initial_snapshot_state} -> # The transaction is not part of the initial snapshot. 
- %{state | initial_snapshot_state: new_initial_snapshot_state} + %{state | initial_snapshot_state: initial_snapshot_state} end process_txn_fragment(txn_fragment, state) @@ -529,11 +530,12 @@ defmodule Electric.Shapes.Consumer do state = %{state | transaction_builder: transaction_builder} - # TODO: need to reset pending_txn in this code path - case txns do - [] -> state - [txn] -> handle_txn(txn, state) + [] -> + state + + [txn] -> + handle_txn(txn, %{state | pending_txn: nil}) end # In the fragment-direct mode, each transaction fragment is written to storage individually. From 26b0c7cbc75054527bc32c90bcdd74dce84fcb71 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 28 Jan 2026 00:46:26 +0100 Subject: [PATCH 16/19] Log txn fragment and txn reception separately in Consumer --- integration-tests/tests/crash-recovery.lux | 2 +- packages/sync-service/lib/electric/shapes/consumer.ex | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/tests/crash-recovery.lux b/integration-tests/tests/crash-recovery.lux index 535815ff55..69666097bf 100644 --- a/integration-tests/tests/crash-recovery.lux +++ b/integration-tests/tests/crash-recovery.lux @@ -166,7 +166,7 @@ # slower than the pure-ets version (~50ms vs ~5ms) which means that this request # arrives before the stack has received the pending operation. # To mitigate this we ensure the txn has been processed. 
- ??[debug] Txn received in Shapes.Consumer + ??[debug] Txn fragment received in Shapes.Consumer # Client should be able to continue same shape [shell client] diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index ada90880ad..c53e4a45d5 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -450,6 +450,7 @@ defmodule Electric.Shapes.Consumer do end defp handle_event(%TransactionFragment{} = txn_fragment, state) do + Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end) handle_txn_fragment(txn_fragment, state) end @@ -535,6 +536,7 @@ defmodule Electric.Shapes.Consumer do state [txn] -> + Logger.debug(fn -> "Txn assembled in Shapes.Consumer: #{inspect(txn)}" end) handle_txn(txn, %{state | pending_txn: nil}) end @@ -748,8 +750,6 @@ defmodule Electric.Shapes.Consumer do state = State.remove_completed_move_ins(state, txn) - Logger.debug(fn -> "Txn received in Shapes.Consumer: #{inspect(txn)}" end) - extra_refs_full = Materializer.get_all_as_refs(shape, state.stack_id) From bfa3a69ab8d7ca406dbd806efb884edf9d054bb3 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 28 Jan 2026 12:10:07 +0100 Subject: [PATCH 17/19] Short-circuit handling of txn fragments that contain entire transactions --- .../lib/electric/shapes/consumer.ex | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index c53e4a45d5..2541a0b9af 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -36,6 +36,10 @@ defmodule Electric.Shapes.Consumer do @stop_and_clean_timeout 30_000 @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() + defguardp txn_fragment_has_entire_txn?(txn_fragment) + when 
is_struct(txn_fragment, TransactionFragment) and txn_fragment.has_begin? and + not is_nil(txn_fragment.commit) + def name(stack_id, shape_handle) when is_binary(shape_handle) do ConsumerRegistry.name(stack_id, shape_handle) end @@ -467,6 +471,28 @@ defmodule Electric.Shapes.Consumer do State.add_to_buffer(state, txn_fragment) end + # Short-circuit clauses for the most common case of a single-fragment transaction + defp handle_txn_fragment(%TransactionFragment{} = txn_fragment, state) + when txn_fragment_has_entire_txn?(txn_fragment) and needs_initial_filtering(state) do + case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, txn_fragment.xid) do + {:consider_flushed, initial_snapshot_state} -> + # This transaction is already included in the snapshot, flush it immediately and skip + # writing it to the shape log. + state = %{state | initial_snapshot_state: initial_snapshot_state} + consider_flushed(state, txn_fragment.last_log_offset) + + {:continue, initial_snapshot_state} -> + # The transaction is not part of the initial snapshot. + state = %{state | initial_snapshot_state: initial_snapshot_state} + build_and_handle_txn(txn_fragment, state) + end + end + + defp handle_txn_fragment(%TransactionFragment{} = txn_fragment, state) + when txn_fragment_has_entire_txn?(txn_fragment) do + build_and_handle_txn(txn_fragment, state) + end + # pending_txn struct is initialized to keep track of all fragments comprising this txn and # store the "consider_flushed" state on it. defp handle_txn_fragment( @@ -510,12 +536,12 @@ defmodule Electric.Shapes.Consumer do defp handle_txn_fragment(txn_fragment, state), do: process_txn_fragment(txn_fragment, state) - # Fragments belonging to the same transaction can all be skipped either via xid-filtering or log offset filtering. 
defp process_txn_fragment( %TransactionFragment{} = txn_fragment, %State{pending_txn: txn} = state ) do cond do + # Fragments belonging to the same transaction can all be skipped either via xid-filtering or log offset filtering. # TODO: Do not mark the fragment itself flushed. When the connection restarts, we want to # receive all fragments starting from the one with has_begin? so as not to overcomplicate # the consumer logic. @@ -540,8 +566,8 @@ defmodule Electric.Shapes.Consumer do handle_txn(txn, %{state | pending_txn: nil}) end - # In the fragment-direct mode, each transaction fragment is written to storage individually. true -> + # If we've ended up in this branch, we know for sure that the current fragment is one of several fragments comprising the transaction. {state, persisted_changes} = write_txn_fragment_to_storage(state, txn_fragment) maybe_complete_pending_txn(state, txn_fragment, persisted_changes) end @@ -732,6 +758,11 @@ end) end + defp build_and_handle_txn(%TransactionFragment{} = txn_fragment, %State{} = state) do + {[txn], _} = TransactionBuilder.build(txn_fragment, TransactionBuilder.new()) + handle_txn(txn, state) + end + defp handle_txn(txn, %State{} = state) do ot_attrs = [xid: txn.xid, total_num_changes: txn.num_changes] ++ From 39f76f6da84dc18f2e51e2efaedf6b91591ed8fa Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 28 Jan 2026 12:14:30 +0100 Subject: [PATCH 18/19] Update consumer tests to cover fragment-aware storage function calls --- .../test/electric/shapes/consumer_test.exs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 81146562a1..531979a695 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -209,7 +209,7 @@ defmodule Electric.Shapes.ConsumerTest do assert
:ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {^ref, :new_changes, ^last_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} + refute_storage_calls_for_txn_fragment(@shape_handle2) txn2 = transaction(xid, next_lsn, [ @@ -223,7 +223,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) assert_receive {^ref, :new_changes, ^next_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} + refute_storage_calls_for_txn_fragment(@shape_handle2) end test "correctly writes only relevant changes to multiple shape logs", ctx do @@ -303,7 +303,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} + refute_storage_calls_for_txn_fragment(@shape_handle1) refute_receive {^ref1, :new_changes, _} assert_receive {^ref2, :new_changes, _} @@ -384,7 +384,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) end) - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} + refute_storage_calls_for_txn_fragment(@shape_handle1) assert_shape_cleanup(@shape_handle1) @@ -1120,4 +1120,10 @@ defmodule Electric.Shapes.ConsumerTest do affected_relations: MapSet.new(changes, & &1.relation) } end + + defp refute_storage_calls_for_txn_fragment(shape_handle) do + refute_receive {Support.TestStorage, :append_to_log!, ^shape_handle, _} + refute_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, _} + refute_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, _} + end end From 
1c882c2642d90c88fcc9f65cbf51a6f5968d9c90 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 28 Jan 2026 12:14:43 +0100 Subject: [PATCH 19/19] wip assert_storage_calls_for_txn_fragment --- .../sync-service/test/electric/shapes/consumer_test.exs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 531979a695..a4b3411be7 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -1121,6 +1121,13 @@ defmodule Electric.Shapes.ConsumerTest do } end + defp assert_storage_calls_for_txn_fragment(shape_handle, log_offset, txn_xid, txn_op) do + assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, + [{^log_offset, _key, ^txn_op, _json}]} + + assert_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, ^txn_xid} + end + defp refute_storage_calls_for_txn_fragment(shape_handle) do refute_receive {Support.TestStorage, :append_to_log!, ^shape_handle, _} refute_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, _}