Skip to content
5 changes: 5 additions & 0 deletions .changeset/brave-doors-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Fix stuck flush tracker when storage flush notification arrives mid-transaction in Consumer
Original file line number Diff line number Diff line change
Expand Up @@ -465,17 +465,14 @@ defmodule Electric.Replication.ShapeLogCollector do

OpenTelemetry.add_span_attributes("txn.is_dropped": true)

{:ok,
%{
state
| flush_tracker:
FlushTracker.handle_txn_fragment(
state.flush_tracker,
txn_fragment,
[],
MapSet.new()
)
}}
flush_tracker =
if txn_fragment.commit do
FlushTracker.handle_txn_fragment(state.flush_tracker, txn_fragment, [])
else
state.flush_tracker
end

{:ok, %{state | flush_tracker: flush_tracker}}
end

defp handle_txn_fragment(
Expand Down Expand Up @@ -578,22 +575,9 @@ defmodule Electric.Replication.ShapeLogCollector do

flush_tracker =
case event do
%TransactionFragment{} ->
shapes_with_changes =
for {id, frag} <- events_by_handle,
frag.change_count > 0,
not MapSet.member?(undeliverable_set, id),
do: id,
into: MapSet.new()

if event.commit, do: LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn)

FlushTracker.handle_txn_fragment(
flush_tracker,
event,
delivered_shapes,
shapes_with_changes
)
%TransactionFragment{commit: commit} when not is_nil(commit) ->
LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn)
FlushTracker.handle_txn_fragment(flush_tracker, event, delivered_shapes)

_ ->
flush_tracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,41 +95,15 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do
last_flushed == %{} and :gb_trees.is_empty(tree)
end

@spec handle_txn_fragment(
t(),
TransactionFragment.t(),
Enumerable.t(shape_id()),
MapSet.t(shape_id())
) :: t()

# Non-commit fragment: track affected shapes but don't update last_seen_offset
# or notify. This ensures shapes are registered early so flush notifications
# from Consumers aren't lost when storage flushes before the commit arrives.
def handle_txn_fragment(
%__MODULE__{} = state,
%TransactionFragment{commit: nil, last_log_offset: last_log_offset},
affected_shapes,
_shapes_with_changes
) do
track_shapes(state, last_log_offset, affected_shapes)
end
@spec handle_txn_fragment(t(), TransactionFragment.t(), Enumerable.t(shape_id())) :: t()

# Commit fragment: track shapes that have actual changes in this fragment
# or are already being tracked (need last_sent updated to commit offset).
# Skip shapes that only have a commit marker and already flushed from
# earlier non-commit fragments — there's nothing new to flush for them.
# Commit fragment: track all shapes affected by all fragments of the transaction and update last_seen_offset.
def handle_txn_fragment(
%__MODULE__{} = state,
%TransactionFragment{commit: %Commit{}, last_log_offset: last_log_offset},
affected_shapes,
shapes_with_changes
affected_shapes
) do
shapes_to_track =
Enum.filter(affected_shapes, fn shape ->
shape in shapes_with_changes or is_map_key(state.last_flushed, shape)
end)

state = track_shapes(state, last_log_offset, shapes_to_track)
state = track_shapes(state, last_log_offset, affected_shapes)

state = %{state | last_seen_offset: last_log_offset}

Expand Down Expand Up @@ -211,14 +185,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do
min_incomplete_flush_tree: min_incomplete_flush_tree
}

# Only update global offset if we've seen at least one commit.
# Before any commit, last_seen_offset is before_all and there's
# nothing meaningful to report.
if state.last_seen_offset == LogOffset.before_all() do
state
else
update_global_offset(state)
end
update_global_offset(state)
end

# If the shape is not in the mapping, then we're processing a flush notification for a shape that was removed
Expand Down
49 changes: 43 additions & 6 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,20 @@ defmodule Electric.Shapes.Consumer do
end
end

def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do
{state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in)
def handle_info({ShapeCache.Storage, :flushed, flushed_offset}, state) do
state =
if is_write_unit_txn(state.write_unit) or is_nil(state.pending_txn) do
# We're not currently in the middle of processing a transaction. This flushed offset is either
# from a previously processed transaction or a non-commit fragment of the most recently
# seen transaction. Notify ShapeLogCollector about it immediately.
confirm_flushed_and_notify(state, flushed_offset)
else
# Storage has signaled latest flushed offset in the middle of processing a multi-fragment
# transaction. Save it for later, to be handled when the commit fragment arrives.
updated_offset = more_recent_offset(state.pending_flush_offset, flushed_offset)
%{state | pending_flush_offset: updated_offset}
end

ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn)
{:noreply, state, state.hibernate_after}
end

Expand Down Expand Up @@ -567,7 +577,7 @@ defmodule Electric.Shapes.Consumer do
# With write_unit=txn all fragments are buffered until the Commit change is seen. At that
# point, a transaction struct is produced from the buffered fragments and is written to
# storage.
state.write_unit == State.write_unit_txn() ->
is_write_unit_txn(state.write_unit) ->
{txns, transaction_builder} =
TransactionBuilder.build(txn_fragment, state.transaction_builder)

Expand Down Expand Up @@ -597,6 +607,7 @@ defmodule Electric.Shapes.Consumer do
# Drops the in-progress transaction for a fragment that is being skipped.
#
# Steps, in order:
#   1. reset `pending_txn` to nil,
#   2. mark the fragment's last log offset as flushed via `consider_flushed/2`,
#   3. hand any flush offset that was deferred while the transaction was
#      pending to `clear_pending_flush_offset/1`.
defp skip_txn_fragment(state, %TransactionFragment{last_log_offset: last_log_offset}) do
  without_pending_txn = %{state | pending_txn: nil}
  flushed_state = consider_flushed(without_pending_txn, last_log_offset)
  clear_pending_flush_offset(flushed_state)
end

# This function does similar things to do_handle_txn/2 but with the following simplifications:
Expand Down Expand Up @@ -747,9 +758,10 @@ defmodule Electric.Shapes.Consumer do
"No relevant changes written in transaction xid=#{txn.xid}"
end)

state = %{state | pending_txn: nil}
consider_flushed(state, txn_fragment.last_log_offset)
%{state | pending_txn: nil}
|> consider_flushed(txn_fragment.last_log_offset)
end
|> clear_pending_flush_offset()
end

def process_buffered_txn_fragments(%State{buffer: buffer} = state) do
Expand Down Expand Up @@ -1006,6 +1018,31 @@ defmodule Electric.Shapes.Consumer do
end
end

# Aligns `flushed_offset` to a transaction boundary using
# `State.align_offset_to_txn_boundary/2`, reports the aligned offset to
# ShapeLogCollector for this consumer's stack and shape handle, and returns
# the state produced by the alignment step.
defp confirm_flushed_and_notify(state, flushed_offset) do
  {aligned_state, boundary_offset} = State.align_offset_to_txn_boundary(state, flushed_offset)

  # Read the identifiers from the aligned state, i.e. the same state that is returned.
  %{stack_id: stack_id, shape_handle: shape_handle} = aligned_state
  ShapeLogCollector.notify_flushed(stack_id, shape_handle, boundary_offset)

  aligned_state
end

# Processes the flush offset that was deferred while a transaction was pending, if there is one.
#
# With no deferred offset (`pending_flush_offset` is nil) the state is returned unchanged.
# Otherwise the field is reset to nil and the deferred offset is confirmed and reported
# through `confirm_flushed_and_notify/2`.
#
# Call this every time `state.pending_txn` is reset to nil while processing a multi-fragment
# transaction, including when the transaction is skipped or none of its changes satisfy the
# shape's `where` condition: Storage may have signaled a flush offset belonging to the
# previous transaction while fragments of the current one were still being processed.
defp clear_pending_flush_offset(%{pending_flush_offset: deferred_offset} = state) do
  case deferred_offset do
    nil ->
      state

    offset ->
      confirm_flushed_and_notify(%{state | pending_flush_offset: nil}, offset)
  end
end

# Picks the later of two offsets, treating nil as "no offset".
#
# When either argument is nil the other one is returned as-is (so two nils give nil);
# otherwise the result is `LogOffset.max/2` of the two.
defp more_recent_offset(current, incoming) do
  cond do
    is_nil(current) -> incoming
    is_nil(incoming) -> current
    true -> LogOffset.max(current, incoming)
  end
end

defp subscribe(state, action) do
case ShapeLogCollector.add_shape(state.stack_id, state.shape_handle, state.shape, action) do
:ok ->
Expand Down
10 changes: 7 additions & 3 deletions packages/sync-service/lib/electric/shapes/consumer/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ defmodule Electric.Shapes.Consumer.State do
# Tracks in-progress transaction, initialized when a txn fragment with has_begin?=true is seen.
# It is used to check whether the entire txn is visible in the snapshot and to mark it
# as flushed in order to handle its remaining fragments appropriately.
pending_txn: nil
pending_txn: nil,
# When a {Storage, :flushed, offset} message arrives during a pending
# transaction, we defer the notification and store the max flushed offset
# here. Multiple deferred notifications are collapsed into a single most recent offset.
pending_flush_offset: nil
]

@type pg_snapshot() :: SnapshotQuery.pg_snapshot()
Expand Down Expand Up @@ -393,6 +397,6 @@ defmodule Electric.Shapes.Consumer.State do
]
end

def write_unit_txn, do: @write_unit_txn
def write_unit_txn_fragment, do: @write_unit_txn_fragment
defguard is_write_unit_txn(write_unit) when write_unit == @write_unit_txn
defguard is_write_unit_txn_fragment(write_unit) when write_unit == @write_unit_txn_fragment
end
Loading
Loading