diff --git a/integration-tests/tests/crash-recovery.lux b/integration-tests/tests/crash-recovery.lux index 535815ff55..69666097bf 100644 --- a/integration-tests/tests/crash-recovery.lux +++ b/integration-tests/tests/crash-recovery.lux @@ -166,7 +166,7 @@ # slower than the pure-ets version (~50ms vs ~5ms) which means that this request # arrives before the stack has received the pending operation. # To mitigate this we ensure the txn has been processed. - ??[debug] Txn received in Shapes.Consumer + ??[debug] Txn fragment received in Shapes.Consumer # Client should be able to continue same shape [shell client] diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 7f5437de42..7c26bcd5fe 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -129,13 +129,6 @@ storage_spec = "fast_file" -> {Electric.ShapeCache.PureFileStorage, storage_dir: shape_path} - "crashing_file" -> - num_calls_until_crash = - env!("CRASHING_FILE_ELECTRIC_STORAGE__NUM_CALLS_UNTIL_CRASH", :integer) - - {Electric.ShapeCache.CrashingFileStorage, - storage_dir: shape_path, num_calls_until_crash: num_calls_until_crash} - _ -> raise Dotenvy.Error, message: "storage must be one of: MEMORY, FAST_FILE, LEGACY_FILE" end diff --git a/packages/sync-service/lib/electric/postgres/xid.ex b/packages/sync-service/lib/electric/postgres/xid.ex index d4b0558e1f..d764ba2e0c 100644 --- a/packages/sync-service/lib/electric/postgres/xid.ex +++ b/packages/sync-service/lib/electric/postgres/xid.ex @@ -269,7 +269,8 @@ defmodule Electric.Postgres.Xid do @type pg_snapshot() :: {anyxid, anyxid, [anyxid]} @doc """ - Check if a transaction is after the end of a snapshot - if it's xid is over xmax + Check if a transaction is in the future from the POV of the snapshot. In other words, if its + xid is >= xmax, its changes are definitely *not* visible and *won't become* visible in this snapshot. """ @spec after_snapshot?(anyxid, pg_snapshot()) :: boolean() def after_snapshot?(xid, {_, xmax, _}) when not is_lt(xid, xmax), do: true diff --git a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex b/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex deleted file mode 100644 index d0dcb9bc57..0000000000 --- a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex +++ /dev/null @@ -1,76 +0,0 @@ -defmodule Electric.ShapeCache.CrashingFileStorage do - @moduledoc """ - A thing wrapper module around PureFileStorage that can be configured to raise an error after a - certain number of writes. 
- """ - alias Electric.ShapeCache.PureFileStorage - - @behaviour Electric.ShapeCache.Storage - - defdelegate for_shape(shape_handle, opts), to: PureFileStorage - defdelegate start_link(opts), to: PureFileStorage - defdelegate get_all_stored_shape_handles(opts), to: PureFileStorage - defdelegate get_stored_shapes(opts, shape_handles), to: PureFileStorage - defdelegate metadata_backup_dir(opts), to: PureFileStorage - defdelegate get_total_disk_usage(opts), to: PureFileStorage - defdelegate fetch_latest_offset(opts), to: PureFileStorage - defdelegate fetch_pg_snapshot(opts), to: PureFileStorage - defdelegate set_pg_snapshot(pg_snapshot, opts), to: PureFileStorage - defdelegate snapshot_started?(opts), to: PureFileStorage - defdelegate make_new_snapshot!(data_stream, opts), to: PureFileStorage - defdelegate mark_snapshot_as_started(opts), to: PureFileStorage - defdelegate get_log_stream(offset, max_offset, opts), to: PureFileStorage - defdelegate get_chunk_end_log_offset(offset, opts), to: PureFileStorage - defdelegate cleanup!(opts), to: PureFileStorage - defdelegate cleanup!(opts, shape_handle), to: PureFileStorage - defdelegate cleanup_all!(opts), to: PureFileStorage - defdelegate terminate(opts), to: PureFileStorage - defdelegate hibernate(opts), to: PureFileStorage - defdelegate compact(opts, keep_complete_chunks), to: PureFileStorage - defdelegate append_move_in_snapshot_to_log!(name, writer_state), to: PureFileStorage - - defdelegate append_move_in_snapshot_to_log_filtered!( - name, - writer_state, - touch_tracker, - snapshot, - tags_to_skip - ), - to: PureFileStorage - - defdelegate append_control_message!(control_message, writer_state), to: PureFileStorage - defdelegate write_move_in_snapshot!(stream, name, opts), to: PureFileStorage - - defp stack_agent_name(opts) do - Electric.ProcessRegistry.name(opts, __MODULE__, :agent) - end - - def stack_start_link(opts) do - {:ok, _agent} = Agent.start_link(fn -> 0 end, name: stack_agent_name(opts)) - PureFileStorage.stack_start_link(opts) - end - - def shared_opts(opts) do - opts - |> PureFileStorage.shared_opts() - |> Map.put(:extra_opts, %{num_calls_until_crash: Keyword.fetch!(opts, :num_calls_until_crash)}) - end - - def init_writer!(opts, shape_definition) do - Agent.update(stack_agent_name(opts), fn _ -> opts.extra_opts.num_calls_until_crash end) - PureFileStorage.init_writer!(opts, shape_definition) - end - - def append_to_log!(log_items, opts) do - num_calls_until_crash = Agent.get(stack_agent_name(opts), & &1) - - if num_calls_until_crash == 0 do - Agent.update(stack_agent_name(opts), fn _ -> opts.extra_opts.num_calls_until_crash end) - raise "Simulated storage failure" - end - - Agent.update(stack_agent_name(opts), fn n -> n - 1 end) - - PureFileStorage.append_to_log!(log_items, opts) - end -end diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index 70d1377279..f9b88451c3 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -318,6 +318,58 @@ defmodule Electric.ShapeCache.InMemoryStorage do opts end + @impl Electric.ShapeCache.Storage + def append_fragment_to_log!(log_items, %MS{} = opts) do + # For in-memory storage, fragment writes are similar to full transaction writes + # but we do NOT update @latest_offset_key until commit, and we do NOT send + # the :flushed message. 
This ensures fetch_latest_offset returns the last + # committed transaction offset, not a mid-transaction offset. + log_table = opts.log_table + chunk_checkpoint_table = opts.chunk_checkpoint_table + + {processed_log_items, _last_offset} = + Enum.map_reduce(log_items, nil, fn + {:chunk_boundary, offset}, curr -> + {{storage_offset(offset), :checkpoint}, curr} + + {offset, _key, _op_type, json_log_item}, _ -> + {{{:offset, storage_offset(offset)}, json_log_item}, offset} + end) + + processed_log_items + |> Enum.split_with(fn item -> match?({_, :checkpoint}, item) end) + |> then(fn {checkpoints, log_items} -> + :ets.insert(chunk_checkpoint_table, checkpoints) + :ets.insert(log_table, log_items) + # Note: NOT updating @latest_offset_key until signal_txn_commit! + end) + + opts + end + + @impl Electric.ShapeCache.Storage + def signal_txn_commit!(_xid, %MS{} = opts) do + # For in-memory storage, the commit signal updates @latest_offset_key + # to the last written offset and sends the :flushed message + log_table = opts.log_table + + # Find the last written offset in the log table + case :ets.last(log_table) do + :"$end_of_table" -> + opts + + {:offset, offset_tuple} -> + last_offset = LogOffset.new(offset_tuple) + :ets.insert(opts.snapshot_table, {@latest_offset_key, last_offset}) + send(self(), {Storage, :flushed, last_offset}) + opts + + _ -> + # Other keys (like :movein) - no update needed + opts + end + end + @impl Electric.ShapeCache.Storage def write_move_in_snapshot!(stream, name, %MS{log_table: log_table}) do stream diff --git a/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex b/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex index 90ca32e7a7..82e03bc514 100644 --- a/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex @@ -149,8 +149,8 @@ defmodule Electric.ShapeCache.PureFileStorage do end defp shape_data_dir(%__MODULE__{stack_id: stack_id, shape_handle: shape_handle}, suffix) do - {__MODULE__, stack_opts} = Storage.for_stack(stack_id) - shape_data_dir(stack_opts.base_path, shape_handle, suffix) + base_path = Storage.opt_for_stack(stack_id, :base_path) + shape_data_dir(base_path, shape_handle, suffix) end defp shape_data_dir(base_path, shape_handle, suffix \\ []) @@ -167,10 +167,7 @@ defmodule Electric.ShapeCache.PureFileStorage do defp shape_snapshot_dir(opts), do: shape_data_dir(opts, [@snapshot_dir]) defp shape_snapshot_path(opts, filename), do: shape_data_dir(opts, [@snapshot_dir, filename]) - defp tmp_dir(%__MODULE__{} = opts) do - {__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id) - stack_opts.tmp_dir - end + defp tmp_dir(%__MODULE__{} = opts), do: Storage.opt_for_stack(opts.stack_id, :tmp_dir) def stack_start_link(opts) do Supervisor.start_link( @@ -537,8 +534,9 @@ defmodule Electric.ShapeCache.PureFileStorage do ) if shape_definition.storage.compaction == :enabled do - {__MODULE__, stack_opts} = Storage.for_stack(shape_opts.stack_id) - schedule_compaction(stack_opts.compaction_config) + shape_opts.stack_id + |> Storage.opt_for_stack(:compaction_config) + |> schedule_compaction() end writer_state( @@ -1306,6 +1304,44 @@ defmodule Electric.ShapeCache.PureFileStorage do end end + @doc """ + Append log items from a transaction fragment. + + Unlike `append_to_log!/2`, this does NOT advance `last_seen_txn_offset` or + call `register_complete_txn`. Transaction completion should be signaled + separately via `signal_txn_commit!/2`. 
+ + This ensures that on crash/recovery, `fetch_latest_offset` returns the + last committed transaction offset, not a mid-transaction offset. + """ + def append_fragment_to_log!(fragment_lines, writer_state(writer_acc: acc) = state) do + fragment_lines + |> normalize_log_stream() + |> WriteLoop.append_fragment_to_log!(acc, state) + |> case do + {acc, cancel_flush_timer: true} -> + timer_ref = writer_state(state, :write_timer) + if not is_nil(timer_ref), do: Process.cancel_timer(timer_ref) + writer_state(state, writer_acc: acc, write_timer: nil) + + {acc, schedule_flush: times_flushed} -> + writer_state(state, writer_acc: acc) + |> schedule_flush(times_flushed) + end + end + + @doc """ + Signal that a transaction has committed. + + Updates `last_seen_txn_offset` and persists metadata to mark the transaction + as complete. Should be called after all fragments have been written via + `append_fragment_to_log!/2`. + """ + def signal_txn_commit!(_xid, writer_state(writer_acc: acc) = state) do + acc = WriteLoop.signal_txn_commit(acc, state) + writer_state(state, writer_acc: acc) + end + def update_chunk_boundaries_cache(opts, boundaries) do :ets.update_element( opts.stack_ets, @@ -1320,13 +1356,13 @@ defmodule Electric.ShapeCache.PureFileStorage do if WriteLoop.has_flushed_since?(acc, old) or is_nil(timer) do if not is_nil(timer), do: Process.cancel_timer(timer) - {__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id) + flush_period = Storage.opt_for_stack(opts.stack_id, :flush_period) ref = Process.send_after( self(), {Storage, {__MODULE__, :perform_scheduled_flush, [WriteLoop.times_flushed(acc)]}}, - stack_opts.flush_period + flush_period ) writer_state(state, write_timer: ref) diff --git a/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex b/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex index d80c88f4ac..ef93ff31e4 100644 --- a/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex +++ b/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex @@ -128,6 +128,55 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do end end + @doc """ + Append log lines from a transaction fragment. + + Unlike `append_to_log!/3`, this does NOT advance `last_seen_txn_offset` or + call `register_complete_txn`. Transaction completion should be signaled + separately via `register_complete_txn/2` after the commit is received. + + This ensures that on crash/recovery, `fetch_latest_offset` returns the + last committed transaction offset, not a mid-transaction offset. 
+ """ + def append_fragment_to_log!( + fragment_lines, + writer_acc(times_flushed: times_flushed) = acc, + state + ) do + acc = ensure_json_file_open(acc, state) + + fragment_lines + |> Enum.reduce(acc, fn + {offset, _, _, _, _, _, _}, writer_acc(last_seen_txn_offset: min_offset) = acc + when is_log_offset_lte(offset, min_offset) -> + # Line already persisted, no-op + acc + + {offset, _, _, _, _, _, _} = line, acc -> + acc + |> maybe_write_opening_chunk_boundary(state, offset) + |> add_to_buffer(line) + |> maybe_write_closing_chunk_boundary(state) + |> maybe_flush_buffer(state) + end) + |> case do + # If the buffer has been fully flushed, no need to schedule more flushes + # Note: We do NOT update last_seen_txn_offset or call register_complete_txn + # since the transaction is not yet complete + writer_acc(buffer_size: 0) = acc -> + acc = close_chunk_file(acc) + {acc, cancel_flush_timer: true} + + acc -> + acc = + acc + |> store_lines_in_ets(state) + |> close_chunk_file() + + {acc, schedule_flush: times_flushed} + end + end + ### Working with the buffer defp add_to_buffer( @@ -374,4 +423,15 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do end |> update_persistance_metadata(state, prev_persisted_txn) end + + @doc """ + Signal that a transaction has been committed. + + This updates `last_seen_txn_offset` and potentially `last_persisted_txn_offset` + to mark the transaction as complete. Should be called after all fragments + have been written via `append_fragment_to_log!/3`. + """ + def signal_txn_commit(acc, state) do + register_complete_txn(acc, state) + end end diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index 4ca33422dc..de5e93e2b9 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -151,6 +151,29 @@ defmodule Electric.ShapeCache.Storage do @callback append_to_log!(Enumerable.t(log_item()), writer_state()) :: writer_state() | no_return() + @doc """ + Append log items from a transaction fragment. + + Called potentially multiple times per transaction for shapes that stream + fragments directly to storage without waiting for the complete transaction. + Unlike `append_to_log!/2`, this does not assume transaction completion. + + Transaction commits should be signaled separately via `signal_txn_commit!/2` + to allow storage to calculate chunk boundaries at transaction boundaries. + """ + @callback append_fragment_to_log!(Enumerable.t(log_item()), writer_state()) :: + writer_state() | no_return() + + @doc """ + Signal that a transaction has committed. + + Used by storage to calculate chunk boundaries at transaction boundaries. + Called after all fragments for a transaction have been written via + `append_fragment_to_log!/2`. + """ + @callback signal_txn_commit!(xid :: pos_integer(), writer_state()) :: + writer_state() | no_return() + @doc "Get stream of the log for a shape since a given offset" @callback get_log_stream(offset :: LogOffset.t(), max_offset :: LogOffset.t(), shape_opts()) :: log() @@ -211,6 +234,29 @@ defmodule Electric.ShapeCache.Storage do Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) end + # This function seamlessly unwraps TestStorage to give the storage implementation direct access to its options. 
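+  # For illustration, in the test env the stack config may hold a wrapped entry such as
+  # (hypothetical values):
+  #
+  #   {Support.TestStorage, {parent_pid, init, {Electric.ShapeCache.PureFileStorage, opts}}}
+  #
+  # whereas outside tests it is a plain {storage_mod, opts} tuple, so both clauses below
+  # return the same inner opts map.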
+ if Mix.env() != :test do + def opts_for_stack(stack_id) do + {_module, opts} = Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) + opts + end + else + def opts_for_stack(stack_id) do + case Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) do + {Support.TestStorage, {_parent_pid, _test_storage_init, {_storage_mod, storage_opts}}} -> + storage_opts + + {_storage_mod, storage_opts} -> + storage_opts + end + end + end + + def opt_for_stack(stack_id, opt_name) do + opts = opts_for_stack(stack_id) + Map.fetch!(opts, opt_name) + end + @spec child_spec(shape_storage()) :: Supervisor.child_spec() def child_spec({module, shape_opts}) do %{ @@ -355,6 +401,16 @@ defmodule Electric.ShapeCache.Storage do {mod, mod.append_to_log!(log_items, shape_opts)} end + @impl __MODULE__ + def append_fragment_to_log!(log_items, {mod, shape_opts}) do + {mod, mod.append_fragment_to_log!(log_items, shape_opts)} + end + + @impl __MODULE__ + def signal_txn_commit!(xid, {mod, shape_opts}) do + {mod, mod.signal_txn_commit!(xid, shape_opts)} + end + @impl __MODULE__ def get_log_stream(offset, max_offset \\ @last_log_offset, storage) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 8bc0109adf..2541a0b9af 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -2,9 +2,10 @@ defmodule Electric.Shapes.Consumer do use GenServer, restart: :temporary alias Electric.Shapes.Consumer.ChangeHandling - alias Electric.Shapes.Consumer.MoveIns alias Electric.Shapes.Consumer.InitialSnapshot alias Electric.Shapes.Consumer.MoveHandling + alias Electric.Shapes.Consumer.MoveIns + alias Electric.Shapes.Consumer.PendingTxn alias Electric.Shapes.Consumer.State import Electric.Shapes.Consumer.State, only: :macros @@ -35,6 +36,10 @@ defmodule Electric.Shapes.Consumer do @stop_and_clean_timeout 30_000 @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() + defguardp txn_fragment_has_entire_txn?(txn_fragment) + when is_struct(txn_fragment, TransactionFragment) and txn_fragment.has_begin? and + not is_nil(txn_fragment.commit) + def name(stack_id, shape_handle) when is_binary(shape_handle) do ConsumerRegistry.name(stack_id, shape_handle) end @@ -166,16 +171,13 @@ defmodule Electric.Shapes.Consumer do stop_and_clean(state) end - def handle_continue(:consume_buffer, %State{buffer: buffer} = state) do - Logger.debug(fn -> "Consumer catching up on #{length(buffer)} transactions" end) - state = %{state | buffer: [], buffering?: false} - - case handle_txns(Enum.reverse(buffer), state) do - %State{terminating?: true} = state -> - {:noreply, state, {:continue, :stop_and_clean}} + def handle_continue(:consume_buffer, state) do + state = process_buffered_txn_fragments(state) - state -> - {:noreply, state, state.hibernate_after} + if state.terminating? 
do
+      {:noreply, state, {:continue, :stop_and_clean}}
+    else
+      {:noreply, state, state.hibernate_after}
     end
   end
 
@@ -452,33 +454,316 @@
   end
 
   defp handle_event(%TransactionFragment{} = txn_fragment, state) do
-    {txns, transaction_builder} =
-      TransactionBuilder.build(txn_fragment, state.transaction_builder)
+    Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end)
+    handle_txn_fragment(txn_fragment, state)
+  end
 
-    state = %{state | transaction_builder: transaction_builder}
-    handle_txns(txns, state)
+  # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list).
+  # In this phase we have to buffer incoming txn fragments because we can't yet decide what to
+  # do with the transaction: skip it or write it to the shape log.
+  #
+  # When snapshot info arrives, `process_buffered_txn_fragments/1` will be called to process
+  # buffered fragments in order.
+  defp handle_txn_fragment(
+         %TransactionFragment{} = txn_fragment,
+         %State{buffering?: true} = state
+       ) do
+    State.add_to_buffer(state, txn_fragment)
   end
 
-  defp handle_txns(txns, %State{} = state), do: Enum.reduce_while(txns, state, &handle_txn/2)
+  # Short-circuit clauses for the most common case of a single-fragment transaction
+  defp handle_txn_fragment(%TransactionFragment{} = txn_fragment, state)
+       when txn_fragment_has_entire_txn?(txn_fragment) and needs_initial_filtering(state) do
+    case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, txn_fragment.xid) do
+      {:consider_flushed, initial_snapshot_state} ->
+        # This transaction is already included in the snapshot, flush it immediately and skip
+        # writing it to the shape log.
+        state = %{state | initial_snapshot_state: initial_snapshot_state}
+        consider_flushed(state, txn_fragment.last_log_offset)
 
-  # Keep buffering for initial snapshot
-  defp handle_txn(txn, %State{buffering?: true} = state),
-    do: {:cont, State.add_to_buffer(state, txn)}
+      {:continue, initial_snapshot_state} ->
+        # The transaction is not part of the initial snapshot.
+        state = %{state | initial_snapshot_state: initial_snapshot_state}
+        build_and_handle_txn(txn_fragment, state)
+    end
+  end
 
-  defp handle_txn(txn, state) when needs_initial_filtering(state) do
-    case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, txn) do
-      {:consider_flushed, initial_snapshot_state} ->
-        {:cont, consider_flushed(%{state | initial_snapshot_state: initial_snapshot_state}, txn)}
+  defp handle_txn_fragment(%TransactionFragment{} = txn_fragment, state)
+       when txn_fragment_has_entire_txn?(txn_fragment) do
+    build_and_handle_txn(txn_fragment, state)
+  end
 
+  # A pending_txn struct is initialized to keep track of all fragments comprising this txn and
+  # to store the "consider_flushed" state on it.
+  defp handle_txn_fragment(
+         %TransactionFragment{has_begin?: true, xid: xid} = txn_fragment,
+         %State{pending_txn: nil} = state
+       ) do
+    txn = PendingTxn.new(xid)
+    state = %{state | pending_txn: txn}
+    handle_txn_fragment(txn_fragment, state)
+  end
+
+  # Upon seeing the first fragment of a new transaction, check if its xid is already included in the
+  # initial snapshot. If it is, all subsequent fragments of this transaction will be ignored.
+  #
+  # Initial filtering gives us the advantage of not accumulating fragments for a
+  # transaction that is going to be skipped anyway. It works both in the regular mode and in
+  # the fragment-direct mode.
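+  #
+  # For illustration, a multi-fragment transaction arrives roughly as (hypothetical xid and
+  # changes):
+  #
+  #   %TransactionFragment{xid: 100, has_begin?: true, commit: nil, changes: [...]}
+  #   %TransactionFragment{xid: 100, has_begin?: false, commit: nil, changes: [...]}
+  #   %TransactionFragment{xid: 100, has_begin?: false, commit: %Changes.Commit{}, changes: []}
+  #
+  # so the snapshot-visibility check below only needs to run on the first (has_begin?) fragment.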
+  defp handle_txn_fragment(
+         %TransactionFragment{has_begin?: true, xid: xid} = txn_fragment,
+         %State{} = state
+       )
+       when needs_initial_filtering(state) do
+    state =
+      case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, xid) do
+        {:consider_flushed, initial_snapshot_state} ->
+          # This transaction is already included in the snapshot, so mark it as flushed to
+          # ignore any of its follow-up fragments.
+          %{
+            state
+            | pending_txn: PendingTxn.consider_flushed(state.pending_txn),
+              initial_snapshot_state: initial_snapshot_state
+          }
+
+        {:continue, initial_snapshot_state} ->
+          # The transaction is not part of the initial snapshot.
+          %{state | initial_snapshot_state: initial_snapshot_state}
+      end
+
+    process_txn_fragment(txn_fragment, state)
+  end
+
+  defp handle_txn_fragment(txn_fragment, state), do: process_txn_fragment(txn_fragment, state)
+
+  defp process_txn_fragment(
+         %TransactionFragment{} = txn_fragment,
+         %State{pending_txn: txn} = state
+       ) do
+    cond do
+      # Fragments belonging to the same transaction can all be skipped, either via xid filtering
+      # or via log offset filtering.
+      # TODO: Do not mark the fragment itself flushed. When the connection restarts, we want to
+      # receive all fragments starting from the one with has_begin? so as not to overcomplicate
+      # the consumer logic.
+      txn.consider_flushed? or fragment_already_processed?(txn_fragment, state) ->
+        skip_txn_fragment(state, txn_fragment)
+
+      # In the regular mode all fragments are buffered until the Commit change is seen. At that
+      # point, a transaction struct is produced from the buffered fragments and is written to
+      # storage.
+      not state.fragment_direct? ->
+        {txns, transaction_builder} =
+          TransactionBuilder.build(txn_fragment, state.transaction_builder)
+
+        state = %{state | transaction_builder: transaction_builder}
+
+        case txns do
+          [] ->
+            state
+
+          [txn] ->
+            Logger.debug(fn -> "Txn assembled in Shapes.Consumer: #{inspect(txn)}" end)
+            handle_txn(txn, %{state | pending_txn: nil})
+        end
+
+      true ->
+        # If we've ended up in this branch, we know for sure that the current fragment is one of
+        # several fragments that make up the transaction.
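+        #
+        # Sketch of the fragment-direct flow for a hypothetical three-fragment transaction:
+        #
+        #   fragment 1 (has_begin?: true) -> append_fragment_to_log!
+        #   fragment 2                    -> append_fragment_to_log!
+        #   fragment 3 (commit present)   -> append_fragment_to_log!, then signal_txn_commit!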
+ {state, persisted_changes} = write_txn_fragment_to_storage(state, txn_fragment) + maybe_complete_pending_txn(state, txn_fragment, persisted_changes) + end + end + + # This function does similar things to do_handle_txn/2 but with the following simplifications: + # - it doesn't account for move-ins or move-outs or converting update operations into insert/delete + # - the fragment is written directly to storage if it has changes matching this shape + # - if the fragment has a commit message, the ShapeLogCollector is informed about the new flush boundary + defp write_txn_fragment_to_storage(state, %TransactionFragment{changes: []}), do: {state, []} + + defp write_txn_fragment_to_storage( + state, + %TransactionFragment{changes: changes, commit: commit, xid: xid} = fragment + ) do + %{ + shape: shape, + writer: writer, + pending_txn: txn, + stack_id: stack_id, + shape_handle: shape_handle + } = state + + case convert_fragment_changes(changes, stack_id, shape_handle, shape) do + :includes_truncate -> + {handle_txn_with_truncate(xid, state), []} + + {[], 0} -> + Logger.debug(fn -> + "No relevant changes found for #{inspect(shape)} in txn fragment of txn #{xid}" + end) + + # TODO: this needs to signal something to the subsequent maybe_complete_pending_txn call + {consider_flushed(state, fragment.last_log_offset), []} + + {reversed_changes, num_changes, last_log_offset} -> + converted_changes = + reversed_changes + |> maybe_mark_last_change(commit) + |> Enum.reverse() + + {lines, total_size} = prepare_log_entries(converted_changes, xid, shape) + writer = ShapeCache.Storage.append_fragment_to_log!(lines, writer) + + txn = PendingTxn.update_with_changes(txn, last_log_offset, num_changes, total_size) + + Logger.debug(fn -> + "Wrote #{num_changes} changes for fragment xid=#{xid}, total_bytes=#{total_size}" + end) + + {%{ + state + | writer: writer, + latest_offset: last_log_offset, + pending_txn: txn, + txn_offset_mapping: + state.txn_offset_mapping ++ [{last_log_offset, fragment.last_log_offset}] + }, converted_changes} + end + end + + defp convert_fragment_changes(changes, stack_id, shape_handle, shape) do + Enum.reduce_while(changes, {[], 0}, fn + %Changes.TruncatedRelation{}, _acc -> + {:halt, :includes_truncate} + + change, {changes, count} = acc -> + # Apply Shape.convert_change to each change to: + # 1. Filter out changes not matching the shape's table + # 2. Apply WHERE clause filtering + case Shape.convert_change(shape, change, stack_id: stack_id, shape_handle: shape_handle) do + [] -> + {:cont, acc} + + [change] -> + {:cont, {[change | changes], count + 1}} + end + end) + |> case do + {[change | _] = changes, num_changes} -> + {changes, num_changes, LogItems.expected_offset_after_split(change)} + + acc -> + acc + end + end + + # Mark the last change in the list as last? when this is a commit fragment + # This is needed for clients to know when a transaction is complete + # The changes passed to this function are in reversed order, i.e. the last change is the head of the list. 
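+  #
+  # E.g., with hypothetical changes c1..c3 arriving here as [c3, c2, c1]:
+  #
+  #   maybe_mark_last_change([c3, c2, c1], %Changes.Commit{})
+  #   #=> [%{c3 | last?: true}, c2, c1]
+  #
+  # which the subsequent Enum.reverse/1 turns into [c1, c2, %{c3 | last?: true}].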
+ defp maybe_mark_last_change([], _commit), do: [] + defp maybe_mark_last_change(changes, nil), do: changes - {:continue, new_initial_snapshot_state} -> - handle_txn_in_span(txn, %{state | initial_snapshot_state: new_initial_snapshot_state}) + defp maybe_mark_last_change(changes, _commit) do + [head | tail] = changes + [%{head | last?: true} | tail] + end + + defp skip_txn_fragment(state, %TransactionFragment{commit: nil}), do: state + + defp skip_txn_fragment(state, %TransactionFragment{} = txn_fragment) do + # Reset the pending_txn field to be ready for the first txn fragment of the next incoming transaction. + %{state | pending_txn: nil} + |> consider_flushed(txn_fragment.last_log_offset) + end + + defp maybe_complete_pending_txn(%State{} = state, %TransactionFragment{commit: nil}, _changes), + do: state + + defp maybe_complete_pending_txn(%State{terminating?: true} = state, _fragment, _changes) do + # If we're terminating (e.g., due to truncate), don't complete the transaction + state + end + + defp maybe_complete_pending_txn( + %State{} = state, + %TransactionFragment{commit: commit} = txn_fragment, + persisted_changes + ) do + %{pending_txn: txn, writer: writer, shape: shape, shape_handle: shape_handle} = state + + # Signal commit to storage for potential chunk boundary calculation + writer = ShapeCache.Storage.signal_txn_commit!(txn.xid, writer) + + # Only notify if we actually wrote changes + if persisted_changes != [] do + # TODO: Only notify upon writing the full txn to storage + notify_new_changes(state, persisted_changes, txn.last_log_offset) + + lag = calculate_replication_lag_from_commit(commit) + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :storage, :transaction_stored], + %{ + bytes: txn.total_bytes, + count: 1, + operations: txn.num_changes, + replication_lag: lag + }, + Map.new(shape_attrs(shape_handle, shape)) + ) + + Logger.debug(fn -> + "Completed fragment-direct transaction xid=#{txn.xid}, changes=#{txn.num_changes}" + end) + + %{ + state + | writer: writer, + latest_offset: txn.last_log_offset, + pending_txn: nil, + txn_offset_mapping: + state.txn_offset_mapping ++ [{txn.last_log_offset, txn_fragment.last_log_offset}] + } + else + # TODO: is this even reachable??? + + # No changes were written - notify flush boundary like consider_flushed does + Logger.debug(fn -> + "No relevant changes in fragment-direct transaction xid=#{txn.xid}" + end) + + state = %{state | writer: writer, pending_txn: nil} + # OR pending_txn.last_log_offset ??? + consider_flushed(state, txn_fragment.last_log_offset) end end - # Remove the move-in buffering check - just process immediately - defp handle_txn(txn, state), do: handle_txn_in_span(txn, state) + defp calculate_replication_lag_from_commit(%Changes.Commit{commit_timestamp: nil}), do: 0 + + defp calculate_replication_lag_from_commit(%Changes.Commit{commit_timestamp: commit_timestamp}) do + now = DateTime.utc_now() + Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond)) + end + + def process_buffered_txn_fragments(%State{buffer: buffer} = state) do + Logger.debug(fn -> "Consumer catching up on #{length(buffer)} transaction fragments" end) + {txn_fragments, state} = State.pop_buffered(state) + + Enum.reduce_while(txn_fragments, state, fn txn_fragment, state -> + state = handle_txn_fragment(txn_fragment, state) + + if state.terminating? 
do + {:halt, state} + else + {:cont, state} + end + end) + end - defp handle_txn_in_span(txn, %State{} = state) do + defp build_and_handle_txn(%TransactionFragment{} = txn_fragment, %State{} = state) do + {[txn], _} = TransactionBuilder.build(txn_fragment, TransactionBuilder.new()) + handle_txn(txn, state) + end + + defp handle_txn(txn, %State{} = state) do ot_attrs = [xid: txn.xid, total_num_changes: txn.num_changes] ++ shape_attrs(state.shape_handle, state.shape) @@ -487,30 +772,15 @@ defmodule Electric.Shapes.Consumer do "shape_write.consumer.handle_txn", ot_attrs, state.stack_id, - fn -> - do_handle_txn(txn, state) - end + fn -> do_handle_txn(txn, state) end ) end - defp do_handle_txn(%Transaction{} = txn, state) - when LogOffset.is_log_offset_lte(txn.last_log_offset, state.latest_offset) do - Logger.debug(fn -> "Skipping already processed txn #{txn.xid}" end) - - {:cont, consider_flushed(state, txn)} - end - defp do_handle_txn(%Transaction{xid: xid, changes: changes} = txn, state) do - %{ - shape: shape, - shape_handle: shape_handle, - writer: writer - } = state + %{shape: shape, writer: writer} = state state = State.remove_completed_move_ins(state, txn) - Logger.debug(fn -> "Txn received in Shapes.Consumer: #{inspect(txn)}" end) - extra_refs_full = Materializer.get_all_as_refs(shape, state.stack_id) @@ -530,23 +800,14 @@ defmodule Electric.Shapes.Consumer do %{xid: xid, extra_refs: {extra_refs_before_move_ins, extra_refs_full}} ) do :includes_truncate -> - # TODO: This is a very naive way to handle truncations: if ANY relevant truncates are - # present in the transaction, we're considering the whole transaction empty, and - # just rotate the shape handle. "Correct" way to handle truncates is to be designed. - Logger.warning( - "Truncate operation encountered while processing txn #{txn.xid} for #{shape_handle}" - ) - - state = mark_for_removal(state) - - {:halt, state} + handle_txn_with_truncate(txn.xid, state) {_, state, 0, _} -> Logger.debug(fn -> "No relevant changes found for #{inspect(shape)} in txn #{txn.xid}" end) - {:cont, consider_flushed(state, txn)} + consider_flushed(state, txn.last_log_offset) {changes, state, num_changes, last_log_offset} -> timestamp = System.monotonic_time() @@ -576,17 +837,27 @@ defmodule Electric.Shapes.Consumer do Map.new(shape_attrs(state.shape_handle, state.shape)) ) - {:cont, - %{ - state - | writer: writer, - latest_offset: last_log_offset, - txn_offset_mapping: - state.txn_offset_mapping ++ [{last_log_offset, txn.last_log_offset}] - }} + %{ + state + | writer: writer, + latest_offset: last_log_offset, + txn_offset_mapping: + state.txn_offset_mapping ++ [{last_log_offset, txn.last_log_offset}] + } end end + defp handle_txn_with_truncate(xid, state) do + # TODO: This is a very naive way to handle truncations: if ANY relevant truncates are + # present in the transaction, we're considering the whole transaction empty, and + # just rotate the shape handle. "Correct" way to handle truncates is to be designed. 
+    Logger.warning(
+      "Truncate operation encountered while processing txn #{xid} for #{state.shape_handle}"
+    )
+
+    mark_for_removal(state)
+  end
+
   defp notify_new_changes(state, nil), do: state
 
   defp notify_new_changes(state, {changes, upper_bound}) do
@@ -685,14 +956,19 @@
     Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond))
   end
 
-  defp consider_flushed(%State{} = state, %Transaction{last_log_offset: new_boundary}) do
+  defp fragment_already_processed?(%TransactionFragment{last_log_offset: offset}, state) do
+    LogOffset.is_log_offset_lte(offset, state.latest_offset)
+  end
+
+  defp consider_flushed(%State{} = state, log_offset) do
     if state.txn_offset_mapping == [] do
       # No relevant txns have been observed and unflushed, we can notify immediately
-      ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, new_boundary)
+      ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, log_offset)
       state
     else
       # We're looking to "relabel" the next flush to include this txn, so we're looking for the
       # boundary that has a highest boundary less than this offset
+      new_boundary = log_offset
 
       {head, tail} =
         Enum.split_while(
diff --git a/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex b/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex
index c654d63d2e..ff4f1a4d1d 100644
--- a/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex
+++ b/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex
@@ -43,11 +43,16 @@ defmodule Electric.Shapes.Consumer.InitialSnapshot do
 
   def needs_buffering?(%__MODULE__{pg_snapshot: snapshot}), do: is_nil(snapshot)
 
+  def maybe_stop_initial_filtering(%__MODULE__{} = state, storage, %Transaction{xid: xid}) do
+    maybe_stop_initial_filtering(state, storage, xid)
+  end
+
   def maybe_stop_initial_filtering(
         %__MODULE__{pg_snapshot: {xmin, xmax, xip_list} = snapshot} = state,
         storage,
-        %Transaction{xid: xid}
-      ) do
+        xid
+      )
+      when is_integer(xid) and xid > 0 do
     if Xid.after_snapshot?(xid, snapshot) do
       Storage.set_pg_snapshot(
         %{xmin: xmin, xmax: xmax, xip_list: xip_list, filter_txns?: false},
@@ -89,11 +94,15 @@ defmodule Electric.Shapes.Consumer.InitialSnapshot do
     %{state | snapshot_started?: true}
   end
 
-  def filter(state, storage, %Transaction{} = txn) do
-    if Transaction.visible_in_snapshot?(txn, state.pg_snapshot) do
+  def filter(state, storage, %Transaction{xid: xid}) do
+    filter(state, storage, xid)
+  end
+
+  def filter(state, storage, xid) when is_integer(xid) and xid > 0 do
+    if Transaction.visible_in_snapshot?(xid, state.pg_snapshot) do
       {:consider_flushed, state}
     else
-      state = maybe_stop_initial_filtering(state, storage, txn)
+      state = maybe_stop_initial_filtering(state, storage, xid)
       {:continue, state}
     end
   end
diff --git a/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex b/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex
new file mode 100644
index 0000000000..fefa0b1d5c
--- /dev/null
+++ b/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex
@@ -0,0 +1,58 @@
+defmodule Electric.Shapes.Consumer.PendingTxn do
+  @moduledoc """
+  Tracks metadata for an in-progress transaction during fragment-direct streaming.
+
+  When a consumer streams transaction fragments directly to storage (for shapes
+  without subquery dependencies), this struct tracks the transaction metadata
+  until commit is received.
+
+  This is a counterpart to Electric.Replication.TransactionBuilder. 
This module only tracks
+  metadata about the transaction whose fragments are currently being processed, since the
+  fragments themselves are written to storage and discarded from memory immediately, while
+  the TransactionBuilder module accumulates all changes in memory and returns a complete
+  transaction after seeing a Commit.
+  """
+
+  alias Electric.Replication.LogOffset
+
+  defstruct [
+    :xid,
+    :last_log_offset,
+    consider_flushed?: false,
+    num_changes: 0,
+    total_bytes: 0
+  ]
+
+  @type t :: %__MODULE__{
+          xid: pos_integer(),
+          last_log_offset: LogOffset.t() | nil,
+          consider_flushed?: boolean(),
+          num_changes: non_neg_integer(),
+          total_bytes: non_neg_integer()
+        }
+
+  @doc """
+  Create a new pending transaction tracker.
+  """
+  @spec new(pos_integer()) :: t()
+  def new(xid) do
+    %__MODULE__{xid: xid}
+  end
+
+  @doc """
+  Update the pending transaction with changes that were written to storage.
+  """
+  @spec update_with_changes(t(), LogOffset.t(), non_neg_integer(), non_neg_integer()) :: t()
+  def update_with_changes(%__MODULE__{} = pending_txn, log_offset, count, bytes) do
+    %{
+      pending_txn
+      | last_log_offset: log_offset,
+        num_changes: pending_txn.num_changes + count,
+        total_bytes: pending_txn.total_bytes + bytes
+    }
+  end
+
+  def consider_flushed(%__MODULE__{} = pending_txn) do
+    %{pending_txn | consider_flushed?: true}
+  end
+end
diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex
index ee3f517fb5..46badfb46b 100644
--- a/packages/sync-service/lib/electric/shapes/consumer/state.ex
+++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex
@@ -3,7 +3,6 @@ defmodule Electric.Shapes.Consumer.State do
   alias Electric.Shapes.Consumer.MoveIns
   alias Electric.Shapes.Consumer.InitialSnapshot
   alias Electric.Shapes.Shape
-  alias Electric.Replication.Changes.Transaction
   alias Electric.Replication.Eval.Parser
   alias Electric.Replication.Eval.Walker
   alias Electric.Replication.TransactionBuilder
@@ -30,7 +29,14 @@ defmodule Electric.Shapes.Consumer.State do
     terminating?: false,
     buffering?: false,
     or_with_subquery?: false,
-    not_with_subquery?: false
+    not_with_subquery?: false,
+    # Fragment-direct streaming fields
+    # When true, stream fragments directly to storage without buffering
+    fragment_direct?: false,
+    # Tracks the in-progress transaction; initialized when a txn fragment with has_begin?=true is seen.
+    # It is used to check whether the entire txn is visible in the snapshot and to mark it
+    # as flushed in order to handle its remaining fragments appropriately.
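+    # Hypothetical lifecycle: PendingTxn.new(xid) on the has_begin? fragment,
+    # PendingTxn.update_with_changes/4 after each fragment is written, and a reset
+    # back to nil once the commit fragment has been handled.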
+ pending_txn: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -143,7 +149,9 @@ defmodule Electric.Shapes.Consumer.State do ), buffering?: true, or_with_subquery?: has_or_with_subquery?(shape), - not_with_subquery?: has_not_with_subquery?(shape) + not_with_subquery?: has_not_with_subquery?(shape), + # Enable fragment-direct streaming for shapes without subquery dependencies + fragment_direct?: shape.shape_dependencies == [] } end @@ -241,11 +249,16 @@ defmodule Electric.Shapes.Consumer.State do end end - @spec add_to_buffer(t(), Transaction.t()) :: t() + @spec add_to_buffer(t(), TransactionFragment.t()) :: t() def add_to_buffer(%__MODULE__{buffer: buffer} = state, txn) do %{state | buffer: [txn | buffer]} end + @spec pop_buffered(t()) :: {[TransactionFragment.t()], t()} + def pop_buffered(%__MODULE__{buffer: buffer} = state) do + {Enum.reverse(buffer), %{state | buffer: [], buffering?: false}} + end + @spec add_waiter(t(), GenServer.from()) :: t() def add_waiter(%__MODULE__{initial_snapshot_state: initial_snapshot_state} = state, from) do %{ diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 81146562a1..a4b3411be7 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -209,7 +209,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {^ref, :new_changes, ^last_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} + refute_storage_calls_for_txn_fragment(@shape_handle2) txn2 = transaction(xid, next_lsn, [ @@ -223,7 +223,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) assert_receive {^ref, :new_changes, ^next_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} + refute_storage_calls_for_txn_fragment(@shape_handle2) end test "correctly writes only relevant changes to multiple shape logs", ctx do @@ -303,7 +303,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} + refute_storage_calls_for_txn_fragment(@shape_handle1) refute_receive {^ref1, :new_changes, _} assert_receive {^ref2, :new_changes, _} @@ -384,7 +384,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) end) - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} + refute_storage_calls_for_txn_fragment(@shape_handle1) assert_shape_cleanup(@shape_handle1) @@ -1120,4 +1120,17 @@ defmodule Electric.Shapes.ConsumerTest do affected_relations: MapSet.new(changes, & &1.relation) } end + + defp assert_storage_calls_for_txn_fragment(shape_handle, log_offset, txn_xid, txn_op) do + assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, + [{^log_offset, _key, ^txn_op, _json}]} + + assert_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, ^txn_xid} + end + + defp refute_storage_calls_for_txn_fragment(shape_handle) do + refute_receive {Support.TestStorage, 
:append_to_log!, ^shape_handle, _} + refute_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, _} + refute_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, _} + end end diff --git a/packages/sync-service/test/support/test_storage.ex b/packages/sync-service/test/support/test_storage.ex index 206f6c8db2..a023adcd97 100644 --- a/packages/sync-service/test/support/test_storage.ex +++ b/packages/sync-service/test/support/test_storage.ex @@ -150,6 +150,20 @@ defmodule Support.TestStorage do {parent, shape_handle, data, storage} end + @impl Electric.ShapeCache.Storage + def append_fragment_to_log!(log_items, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :append_fragment_to_log!, shape_handle, log_items}) + storage = Storage.append_fragment_to_log!(log_items, storage) + {parent, shape_handle, data, storage} + end + + @impl Electric.ShapeCache.Storage + def signal_txn_commit!(xid, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :signal_txn_commit!, shape_handle, xid}) + storage = Storage.signal_txn_commit!(xid, storage) + {parent, shape_handle, data, storage} + end + @impl Electric.ShapeCache.Storage def append_move_in_snapshot_to_log!(name, {parent, shape_handle, data, storage}) do send(parent, {__MODULE__, :append_move_in_snapshot_to_log!, shape_handle, name})