diff --git a/integration-tests/tests/crash-recovery.lux b/integration-tests/tests/crash-recovery.lux index 535815ff55..7d5add4bcf 100644 --- a/integration-tests/tests/crash-recovery.lux +++ b/integration-tests/tests/crash-recovery.lux @@ -166,7 +166,8 @@ # slower than the pure-ets version (~50ms vs ~5ms) which means that this request # arrives before the stack has received the pending operation. # To mitigate this we ensure the txn has been processed. - ??[debug] Txn received in Shapes.Consumer + # Note: With fragment-direct streaming, the log message is different. + ??[debug] Completed fragment-direct transaction # Client should be able to continue same shape [shell client] diff --git a/packages/sync-service/agent-docs/flush-tracking-architecture.md b/packages/sync-service/agent-docs/flush-tracking-architecture.md new file mode 100644 index 0000000000..e489df412c --- /dev/null +++ b/packages/sync-service/agent-docs/flush-tracking-architecture.md @@ -0,0 +1,368 @@ +# Flush Tracking Architecture + +This document explains how Electric's sync service tracks write progress across multiple shapes and coordinates acknowledgments back to Postgres. Understanding this architecture is essential for anyone working on storage, replication, or considering changes to the transaction processing pipeline. + +## Table of Contents + +1. [Overview](#overview) +2. [LogOffset: The Fundamental Unit](#logoffset-the-fundamental-unit) +3. [WriteLoop: Per-Shape Write Management](#writeloop-per-shape-write-management) +4. [FlushTracker: Global Coordination](#flushtracker-global-coordination) +5. [Consumer: Transaction Processing](#consumer-transaction-processing) +6. [End-to-End Flow](#end-to-end-flow) +7. [Design Considerations for Future Changes](#design-considerations-for-future-changes) + +--- + +## Overview + +Electric replicates data from Postgres to multiple "shapes" (filtered subsets of tables). Each shape has its own storage and flush cadence, but Postgres needs a single acknowledgment of how far data has been durably persisted. + +The key challenge: **Different shapes flush at different rates, and not all shapes see every transaction.** We need to compute a safe minimum offset to acknowledge to Postgres. + +### Key Components + +| Component | Scope | Responsibility | +| -------------------- | ------------------- | ------------------------------------------------------------ | +| `WriteLoop` | Per-shape | Buffers writes, flushes to disk, tracks persistence progress | +| `FlushTracker` | Global (all shapes) | Coordinates flush progress across shapes, notifies Postgres | +| `Consumer` | Per-shape | Processes transactions, maintains offset mappings | +| `TransactionBuilder` | Per-shape | Accumulates fragments into complete transactions | + +--- + +## LogOffset: The Fundamental Unit + +A `LogOffset` represents a position in the replication stream: + +```elixir +%LogOffset{ + tx_offset: 123456789, # Transaction LSN from Postgres + op_offset: 3 # Operation index within the transaction +} +``` + +### Special Values + +| Value | Meaning | +| ---------------- | ----------------------------------------------------- | +| `{-1, 0}` | Before any real offset (`before_all()`) | +| `{0, 0}` | First possible offset (`first()`) | +| `{0, :infinity}` | End of snapshot region (`last_before_real_offsets()`) | + +### Important Property + +**Shapes preserve original offsets.** When a shape filters changes from a transaction, it does NOT renumber them. Each change retains its original `log_offset` from the Postgres transaction. 
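+
+As a minimal illustration (simplified maps rather than the real `Changes.*` structs, so a sketch rather than the actual Shape filtering API), filtering drops changes but never renumbers the ones it keeps:
+
+```elixir
+# Illustrative sketch only - not the actual Shape filtering code.
+changes = [
+  %{log_offset: %LogOffset{tx_offset: 100, op_offset: 0}, relation: {"public", "items"}},
+  %{log_offset: %LogOffset{tx_offset: 100, op_offset: 1}, relation: {"public", "other"}},
+  %{log_offset: %LogOffset{tx_offset: 100, op_offset: 2}, relation: {"public", "items"}}
+]
+
+# A shape over "items" keeps offsets {100, 0} and {100, 2}; the second surviving
+# change is NOT renumbered to {100, 1}.
+kept = Enum.filter(changes, &(&1.relation == {"public", "items"}))
+```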
+ +The only exception is **update splitting**: when an `UpdatedRecord` has a changed primary key, it becomes two log items (delete + insert), and the insert gets `offset + 1`: + +```elixir +# Original change at offset {100, 5} with changed PK +# Becomes: +# - Delete at {100, 5} +# - Insert at {100, 6} +``` + +--- + +## WriteLoop: Per-Shape Write Management + +`WriteLoop` (`lib/electric/shape_cache/pure_file_storage/write_loop.ex`) manages buffered writes for a single shape's file storage. + +### Key Fields + +| Field | Purpose | +| --------------------------- | ------------------------------------------------------------------------- | +| `last_seen_offset` | Most recent offset added to the in-memory buffer (may not be flushed yet) | +| `last_seen_txn_offset` | Last complete transaction boundary seen (set when transaction ends) | +| `last_persisted_offset` | Last offset written to disk via `datasync` | +| `last_persisted_txn_offset` | Last complete transaction fully persisted - **the stable read boundary** | + +### The Write Pipeline + +``` +Transaction arrives + │ + ▼ +┌─────────────────┐ +│ In-memory buffer│ ← last_seen_offset updated here +│ (up to 64KB) │ +└────────┬────────┘ + │ Buffer full OR timer fires OR chunk boundary + ▼ +┌─────────────────┐ +│ Disk write + │ ← last_persisted_offset updated here +│ datasync │ +└────────┬────────┘ + │ Transaction complete AND flushed + ▼ +┌─────────────────┐ +│ Txn boundary │ ← last_persisted_txn_offset updated here +│ advanced │ (this is what readers use) +└─────────────────┘ +``` + +### Flush Triggers + +Flushes occur when: + +1. **Buffer size threshold** (64KB) - `@delayed_write` constant +2. **Scheduled timer** fires (default 1 second) +3. **Chunk boundary** reached (default 10MB of JSON payload) + +### Chunk Boundaries Are NOT Transaction-Aligned + +A single large transaction can span multiple chunks: + +``` +Transaction with 15MB of changes: + +Chunk 1 (10MB) Chunk 2 (5MB) +┌──────────────────┐ ┌─────────────┐ +│ Changes 1-1000 │ │ Changes │ +│ (mid-transaction)│ │ 1001-1500 │ +│ │ │ (txn ends) │ +└──────────────────┘ └─────────────┘ + │ │ + ▼ ▼ + Flush occurs Flush occurs + (not txn-aligned) (txn-aligned) +``` + +### Reader Safety + +Readers use `last_persisted_txn_offset` as their boundary, NOT `last_persisted_offset`. This ensures they never see incomplete transactions, even if a flush occurred mid-transaction. + +--- + +## FlushTracker: Global Coordination + +`FlushTracker` (`lib/electric/replication/shape_log_collector/flush_tracker.ex`) lives inside `ShapeLogCollector` and coordinates flush progress across ALL shapes. 
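+
+Flush notifications reach it as casts from each shape's consumer; the handler (reproduced from the `shape_log_collector.ex` change later in this diff) folds each notification into the tracker:
+
+```elixir
+def handle_cast({:writer_flushed, shape_id, offset}, state) do
+  {:noreply,
+   Map.update!(
+     state,
+     :flush_tracker,
+     &FlushTracker.handle_flush_notification(&1, shape_id, offset)
+   )}
+end
+```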
+ +### Key Fields + +| Field | Purpose | +| ---------------------------- | ---------------------------------------------------------------------------------- | +| `last_global_flushed_offset` | Minimum offset durably flushed across ALL shapes - safe to acknowledge to Postgres | +| `last_seen_offset` | Most recent transaction offset dispatched to shapes (the "high water mark") | +| `last_flushed` | Map of `shape_id => {last_sent, last_flushed}` tracking per-shape progress | +| `min_incomplete_flush_tree` | `gb_tree` for O(log n) minimum lookup across pending shapes | + +### Per-Shape Tracking + +The `last_flushed` map tracks each shape's progress: + +```elixir +%{ + "shape-abc" => {last_sent: {100, 5}, last_flushed: {100, 3}}, # Behind + "shape-xyz" => {last_sent: {100, 5}, last_flushed: {100, 5}} # Caught up (will be removed) +} +``` + +- **`last_sent`**: Latest offset sent to this shape for writing +- **`last_flushed`**: Latest offset this shape confirmed as persisted + +When `last_sent == last_flushed`, the shape is caught up and removed from the map. + +### The Global Offset Calculation + +```elixir +last_global_flushed_offset = max( + previous_global_flushed, + min(for {_, {_, last_flushed}} <- last_flushed_map, do: last_flushed) +) +``` + +The `min_incomplete_flush_tree` provides O(log n) access to this minimum without scanning all shapes. + +### Transaction-Aligned Acknowledgments + +When notifying Postgres: + +```elixir +defp notify_global_offset_updated(state) do + if state.last_flushed == %{} do + # All shapes caught up - safe to report actual tx_offset + state.notify_fn.(state.last_global_flushed_offset.tx_offset) + else + # Some shapes still pending - report tx_offset - 1 (conservative) + state.notify_fn.(state.last_global_flushed_offset.tx_offset - 1) + end +end +``` + +The `-1` safety margin ensures that if we've only partially flushed a transaction, we don't acknowledge it to Postgres. + +### Handling Shapes That Don't See Every Transaction + +When a new transaction arrives, shapes that weren't previously tracked are added with a safe upper bound: + +```elixir +# For a transaction at offset {100, 5}: +# New shapes get {last_sent: {100, 5}, last_flushed: {99, ...}} +# This assumes they've flushed everything before this transaction +prev_log_offset = %LogOffset{tx_offset: last_log_offset.tx_offset - 1} +``` + +--- + +## Consumer: Transaction Processing + +`Consumer` (`lib/electric/shapes/consumer.ex`) processes transactions for a single shape. + +### Current Flow: Full Transaction Accumulation + +Currently, transactions are fully accumulated in memory before writing: + +``` +TransactionFragment (no commit) + │ + ▼ +┌─────────────────────┐ +│ TransactionBuilder │ ← Accumulates in memory +│ (buffers fragments) │ +└─────────────────────┘ + │ +TransactionFragment (with commit) + │ + ▼ +┌─────────────────────┐ +│ Complete Transaction│ ← Now ready to process +└─────────────────────┘ + │ + ▼ +┌─────────────────────┐ +│ append_to_log! │ ← Write to storage +└─────────────────────┘ +``` + +### The txn_offset_mapping + +The Consumer maintains a mapping to align flush notifications with transaction boundaries: + +```elixir +# After processing a transaction: +txn_offset_mapping ++ [{last_log_offset, txn.last_log_offset}] +``` + +- **First element**: The shape's last written offset for this transaction +- **Second element**: The original transaction boundary + +This mapping handles the update-split edge case where the shape's last written offset might be `+1` from the original. 
+ +### Offset Alignment on Flush + +When storage reports a flush: + +```elixir +def handle_info({ShapeCache.Storage, :flushed, offset}, state) do + {state, offset} = State.align_offset_to_txn_boundary(state, offset) + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset) +end +``` + +The `align_offset_to_txn_boundary/2` function: + +```elixir +def align_offset_to_txn_boundary(state, offset) do + case Enum.drop_while(txn_offset_mapping, &(compare(elem(&1, 0), offset) == :lt)) do + [{^offset, boundary} | rest] -> + # Flushed offset matches a transaction end - return the boundary + {%{state | txn_offset_mapping: rest}, boundary} + + rest -> + # Flushed mid-transaction - return raw offset + {%{state | txn_offset_mapping: rest}, offset} + end +end +``` + +If the flush happened mid-transaction, the raw offset is returned, and FlushTracker's `-1` safety margin handles it. + +--- + +## End-to-End Flow + +``` +PostgreSQL WAL + │ + ▼ +┌─────────────────────────────────────┐ +│ ShapeLogCollector │ +│ ├─ FlushTracker │ +│ │ └─ Tracks: last_seen_offset │ +│ │ last_global_flushed │ +│ │ per-shape {sent, │ +│ │ flushed} │ +│ └─ Dispatches txns to shapes │ +└──────────────┬──────────────────────┘ + │ TransactionFragment + ▼ +┌─────────────────────────────────────┐ +│ Shape Consumer │ +│ ├─ TransactionBuilder │ +│ │ └─ Accumulates fragments │ +│ ├─ Processes complete transactions │ +│ └─ Maintains txn_offset_mapping │ +└──────────────┬──────────────────────┘ + │ append_to_log! + ▼ +┌─────────────────────────────────────┐ +│ WriteLoop (PureFileStorage) │ +│ ├─ Buffers writes (64KB threshold) │ +│ ├─ Flushes to disk │ +│ └─ Tracks: last_seen_offset │ +│ last_persisted_offset │ +│ last_persisted_txn_offset│ +└──────────────┬──────────────────────┘ + │ {Storage, :flushed, offset} + ▼ +┌─────────────────────────────────────┐ +│ Consumer.handle_info │ +│ └─ align_offset_to_txn_boundary │ +└──────────────┬──────────────────────┘ + │ notify_flushed(shape, offset) + ▼ +┌─────────────────────────────────────┐ +│ FlushTracker │ +│ ├─ Updates shape's {sent, flushed} │ +│ ├─ Computes new global minimum │ +│ └─ Notifies Postgres (with -1 margin│ +│ if not fully caught up) │ +└─────────────────────────────────────┘ +``` + +--- + +## Design Considerations for Future Changes + +### Writing Transaction Fragments Directly to Storage + +If you want to persist fragments without accumulating complete transactions in memory: + +1. **Client reads are safe**: Readers use `last_persisted_txn_offset`, so updating only `last_persisted_offset` for fragments won't expose incomplete transactions. + +2. **Postgres acknowledgments are safe**: FlushTracker's `-1` margin handles mid-transaction notifications. + +3. **Implementation approach**: + - Update `last_persisted_offset` as each fragment is persisted + - Call `notify_flushed` with the persisted offset (FlushTracker handles it) + - Update `last_persisted_txn_offset` only when the Commit fragment is processed + +4. **FlushTracker consideration**: Currently `handle_txn_fragment/3` only processes fragments with a Commit: + ```elixir + def handle_txn_fragment(state, %TransactionFragment{commit: nil}, _) do + state # No-op + end + ``` + If you need FlushTracker to track in-flight fragments differently, this would need modification. + +### Key Invariants to Preserve + +1. **`last_persisted_txn_offset` must only advance on complete, persisted transactions** - this is the reader safety boundary. + +2. 
**FlushTracker notifications should be transaction-aligned when possible** - use the `-1` margin for mid-transaction flushes. + +3. **Offsets are preserved from the original transaction** - don't renumber them. The only adjustment is `+1` for key-changing updates that split into delete+insert. + +4. **The `min_incomplete_flush_tree` must stay consistent with `last_flushed` map** - always update both together. diff --git a/packages/sync-service/agent-docs/fragment-direct-txn-context.md b/packages/sync-service/agent-docs/fragment-direct-txn-context.md new file mode 100644 index 0000000000..894ade7945 --- /dev/null +++ b/packages/sync-service/agent-docs/fragment-direct-txn-context.md @@ -0,0 +1,279 @@ +# Fragment-Direct Transaction Handling in Electric + +## Overview + +Fragment-direct mode is an optimization for shapes that have no subquery dependencies. Instead of buffering entire transactions in memory before writing to storage, it writes each transaction fragment to storage immediately as it arrives, reducing memory usage. + +## Key Files + +| File | Purpose | +| -------------------------------------------------------------------------------- | ------------------------------------------------------- | +| `packages/sync-service/lib/electric/shapes/consumer.ex` | Main consumer logic, handles fragment-direct processing | +| `packages/sync-service/lib/electric/shapes/consumer/state.ex` | Consumer state, including `fragment_direct?` flag | +| `packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex` | Tracks in-flight transaction metadata | +| `packages/sync-service/lib/electric/shape_cache/storage.ex` | Storage behaviour and delegation | +| `packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex` | File-based storage implementation | +| `packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex` | Low-level write operations | +| `packages/sync-service/test/electric/shapes/consumer_test.exs` | Consumer tests including fragment-direct tests | +| `packages/sync-service/test/support/test_storage.ex` | Test wrapper for storage that sends messages | + +## When Fragment-Direct Mode is Enabled + +Fragment-direct mode is automatically enabled when ALL conditions are met (see `consumer.ex:476-481`): + +```elixir +defp can_use_fragment_direct?(state) do + state.fragment_direct? and # Shape has no subquery dependencies + not state.buffering? and # Not waiting for initial snapshot + not state.materializer_subscribed? 
and # No materializer subscribed + not needs_initial_filtering(state) # Initial snapshot filtering complete +end +``` + +The `fragment_direct?` flag is set during state initialization based on whether the shape has dependencies: + +```elixir +# In State.new/3 (state.ex:152-153) +fragment_direct?: shape.shape_dependencies == [] +``` + +## Transaction Fragment Structure + +From `lib/electric/replication/changes.ex:47-86`: + +```elixir +defstruct xid: nil, + lsn: nil, + last_log_offset: nil, + has_begin?: false, # true = start of transaction + commit: nil, # set on final fragment (contains %Commit{}) + changes: [], + affected_relations: MapSet.new(), + change_count: 0 +``` + +Fragment types: + +- **Full transaction**: `has_begin?: true` AND `commit` is set (single fragment contains entire txn) +- **Start fragment**: `has_begin?: true`, no `commit` +- **Middle fragment**: `has_begin?: false`, no `commit` +- **End fragment**: `has_begin?: false`, `commit` is set + +## Processing Flow + +### Decision Point (consumer.ex:455-468) + +```elixir +defp handle_event(%TransactionFragment{} = txn_fragment, state) do + if can_use_fragment_direct?(state) do + # FRAGMENT-DIRECT: Write fragments immediately to storage + handle_fragment_direct(txn_fragment, state) + else + # TRANSACTION BUILDER: Buffer fragments, build complete Transaction + {txns, transaction_builder} = + TransactionBuilder.build(txn_fragment, state.transaction_builder) + state = %{state | transaction_builder: transaction_builder} + handle_txns(txns, state) + end +end +``` + +### Fragment-Direct Processing (consumer.ex:485-698) + +1. **On BEGIN fragment** (`has_begin?: true`): + - `maybe_start_pending_txn/2` creates a `PendingTxn` struct to track transaction metadata + +2. **On each fragment with changes**: + - `write_fragment_to_storage/2` is called + - Changes are filtered through `Shape.convert_change/3` + - Log entries are prepared via `prepare_log_entries/3` + - **Key call**: `ShapeCache.Storage.append_fragment_to_log!/2` writes immediately + - `PendingTxn` is updated with last offset, change count, bytes written + +3. 
**On COMMIT fragment** (`commit != nil`): + - `write_fragment_to_storage/2` writes final changes + - `maybe_complete_pending_txn/2` is called: + - **Key call**: `ShapeCache.Storage.signal_txn_commit!/2` marks transaction complete + - Updates `last_seen_txn_offset` in storage (critical for crash recovery) + - Notifies clients of new changes + - Records telemetry metrics + +### Key Storage Functions + +**`append_fragment_to_log!/2`** (pure_file_storage.ex:1319, write_loop.ex:141): + +- Writes log lines to ETS buffer and potentially to disk +- Does **NOT** update `last_seen_txn_offset` +- Does **NOT** call `register_complete_txn` +- Data is stored but not yet "visible" via `get_log_stream` + +**`signal_txn_commit!/2`** (pure_file_storage.ex:1342, write_loop.ex:434): + +- Updates `last_seen_txn_offset` to mark transaction as complete +- Calls `register_complete_txn` to persist metadata +- After this, data becomes visible via `get_log_stream` + +## Crash Recovery Implications + +The separation between `append_fragment_to_log!` and `signal_txn_commit!` is intentional for crash safety: + +- `last_seen_txn_offset` is only updated on commit +- `get_log_stream` uses `last_seen_txn_offset` as an upper read bound +- If process crashes before commit, partial data is discarded on recovery +- `fetch_latest_offset` returns the last committed transaction offset + +## Data Visibility + +**Before `signal_txn_commit!`**: + +- Data is written to ETS buffer (`ets_line_buffer`) +- Data may be flushed to disk file +- Data is **NOT** visible via `Storage.get_log_stream()` +- `last_seen_txn_offset` still points to previous transaction + +**After `signal_txn_commit!`**: + +- `last_seen_txn_offset` is updated +- Data becomes visible via `Storage.get_log_stream()` +- Client notifications are sent + +## Comparison: Fragment-Direct vs TransactionBuilder Mode + +| Aspect | Fragment-Direct Mode | TransactionBuilder Mode | +| ------------------ | ---------------------------------------------------- | ------------------------------------------- | +| **When Used** | Simple shapes (no subquery deps) | Shapes with subquery deps, during buffering | +| **Memory Usage** | Lower - writes immediately | Higher - buffers entire transaction | +| **Storage Calls** | `append_fragment_to_log!` per fragment | Single `append_to_log!` on commit | +| **Write Timing** | Each fragment written immediately | All changes written on commit | +| **Crash Recovery** | Safe - `last_seen_txn_offset` only updated on commit | Safe - writes only on commit | + +## Testing Fragment-Direct Mode + +### Test File Location + +`packages/sync-service/test/electric/shapes/consumer_test.exs` + +The "fragment-direct streaming" describe block (starts around line 1108) contains tests for this functionality. + +### TestStorage Wrapper + +`test/support/test_storage.ex` provides a storage wrapper that sends messages to the test process when storage operations are called: + +```elixir +# Sends message when append_fragment_to_log! is called +def append_fragment_to_log!(log_items, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :append_fragment_to_log!, shape_handle, log_items}) + storage = Storage.append_fragment_to_log!(log_items, storage) + {parent, shape_handle, data, storage} +end + +# Sends message when signal_txn_commit! 
is called +def signal_txn_commit!(xid, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :signal_txn_commit!, shape_handle, xid}) + storage = Storage.signal_txn_commit!(xid, storage) + {parent, shape_handle, data, storage} +end +``` + +### Using TestStorage in Tests + +To use TestStorage, wrap the storage **before** starting the shape cache/consumers: + +```elixir +# In setup block, BEFORE with_shape_cache is called: +storage = Support.TestStorage.wrap(ctx.storage, %{}) +Electric.StackConfig.put(ctx.stack_id, Electric.ShapeCache.Storage, storage) + +# Then in test: +assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, lines} +assert_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, ^xid} +``` + +### Challenge: Verifying Fragment Writes Before Commit + +The key behavior to test is that `append_fragment_to_log!` is called for each fragment **before** the commit fragment is processed. However: + +1. `Storage.get_log_stream()` won't show uncommitted data (by design) +2. TestStorage must be configured **before** shape cache starts +3. The current "fragment-direct streaming" test setup uses `with_shape_cache` which initializes storage before individual tests run + +### Recommended Test Approach + +To properly test that fragments are written before commit: + +1. **Option A**: Create a new setup that wraps storage with TestStorage before `with_shape_cache` +2. **Option B**: Use a separate describe block with custom setup that: + - Calls `with_pure_file_storage` + - Wraps with `TestStorage.wrap(ctx.storage, %{})` + - Updates `StackConfig` with wrapped storage + - Then calls `with_shape_cache` + +Then assert the sequence of messages: + +```elixir +# After fragment1 +assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, frag1_lines} +refute_receive {Support.TestStorage, :signal_txn_commit!, _, _} + +# After fragment2 +assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, frag2_lines} +refute_receive {Support.TestStorage, :signal_txn_commit!, _, _} + +# After fragment3 (commit) +assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, frag3_lines} +assert_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, ^xid} +``` + +## Event Flow Diagram + +``` +ShapeLogCollector.handle_event(fragment, stack_id) + │ + ▼ +GenServer.call (synchronous) + │ + ▼ +ShapeLogCollector.do_handle_event(fragment) + │ + ▼ +ConsumerRegistry.publish (synchronous broadcast) + │ + ▼ +Consumer.handle_call({:handle_event, fragment, ctx}) + │ + ▼ +can_use_fragment_direct?(state) ─── false ──► TransactionBuilder path + │ + true + │ + ▼ +handle_fragment_direct(fragment, state) + │ + ├── maybe_start_pending_txn (if has_begin?) 
+ │ + ├── write_fragment_to_storage + │ │ + │ ▼ + │ Storage.append_fragment_to_log!(lines, writer) + │ + └── maybe_complete_pending_txn (if commit != nil) + │ + ▼ + Storage.signal_txn_commit!(xid, writer) + │ + ▼ + notify_clients_of_new_changes() +``` + +## Important Code References + +- Fragment-direct decision: `consumer.ex:455-468` +- `can_use_fragment_direct?`: `consumer.ex:476-481` +- `handle_fragment_direct`: `consumer.ex:485-509` +- `write_fragment_to_storage`: `consumer.ex:533-598` +- `maybe_complete_pending_txn`: `consumer.ex:611-671` +- `append_fragment_to_log!` (Storage): `storage.ex:382-384` +- `append_fragment_to_log!` (WriteLoop): `write_loop.ex:141-178` +- `signal_txn_commit!` (Storage): `storage.ex:387-389` +- `signal_txn_commit!` (WriteLoop): `write_loop.ex:434-436` +- `PendingTxn` struct: `consumer/pending_txn.ex` diff --git a/packages/sync-service/lib/electric/postgres/xid.ex b/packages/sync-service/lib/electric/postgres/xid.ex index d4b0558e1f..d764ba2e0c 100644 --- a/packages/sync-service/lib/electric/postgres/xid.ex +++ b/packages/sync-service/lib/electric/postgres/xid.ex @@ -269,7 +269,8 @@ defmodule Electric.Postgres.Xid do @type pg_snapshot() :: {anyxid, anyxid, [anyxid]} @doc """ - Check if a transaction is after the end of a snapshot - if it's xid is over xmax + Check if a transaction is in the future from the POV of the snapshot. In other words, if its + xid is >= xmax, its changes are definitely *not* visible and *won't become* visible in this snapshot. """ @spec after_snapshot?(anyxid, pg_snapshot()) :: boolean() def after_snapshot?(xid, {_, xmax, _}) when not is_lt(xid, xmax), do: true diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index 740d138971..47715d0b75 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -318,8 +318,11 @@ defmodule Electric.Replication.ShapeLogCollector do def handle_cast({:writer_flushed, shape_id, offset}, state) do {:noreply, - state - |> Map.update!(:flush_tracker, &FlushTracker.handle_flush_notification(&1, shape_id, offset))} + Map.update!( + state, + :flush_tracker, + &FlushTracker.handle_flush_notification(&1, shape_id, offset) + )} end def handle_cast( diff --git a/packages/sync-service/lib/electric/replication/transaction_builder.ex b/packages/sync-service/lib/electric/replication/transaction_builder.ex index a73bfa175f..2ca5561d5e 100644 --- a/packages/sync-service/lib/electric/replication/transaction_builder.ex +++ b/packages/sync-service/lib/electric/replication/transaction_builder.ex @@ -35,18 +35,39 @@ defmodule Electric.Replication.TransactionBuilder do |> maybe_complete_transaction(fragment) end + @spec pop_incomplete_transaction_as_fragment(t()) :: {TransactionFragment.t() | nil, t()} + def pop_incomplete_transaction_as_fragment(%__MODULE__{transaction: nil}) do + {nil, new()} + end + + def pop_incomplete_transaction_as_fragment(%__MODULE__{transaction: txn}) do + txn = finalize_txn_changes(txn) + + fragment = + %TransactionFragment{ + has_begin?: true, + xid: txn.xid, + lsn: txn.lsn, + last_log_offset: txn.last_log_offset, + changes: txn.changes, + change_count: txn.num_changes + } + + {fragment, new()} + end + defp maybe_start_transaction(state, %TransactionFragment{has_begin?: false}), do: state defp maybe_start_transaction( %__MODULE__{} = state, - %TransactionFragment{xid: xid, has_begin?: true} + 
%TransactionFragment{has_begin?: true} = fragment ) do txn = %Transaction{ - xid: xid, + xid: fragment.xid, changes: [], commit_timestamp: nil, - lsn: nil, - last_log_offset: nil + lsn: fragment.lsn, + last_log_offset: fragment.last_log_offset } %{state | transaction: txn} @@ -79,12 +100,16 @@ defmodule Electric.Replication.TransactionBuilder do txn | lsn: lsn, commit_timestamp: commit.commit_timestamp, - changes: Enum.reverse(txn.changes), # The transaction may have had some changes filtered # out, so we need to set the last_log_offset from the fragment last_log_offset: last_log_offset } + |> finalize_txn_changes() + + {[completed_txn], new()} + end - {[completed_txn], %__MODULE__{transaction: nil}} + defp finalize_txn_changes(txn) do + %{txn | changes: Enum.reverse(txn.changes)} end end diff --git a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex b/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex index d0dcb9bc57..d56b609811 100644 --- a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex @@ -40,6 +40,8 @@ defmodule Electric.ShapeCache.CrashingFileStorage do defdelegate append_control_message!(control_message, writer_state), to: PureFileStorage defdelegate write_move_in_snapshot!(stream, name, opts), to: PureFileStorage + defdelegate append_fragment_to_log!(fragment_lines, state), to: PureFileStorage + defdelegate signal_txn_commit!(xid, state), to: PureFileStorage defp stack_agent_name(opts) do Electric.ProcessRegistry.name(opts, __MODULE__, :agent) diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index 70d1377279..f9b88451c3 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -318,6 +318,58 @@ defmodule Electric.ShapeCache.InMemoryStorage do opts end + @impl Electric.ShapeCache.Storage + def append_fragment_to_log!(log_items, %MS{} = opts) do + # For in-memory storage, fragment writes are similar to full transaction writes + # but we do NOT update @latest_offset_key until commit, and we do NOT send + # the :flushed message. This ensures fetch_latest_offset returns the last + # committed transaction offset, not a mid-transaction offset. + log_table = opts.log_table + chunk_checkpoint_table = opts.chunk_checkpoint_table + + {processed_log_items, _last_offset} = + Enum.map_reduce(log_items, nil, fn + {:chunk_boundary, offset}, curr -> + {{storage_offset(offset), :checkpoint}, curr} + + {offset, _key, _op_type, json_log_item}, _ -> + {{{:offset, storage_offset(offset)}, json_log_item}, offset} + end) + + processed_log_items + |> Enum.split_with(fn item -> match?({_, :checkpoint}, item) end) + |> then(fn {checkpoints, log_items} -> + :ets.insert(chunk_checkpoint_table, checkpoints) + :ets.insert(log_table, log_items) + # Note: NOT updating @latest_offset_key until signal_txn_commit! 
+ end) + + opts + end + + @impl Electric.ShapeCache.Storage + def signal_txn_commit!(_xid, %MS{} = opts) do + # For in-memory storage, the commit signal updates @latest_offset_key + # to the last written offset and sends the :flushed message + log_table = opts.log_table + + # Find the last written offset in the log table + case :ets.last(log_table) do + :"$end_of_table" -> + opts + + {:offset, offset_tuple} -> + last_offset = LogOffset.new(offset_tuple) + :ets.insert(opts.snapshot_table, {@latest_offset_key, last_offset}) + send(self(), {Storage, :flushed, last_offset}) + opts + + _ -> + # Other keys (like :movein) - no update needed + opts + end + end + @impl Electric.ShapeCache.Storage def write_move_in_snapshot!(stream, name, %MS{log_table: log_table}) do stream diff --git a/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex b/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex index 90ca32e7a7..82e03bc514 100644 --- a/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex @@ -149,8 +149,8 @@ defmodule Electric.ShapeCache.PureFileStorage do end defp shape_data_dir(%__MODULE__{stack_id: stack_id, shape_handle: shape_handle}, suffix) do - {__MODULE__, stack_opts} = Storage.for_stack(stack_id) - shape_data_dir(stack_opts.base_path, shape_handle, suffix) + base_path = Storage.opt_for_stack(stack_id, :base_path) + shape_data_dir(base_path, shape_handle, suffix) end defp shape_data_dir(base_path, shape_handle, suffix \\ []) @@ -167,10 +167,7 @@ defmodule Electric.ShapeCache.PureFileStorage do defp shape_snapshot_dir(opts), do: shape_data_dir(opts, [@snapshot_dir]) defp shape_snapshot_path(opts, filename), do: shape_data_dir(opts, [@snapshot_dir, filename]) - defp tmp_dir(%__MODULE__{} = opts) do - {__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id) - stack_opts.tmp_dir - end + defp tmp_dir(%__MODULE__{} = opts), do: Storage.opt_for_stack(opts.stack_id, :tmp_dir) def stack_start_link(opts) do Supervisor.start_link( @@ -537,8 +534,9 @@ defmodule Electric.ShapeCache.PureFileStorage do ) if shape_definition.storage.compaction == :enabled do - {__MODULE__, stack_opts} = Storage.for_stack(shape_opts.stack_id) - schedule_compaction(stack_opts.compaction_config) + shape_opts.stack_id + |> Storage.opt_for_stack(:compaction_config) + |> schedule_compaction() end writer_state( @@ -1306,6 +1304,44 @@ defmodule Electric.ShapeCache.PureFileStorage do end end + @doc """ + Append log items from a transaction fragment. + + Unlike `append_to_log!/2`, this does NOT advance `last_seen_txn_offset` or + call `register_complete_txn`. Transaction completion should be signaled + separately via `signal_txn_commit!/2`. + + This ensures that on crash/recovery, `fetch_latest_offset` returns the + last committed transaction offset, not a mid-transaction offset. + """ + def append_fragment_to_log!(fragment_lines, writer_state(writer_acc: acc) = state) do + fragment_lines + |> normalize_log_stream() + |> WriteLoop.append_fragment_to_log!(acc, state) + |> case do + {acc, cancel_flush_timer: true} -> + timer_ref = writer_state(state, :write_timer) + if not is_nil(timer_ref), do: Process.cancel_timer(timer_ref) + writer_state(state, writer_acc: acc, write_timer: nil) + + {acc, schedule_flush: times_flushed} -> + writer_state(state, writer_acc: acc) + |> schedule_flush(times_flushed) + end + end + + @doc """ + Signal that a transaction has committed. 
+ + Updates `last_seen_txn_offset` and persists metadata to mark the transaction + as complete. Should be called after all fragments have been written via + `append_fragment_to_log!/2`. + """ + def signal_txn_commit!(_xid, writer_state(writer_acc: acc) = state) do + acc = WriteLoop.signal_txn_commit(acc, state) + writer_state(state, writer_acc: acc) + end + def update_chunk_boundaries_cache(opts, boundaries) do :ets.update_element( opts.stack_ets, @@ -1320,13 +1356,13 @@ defmodule Electric.ShapeCache.PureFileStorage do if WriteLoop.has_flushed_since?(acc, old) or is_nil(timer) do if not is_nil(timer), do: Process.cancel_timer(timer) - {__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id) + flush_period = Storage.opt_for_stack(opts.stack_id, :flush_period) ref = Process.send_after( self(), {Storage, {__MODULE__, :perform_scheduled_flush, [WriteLoop.times_flushed(acc)]}}, - stack_opts.flush_period + flush_period ) writer_state(state, write_timer: ref) diff --git a/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex b/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex index d80c88f4ac..ef93ff31e4 100644 --- a/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex +++ b/packages/sync-service/lib/electric/shape_cache/pure_file_storage/write_loop.ex @@ -128,6 +128,55 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do end end + @doc """ + Append log lines from a transaction fragment. + + Unlike `append_to_log!/3`, this does NOT advance `last_seen_txn_offset` or + call `register_complete_txn`. Transaction completion should be signaled + separately via `register_complete_txn/2` after the commit is received. + + This ensures that on crash/recovery, `fetch_latest_offset` returns the + last committed transaction offset, not a mid-transaction offset. + """ + def append_fragment_to_log!( + fragment_lines, + writer_acc(times_flushed: times_flushed) = acc, + state + ) do + acc = ensure_json_file_open(acc, state) + + fragment_lines + |> Enum.reduce(acc, fn + {offset, _, _, _, _, _, _}, writer_acc(last_seen_txn_offset: min_offset) = acc + when is_log_offset_lte(offset, min_offset) -> + # Line already persisted, no-op + acc + + {offset, _, _, _, _, _, _} = line, acc -> + acc + |> maybe_write_opening_chunk_boundary(state, offset) + |> add_to_buffer(line) + |> maybe_write_closing_chunk_boundary(state) + |> maybe_flush_buffer(state) + end) + |> case do + # If the buffer has been fully flushed, no need to schedule more flushes + # Note: We do NOT update last_seen_txn_offset or call register_complete_txn + # since the transaction is not yet complete + writer_acc(buffer_size: 0) = acc -> + acc = close_chunk_file(acc) + {acc, cancel_flush_timer: true} + + acc -> + acc = + acc + |> store_lines_in_ets(state) + |> close_chunk_file() + + {acc, schedule_flush: times_flushed} + end + end + ### Working with the buffer defp add_to_buffer( @@ -374,4 +423,15 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do end |> update_persistance_metadata(state, prev_persisted_txn) end + + @doc """ + Signal that a transaction has been committed. + + This updates `last_seen_txn_offset` and potentially `last_persisted_txn_offset` + to mark the transaction as complete. Should be called after all fragments + have been written via `append_fragment_to_log!/3`. 
+ """ + def signal_txn_commit(acc, state) do + register_complete_txn(acc, state) + end end diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index 4ca33422dc..de5e93e2b9 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -151,6 +151,29 @@ defmodule Electric.ShapeCache.Storage do @callback append_to_log!(Enumerable.t(log_item()), writer_state()) :: writer_state() | no_return() + @doc """ + Append log items from a transaction fragment. + + Called potentially multiple times per transaction for shapes that stream + fragments directly to storage without waiting for the complete transaction. + Unlike `append_to_log!/2`, this does not assume transaction completion. + + Transaction commits should be signaled separately via `signal_txn_commit!/2` + to allow storage to calculate chunk boundaries at transaction boundaries. + """ + @callback append_fragment_to_log!(Enumerable.t(log_item()), writer_state()) :: + writer_state() | no_return() + + @doc """ + Signal that a transaction has committed. + + Used by storage to calculate chunk boundaries at transaction boundaries. + Called after all fragments for a transaction have been written via + `append_fragment_to_log!/2`. + """ + @callback signal_txn_commit!(xid :: pos_integer(), writer_state()) :: + writer_state() | no_return() + @doc "Get stream of the log for a shape since a given offset" @callback get_log_stream(offset :: LogOffset.t(), max_offset :: LogOffset.t(), shape_opts()) :: log() @@ -211,6 +234,29 @@ defmodule Electric.ShapeCache.Storage do Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) end + # This function seamlessly unwraps TestStorage to give the storage implementation direct access to its options. 
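+  # In test builds the stack's storage entry may be the wrapper tuple
+  # {Support.TestStorage, {parent, test_init, {storage_mod, storage_opts}}}; the test clause
+  # below pattern-matches that wrapper so callers such as PureFileStorage (reading
+  # :base_path, :flush_period or :compaction_config via opt_for_stack/2) still get the
+  # underlying storage_opts.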
+ if Mix.env() != :test do + def opts_for_stack(stack_id) do + {_module, opts} = Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) + opts + end + else + def opts_for_stack(stack_id) do + case Electric.StackConfig.lookup!(stack_id, Electric.ShapeCache.Storage) do + {Support.TestStorage, {_parent_pid, _test_storage_init, {_storage_mod, storage_opts}}} -> + storage_opts + + {_storage_mod, storage_opts} -> + storage_opts + end + end + end + + def opt_for_stack(stack_id, opt_name) do + opts = opts_for_stack(stack_id) + Map.fetch!(opts, opt_name) + end + @spec child_spec(shape_storage()) :: Supervisor.child_spec() def child_spec({module, shape_opts}) do %{ @@ -355,6 +401,16 @@ defmodule Electric.ShapeCache.Storage do {mod, mod.append_to_log!(log_items, shape_opts)} end + @impl __MODULE__ + def append_fragment_to_log!(log_items, {mod, shape_opts}) do + {mod, mod.append_fragment_to_log!(log_items, shape_opts)} + end + + @impl __MODULE__ + def signal_txn_commit!(xid, {mod, shape_opts}) do + {mod, mod.signal_txn_commit!(xid, shape_opts)} + end + @impl __MODULE__ def get_log_stream(offset, max_offset \\ @last_log_offset, storage) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 8bc0109adf..05bc159f88 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -2,9 +2,10 @@ defmodule Electric.Shapes.Consumer do use GenServer, restart: :temporary alias Electric.Shapes.Consumer.ChangeHandling - alias Electric.Shapes.Consumer.MoveIns alias Electric.Shapes.Consumer.InitialSnapshot alias Electric.Shapes.Consumer.MoveHandling + alias Electric.Shapes.Consumer.MoveIns + alias Electric.Shapes.Consumer.PendingTxn alias Electric.Shapes.Consumer.State import Electric.Shapes.Consumer.State, only: :macros @@ -166,16 +167,13 @@ defmodule Electric.Shapes.Consumer do stop_and_clean(state) end - def handle_continue(:consume_buffer, %State{buffer: buffer} = state) do - Logger.debug(fn -> "Consumer catching up on #{length(buffer)} transactions" end) - state = %{state | buffer: [], buffering?: false} - - case handle_txns(Enum.reverse(buffer), state) do - %State{terminating?: true} = state -> - {:noreply, state, {:continue, :stop_and_clean}} + def handle_continue(:consume_buffer, state) do + state = process_buffered_txn_fragments(state) - state -> - {:noreply, state, state.hibernate_after} + if state.terminating? do + {:noreply, state, {:continue, :stop_and_clean}} + else + {:noreply, state, state.hibernate_after} end end @@ -452,33 +450,261 @@ defmodule Electric.Shapes.Consumer do end defp handle_event(%TransactionFragment{} = txn_fragment, state) do + handle_txn_fragment(txn_fragment, state) + end + + # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). + # In this phase we have to buffer incoming txn fragments because we can't yet decide what to + # do with the transaction: skip it or write it to the shape log. + # + # When snapshot info arrives, `process_buffered_txn_fragments/1` will be called to process + # buffered fragments in order. + defp handle_txn_fragment( + %TransactionFragment{} = txn_fragment, + %State{buffering?: true} = state + ) do + State.add_to_buffer(state, txn_fragment) + end + + # The first fragment of a new transaction: initialize pending txn metadata and check if the + # transaction xid is already part of the snapshot. 
If it is, all subsequent fragments of this + # transaction will be ignored. + defp handle_txn_fragment( + %TransactionFragment{xid: xid} = txn_fragment, + %State{pending_txn: nil} = state + ) do + txn = PendingTxn.new(xid) + state = %{state | pending_txn: txn} + handle_txn_fragment(txn_fragment, state) + end + + # We can use the initial filtering to our advantage here and avoid accumulating fragments for + # a transaction that is going to be skipped anyway. This works both in the regular mode and + # in the fragment-direct mode. + defp handle_txn_fragment( + %TransactionFragment{has_begin?: true, xid: xid} = txn_fragment, + %State{} = state + ) + when needs_initial_filtering(state) do + case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, xid) do + {:consider_flushed, initial_snapshot_state} -> + # This transaction is already included in the snapshot, so just ignore all of its fragments. + # TODO: it could be the case the multiple txns are seen while InitialSnapshot needs + # filtering. And the outcomes for those txns could be different. + %{ + state + | pending_txn: PendingTxn.consider_flushed(state.pending_txn), + initial_snapshot_state: initial_snapshot_state + } + + {:continue, new_initial_snapshot_state} -> + # The transaction is not part of the initial snapshot. + state = %{state | initial_snapshot_state: new_initial_snapshot_state} + process_txn_fragment(txn_fragment, state) + end + end + + defp handle_txn_fragment( + %TransactionFragment{last_log_offset: last_log_offset} = txn_fragment, + %State{pending_txn: txn} = state + ) do + if txn.consider_flushed? or fragment_already_processed?(txn_fragment, state) do + # TODO: Do not mark the fragment itself flushed. When the cnnection restarts, we want to + # receive all fragments starting from the one with has_begin? so as not to overcomplicate + # the consumer logic. + consider_flushed(state, last_log_offset) + else + process_txn_fragment(txn_fragment, state) + end + end + + # In the regular mode all fragments are buffered until the Commit change is seen. At that + # point, a transaction struct is produced from the buffered fragments and is written to + # storage. + defp process_txn_fragment(txn_fragment, %State{fragment_direct?: false} = state) do {txns, transaction_builder} = TransactionBuilder.build(txn_fragment, state.transaction_builder) state = %{state | transaction_builder: transaction_builder} - handle_txns(txns, state) + + case txns do + [] -> state + [txn] -> handle_txn(txn, state) + end end - defp handle_txns(txns, %State{} = state), do: Enum.reduce_while(txns, state, &handle_txn/2) + # In the fragment-direct mode, each transaction fragment is written to storage individually. 
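+  # Matching changes are appended with Storage.append_fragment_to_log!/2 as each fragment
+  # arrives; when a fragment carries a Commit, Storage.signal_txn_commit!/2 is called and
+  # clients are notified (see write_txn_fragment_to_storage/2 and maybe_complete_pending_txn/2).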
+ defp process_txn_fragment(txn_fragment, state) do + state + |> write_txn_fragment_to_storage(txn_fragment) + |> maybe_complete_pending_txn(txn_fragment) + end - # Keep buffering for initial snapshot - defp handle_txn(txn, %State{buffering?: true} = state), - do: {:cont, State.add_to_buffer(state, txn)} + defp fragment_already_processed?(%TransactionFragment{last_log_offset: offset}, state) do + LogOffset.is_log_offset_lte(offset, state.latest_offset) + end - defp handle_txn(txn, state) when needs_initial_filtering(state) do - case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, txn) do - {:consider_flushed, initial_snapshot_state} -> - {:cont, consider_flushed(%{state | initial_snapshot_state: initial_snapshot_state}, txn)} + # This function does similar things to do_handle_txn/2 but with the following simplifications: + # - it doesn't account for move-ins or move-outs or converting update operations into insert/delete + # - the fragment is written directly to storage if it has changes matching this shape + # - if the fragment has a commit message, the ShapeLogCollector is informed about the new flush boundary + defp write_txn_fragment_to_storage(state, %TransactionFragment{changes: []}), do: state - {:continue, new_initial_snapshot_state} -> - handle_txn_in_span(txn, %{state | initial_snapshot_state: new_initial_snapshot_state}) + defp write_txn_fragment_to_storage( + state, + %TransactionFragment{changes: changes, commit: commit, xid: xid} = fragment + ) do + %{ + shape: shape, + writer: writer, + pending_txn: txn, + stack_id: stack_id, + shape_handle: shape_handle + } = state + + case convert_fragment_changes(changes, stack_id, shape_handle, shape) do + :includes_truncate -> + handle_txn_with_truncate(xid, state) + + {[], 0} -> + Logger.debug(fn -> + "No relevant changes found for #{inspect(shape)} in txn fragment of txn #{xid}" + end) + + {:cont, consider_flushed(state, fragment.last_log_offset)} + + {reversed_changes, num_changes, last_log_offset} -> + converted_changes = + reversed_changes + |> maybe_mark_last_change(commit) + |> Enum.reverse() + + {lines, total_size} = prepare_log_entries(converted_changes, xid, shape) + writer = ShapeCache.Storage.append_fragment_to_log!(lines, writer) + + txn = PendingTxn.update_stats(txn, last_log_offset, num_changes, total_size) + + Logger.debug(fn -> + "Wrote #{num_changes} changes for fragment xid=#{xid}, total_bytes=#{total_size}" + end) + + %{ + state + | writer: writer, + latest_offset: last_log_offset, + pending_txn: txn, + txn_offset_mapping: + state.txn_offset_mapping ++ [{last_log_offset, fragment.last_log_offset}] + } + end + end + + defp convert_fragment_changes(changes, stack_id, shape_handle, shape) do + Enum.reduce_while(changes, {[], 0}, fn + %Changes.TruncatedRelation{}, _acc -> + {:halt, :includes_truncate} + + change, {changes, count} = acc -> + # Apply Shape.convert_change to each change to: + # 1. Filter out changes not matching the shape's table + # 2. Apply WHERE clause filtering + case Shape.convert_change(shape, change, stack_id: stack_id, shape_handle: shape_handle) do + [] -> + {:cont, acc} + + [change] -> + {:cont, {[change | changes], count + 1}} + end + end) + |> case do + {[change | _] = changes, num_changes} -> + {changes, num_changes, LogItems.expected_offset_after_split(change)} + + acc -> + acc + end + end + + # Mark the last change in the list as last? 
when this is a commit fragment + # This is needed for clients to know when a transaction is complete + # The changes passed to this function are in reversed order, i.e. the last change is the head of the list. + defp maybe_mark_last_change([], _commit), do: [] + defp maybe_mark_last_change(changes, nil), do: changes + + defp maybe_mark_last_change(changes, _commit) do + [head | tail] = changes + [%{head | last?: true} | tail] + end + + defp maybe_complete_pending_txn(state, %TransactionFragment{commit: nil}), do: state + + defp maybe_complete_pending_txn(%{terminating?: true} = state, _fragment) do + # If we're terminating (e.g., due to truncate), don't complete the transaction + state + end + + defp maybe_complete_pending_txn(state, %TransactionFragment{commit: commit} = fragment) do + %{pending_txn: txn, writer: writer, shape: shape, shape_handle: shape_handle} = state + + # Signal commit to storage for potential chunk boundary calculation + writer = ShapeCache.Storage.signal_txn_commit!(txn.xid, writer) + + # Only notify if we actually wrote changes + if txn.num_changes > 0 do + # TODO: Only notify upon writing the full txn to storage + notify_new_changes(state, fragment.changes, txn.last_log_offset) + + lag = calculate_replication_lag_from_commit(commit) + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :storage, :transaction_stored], + %{ + bytes: txn.total_bytes, + count: 1, + operations: txn.num_changes, + replication_lag: lag + }, + Map.new(shape_attrs(shape_handle, shape)) + ) + + Logger.debug(fn -> + "Completed fragment-direct transaction xid=#{txn.xid}, changes=#{txn.num_changes}" + end) + + %{ + state + | writer: writer, + latest_offset: txn.last_log_offset, + pending_txn: nil, + txn_offset_mapping: + state.txn_offset_mapping ++ [{txn.last_log_offset, fragment.last_log_offset}] + } + else + # No changes were written - notify flush boundary like consider_flushed does + Logger.debug(fn -> + "No relevant changes in fragment-direct transaction xid=#{txn.xid}" + end) + + state = %{state | writer: writer, pending_txn: nil} + consider_flushed(state, fragment.last_log_offset) end end - # Remove the move-in buffering check - just process immediately - defp handle_txn(txn, state), do: handle_txn_in_span(txn, state) + defp calculate_replication_lag_from_commit(%Changes.Commit{commit_timestamp: nil}), do: 0 + + defp calculate_replication_lag_from_commit(%Changes.Commit{commit_timestamp: commit_timestamp}) do + now = DateTime.utc_now() + Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond)) + end + + def process_buffered_txn_fragments(%State{buffer: buffer} = state) do + Logger.debug(fn -> "Consumer catching up on #{length(buffer)} transaction fragments" end) + {txn_fragments, state} = State.pop_buffered(state) + # TODO: verify the "while" condition here, when will it terminate early? 
+ Enum.reduce_while(txn_fragments, state, &handle_txn_fragment/2) + end - defp handle_txn_in_span(txn, %State{} = state) do + defp handle_txn(txn, %State{} = state) do ot_attrs = [xid: txn.xid, total_num_changes: txn.num_changes] ++ shape_attrs(state.shape_handle, state.shape) @@ -487,25 +713,12 @@ defmodule Electric.Shapes.Consumer do "shape_write.consumer.handle_txn", ot_attrs, state.stack_id, - fn -> - do_handle_txn(txn, state) - end + fn -> do_handle_txn(txn, state) end ) end - defp do_handle_txn(%Transaction{} = txn, state) - when LogOffset.is_log_offset_lte(txn.last_log_offset, state.latest_offset) do - Logger.debug(fn -> "Skipping already processed txn #{txn.xid}" end) - - {:cont, consider_flushed(state, txn)} - end - defp do_handle_txn(%Transaction{xid: xid, changes: changes} = txn, state) do - %{ - shape: shape, - shape_handle: shape_handle, - writer: writer - } = state + %{shape: shape, writer: writer} = state state = State.remove_completed_move_ins(state, txn) @@ -530,23 +743,14 @@ defmodule Electric.Shapes.Consumer do %{xid: xid, extra_refs: {extra_refs_before_move_ins, extra_refs_full}} ) do :includes_truncate -> - # TODO: This is a very naive way to handle truncations: if ANY relevant truncates are - # present in the transaction, we're considering the whole transaction empty, and - # just rotate the shape handle. "Correct" way to handle truncates is to be designed. - Logger.warning( - "Truncate operation encountered while processing txn #{txn.xid} for #{shape_handle}" - ) - - state = mark_for_removal(state) - - {:halt, state} + {:halt, handle_txn_with_truncate(txn.xid, state)} {_, state, 0, _} -> Logger.debug(fn -> "No relevant changes found for #{inspect(shape)} in txn #{txn.xid}" end) - {:cont, consider_flushed(state, txn)} + {:cont, consider_flushed(state, txn.last_log_offset)} {changes, state, num_changes, last_log_offset} -> timestamp = System.monotonic_time() @@ -587,6 +791,17 @@ defmodule Electric.Shapes.Consumer do end end + defp handle_txn_with_truncate(xid, state) do + # TODO: This is a very naive way to handle truncations: if ANY relevant truncates are + # present in the transaction, we're considering the whole transaction empty, and + # just rotate the shape handle. "Correct" way to handle truncates is to be designed. 
+ Logger.warning( + "Truncate operation encountered while processing txn #{xid} for #{state.shape_handle}" + ) + + mark_for_removal(state) + end + defp notify_new_changes(state, nil), do: state defp notify_new_changes(state, {changes, upper_bound}) do @@ -685,14 +900,15 @@ defmodule Electric.Shapes.Consumer do Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond)) end - defp consider_flushed(%State{} = state, %Transaction{last_log_offset: new_boundary}) do + defp consider_flushed(%State{} = state, log_offset) do if state.txn_offset_mapping == [] do # No relevant txns have been observed and unflushed, we can notify immediately - ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, new_boundary) + ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, log_offset) state else # We're looking to "relabel" the next flush to include this txn, so we're looking for the # boundary that has a highest boundary less than this offset + new_boundary = log_offset {head, tail} = Enum.split_while( diff --git a/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex b/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex index c654d63d2e..ff4f1a4d1d 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex @@ -43,11 +43,16 @@ defmodule Electric.Shapes.Consumer.InitialSnapshot do def needs_buffering?(%__MODULE__{pg_snapshot: snapshot}), do: is_nil(snapshot) + def maybe_stop_initial_filtering(%__MODULE__{} = state, storage, %Transaction{xid: xid}) do + maybe_stop_initial_filtering(state, storage, xid) + end + def maybe_stop_initial_filtering( %__MODULE__{pg_snapshot: {xmin, xmax, xip_list} = snapshot} = state, storage, - %Transaction{xid: xid} - ) do + xid + ) + when is_integer(xid) and xid > 0 do if Xid.after_snapshot?(xid, snapshot) do Storage.set_pg_snapshot( %{xmin: xmin, xmax: xmax, xip_list: xip_list, filter_txns?: false}, @@ -89,11 +94,15 @@ defmodule Electric.Shapes.Consumer.InitialSnapshot do %{state | snapshot_started?: true} end - def filter(state, storage, %Transaction{} = txn) do - if Transaction.visible_in_snapshot?(txn, state.pg_snapshot) do + def filter(state, storage, %Transaction{xid: xid}) do + filter(state, storage, xid) + end + + def filter(state, storage, xid) when is_integer(xid) and xid > 0 do + if Transaction.visible_in_snapshot?(xid, state.pg_snapshot) do {:consider_flushed, state} else - state = maybe_stop_initial_filtering(state, storage, txn) + state = maybe_stop_initial_filtering(state, storage, xid) {:continue, state} end end diff --git a/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex b/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex new file mode 100644 index 0000000000..8a9881b8b0 --- /dev/null +++ b/packages/sync-service/lib/electric/shapes/consumer/pending_txn.ex @@ -0,0 +1,58 @@ +defmodule Electric.Shapes.Consumer.PendingTxn do + @moduledoc """ + Tracks metadata for an in-progress transaction during fragment-direct streaming. + + When a consumer streams transaction fragments directly to storage (for shapes + without subquery dependencies), this struct tracks the transaction metadata + until commit is received. + + This is an antipod module to Electric.Replication.TransactionBuilder. 
+  It only tracks metadata about the transaction whose fragments are currently being processed,
+  since the fragments themselves are written to storage and discarded from memory immediately,
+  while TransactionBuilder accumulates all changes in memory and returns a complete
+  transaction after seeing a Commit.
+  """
+
+  alias Electric.Replication.LogOffset
+
+  defstruct [
+    :xid,
+    :last_log_offset,
+    consider_flushed?: false,
+    num_changes: 0,
+    total_bytes: 0
+  ]
+
+  @type t :: %__MODULE__{
+          xid: pos_integer(),
+          last_log_offset: LogOffset.t() | nil,
+          consider_flushed?: boolean(),
+          num_changes: non_neg_integer(),
+          total_bytes: non_neg_integer()
+        }
+
+  @doc """
+  Create a new pending transaction tracker.
+  """
+  @spec new(pos_integer()) :: t()
+  def new(xid) do
+    %__MODULE__{xid: xid}
+  end
+
+  @doc """
+  Update the pending transaction with changes that were written to storage.
+  """
+  @spec update_stats(t(), LogOffset.t(), non_neg_integer(), non_neg_integer()) :: t()
+  def update_stats(%__MODULE__{} = pending_txn, log_offset, count, bytes) do
+    %{
+      pending_txn
+      | last_log_offset: log_offset,
+        num_changes: pending_txn.num_changes + count,
+        total_bytes: pending_txn.total_bytes + bytes
+    }
+  end
+
+  def consider_flushed(%__MODULE__{} = pending_txn) do
+    %{pending_txn | consider_flushed?: true}
+  end
+end
diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex
index ee3f517fb5..46badfb46b 100644
--- a/packages/sync-service/lib/electric/shapes/consumer/state.ex
+++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex
@@ -3,7 +3,6 @@ defmodule Electric.Shapes.Consumer.State do
   alias Electric.Shapes.Consumer.MoveIns
   alias Electric.Shapes.Consumer.InitialSnapshot
   alias Electric.Shapes.Shape
-  alias Electric.Replication.Changes.Transaction
   alias Electric.Replication.Eval.Parser
   alias Electric.Replication.Eval.Walker
   alias Electric.Replication.TransactionBuilder
@@ -30,7 +29,14 @@ defmodule Electric.Shapes.Consumer.State do
     terminating?: false,
     buffering?: false,
     or_with_subquery?: false,
-    not_with_subquery?: false
+    not_with_subquery?: false,
+    # Fragment-direct streaming fields
+    # When true, stream fragments directly to storage without buffering
+    fragment_direct?: false,
+    # Tracks the in-progress transaction; initialized when a txn fragment with has_begin?: true
+    # is seen. It is used to check whether the entire txn is visible in the snapshot and to
+    # mark it as flushed so that its remaining fragments are handled appropriately.
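+    # See Electric.Shapes.Consumer.PendingTxn for the metadata tracked per pending transaction.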
+ pending_txn: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -143,7 +149,9 @@ defmodule Electric.Shapes.Consumer.State do ), buffering?: true, or_with_subquery?: has_or_with_subquery?(shape), - not_with_subquery?: has_not_with_subquery?(shape) + not_with_subquery?: has_not_with_subquery?(shape), + # Enable fragment-direct streaming for shapes without subquery dependencies + fragment_direct?: shape.shape_dependencies == [] } end @@ -241,11 +249,16 @@ defmodule Electric.Shapes.Consumer.State do end end - @spec add_to_buffer(t(), Transaction.t()) :: t() + @spec add_to_buffer(t(), TransactionFragment.t()) :: t() def add_to_buffer(%__MODULE__{buffer: buffer} = state, txn) do %{state | buffer: [txn | buffer]} end + @spec pop_buffered(t()) :: {[TransactionFragment.t()], t()} + def pop_buffered(%__MODULE__{buffer: buffer} = state) do + {Enum.reverse(buffer), %{state | buffer: [], buffering?: false}} + end + @spec add_waiter(t(), GenServer.from()) :: t() def add_waiter(%__MODULE__{initial_snapshot_state: initial_snapshot_state} = state, from) do %{ diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 81146562a1..d988d21e58 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -209,7 +209,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {^ref, :new_changes, ^last_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} + refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _}, 100 txn2 = transaction(xid, next_lsn, [ @@ -223,7 +223,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) assert_receive {^ref, :new_changes, ^next_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} + refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _}, 100 end test "correctly writes only relevant changes to multiple shape logs", ctx do @@ -272,9 +272,9 @@ defmodule Electric.Shapes.ConsumerTest do assert %{"value" => %{"id" => "1"}} = Jason.decode!(serialized_record) assert_receive {Support.TestStorage, :append_to_log!, @shape_handle2, - [{_offset, _key, _type, serialized_record}]} + [{_offset, _key, _type, serialized_record2}]} - assert %{"value" => %{"id" => "2"}} = Jason.decode!(serialized_record) + assert %{"value" => %{"id" => "2"}} = Jason.decode!(serialized_record2) end @tag shapes: %{ @@ -303,7 +303,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} + refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _}, 100 refute_receive {^ref1, :new_changes, _} assert_receive {^ref2, :new_changes, _} @@ -384,7 +384,7 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) end) - refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} + refute_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _}, 100 
assert_shape_cleanup(@shape_handle1) @@ -569,8 +569,6 @@ defmodule Electric.Shapes.ConsumerTest do end describe "transaction handling with real storage" do - @describetag :tmp_dir - setup do %{inspector: @base_inspector, pool: nil} end @@ -600,12 +598,6 @@ defmodule Electric.Shapes.ConsumerTest do snapshot_fun.([]) end) - Electric.StackConfig.put( - ctx.stack_id, - :shape_hibernate_after, - Map.get(ctx, :hibernate_after, 10_000) - ) - %{consumer_supervisor: consumer_supervisor, shape_cache: shape_cache} = Support.ComponentSetup.with_shape_cache(ctx) @@ -1107,6 +1099,1133 @@ defmodule Electric.Shapes.ConsumerTest do end end + describe "fragment-direct streaming" do + setup do + %{inspector: @base_inspector, pool: nil} + end + + setup [ + :with_registry, + :with_pure_file_storage, + :with_shape_status, + :with_lsn_tracker, + # :with_log_chunking, + :with_persistent_kv, + :with_async_deleter, + :with_shape_cleaner, + :with_shape_log_collector, + :with_noop_publication_manager + # :with_status_monitor + ] + + setup(ctx) do + patch_snapshotter(fn parent, shape_handle, _shape, %{snapshot_fun: snapshot_fun} -> + pg_snapshot = ctx[:pg_snapshot] || {10, 11, [10]} + GenServer.cast(parent, {:pg_snapshot_known, shape_handle, pg_snapshot}) + GenServer.cast(parent, {:snapshot_started, shape_handle}) + snapshot_fun.([]) + end) + + storage = + Support.TestStorage.wrap(ctx.storage, %{}) + + # @shape_handle1 => [ + # {:mark_snapshot_as_started, []}, + # {:set_pg_snapshot, [%{xmin: 10, xmax: 11, xip_list: [10]}]} + # ] + # }) + + Electric.StackConfig.put(ctx.stack_id, Electric.ShapeCache.Storage, storage) + + %{consumer_supervisor: consumer_supervisor, shape_cache: shape_cache} = + Support.ComponentSetup.with_shape_cache(ctx) + + [ + consumer_supervisor: consumer_supervisor, + shape_cache: shape_cache, + storage: storage + ] + end + + test "multi-fragment transaction writes each fragment to storage before commit", ctx do + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + xid = 50 + + # First fragment with begin, no commit + fragment1 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 1), + has_begin?: true, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 1) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Second fragment - middle, no begin, no commit + fragment2 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 3), + has_begin?: false, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "3"}, + log_offset: LogOffset.new(lsn, 2) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "4"}, + log_offset: LogOffset.new(lsn, 3) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Third fragment with commit + fragment3 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 5), + has_begin?: false, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "5"}, + log_offset: 
LogOffset.new(lsn, 4) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "6"}, + log_offset: LogOffset.new(lsn, 5) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Await snapshot completion message from the patched snapshotter fun. + assert_receive {:snapshot, ^shape_handle} + + # Consume the init_writer! and for_shape messages from TestStorage setup + assert_receive {Support.TestStorage, :for_shape, ^shape_handle} + assert_receive {Support.TestStorage, :init_writer!, ^shape_handle, _shape} + + # Send first fragment (begin) and verify it's written to storage immediately + assert :ok = ShapeLogCollector.handle_event(fragment1, ctx.stack_id) + + # Verify append_fragment_to_log! was called BEFORE commit (key fragment-direct behavior) + assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, frag1_lines}, + @receive_timeout + + assert length(frag1_lines) == 2, + "First fragment should write 2 log entries, got #{length(frag1_lines)}" + + # Verify signal_txn_commit! was NOT called yet + refute_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, _xid}, 100 + + # No client notification yet - transaction not committed + refute_receive {^ref, :new_changes, _}, 100 + + # Send second fragment (middle) + assert :ok = ShapeLogCollector.handle_event(fragment2, ctx.stack_id) + + # Verify append_fragment_to_log! was called for second fragment BEFORE commit + assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, frag2_lines}, + @receive_timeout + + assert length(frag2_lines) == 2, + "Second fragment should write 2 log entries, got #{length(frag2_lines)}" + + # Verify signal_txn_commit! still NOT called + refute_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, _xid}, 100 + + # Still no client notification + refute_receive {^ref, :new_changes, _}, 100 + + # Send third fragment (commit) + assert :ok = ShapeLogCollector.handle_event(fragment3, ctx.stack_id) + + # Verify append_fragment_to_log! was called for third fragment + assert_receive {Support.TestStorage, :append_fragment_to_log!, ^shape_handle, frag3_lines}, + @receive_timeout + + assert length(frag3_lines) == 2, + "Third fragment should write 2 log entries, got #{length(frag3_lines)}" + + # NOW signal_txn_commit! 
should be called (only after commit fragment) + assert_receive {Support.TestStorage, :signal_txn_commit!, ^shape_handle, ^xid}, + @receive_timeout + + # Now we should get client notification + assert_receive {^ref, :new_changes, offset}, @receive_timeout + assert offset == LogOffset.new(lsn, 5) + + # Verify all 6 records were written (final state verification) + shape_storage = Storage.for_shape(shape_handle, ctx.storage) + + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert length(records) == 6 + assert Enum.map(records, & &1["value"]["id"]) == ["1", "2", "3", "4", "5", "6"] + end + + test "empty transaction (no relevant changes) notifies flush boundary", ctx do + {:via, Registry, {name, key}} = Electric.Postgres.ReplicationClient.name(ctx.stack_id) + Registry.register(name, key, nil) + + # Use shape3 which has where: "id = 1" - only matches id=1 + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape3, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + + # Transaction with changes that don't match the shape's where clause + txn = + transaction(50, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "999"}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Should NOT receive new_changes since no relevant changes + refute_receive {^ref, :new_changes, _}, 200 + + # But should receive flush boundary notification + assert_receive {:flush_boundary_updated, 100}, @receive_timeout + + stop_supervised!(ctx.consumer_supervisor) + end + + test "truncate operation in fragment triggers shape removal", ctx do + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + consumer_ref = Process.monitor(consumer_pid) + + lsn = Lsn.from_integer(100) + + # Transaction with truncate + txn = %TransactionFragment{ + xid: 50, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 0), + has_begin?: true, + commit: %Commit{}, + changes: [ + %Changes.TruncatedRelation{ + relation: {"public", "test_table"}, + log_offset: LogOffset.new(lsn, 0) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Consumer should terminate due to truncate + assert_receive {:DOWN, ^consumer_ref, :process, ^consumer_pid, {:shutdown, :cleanup}}, + @receive_timeout + + # Wait for shape cleanup to be triggered + assert_shape_cleanup(shape_handle) + + stop_supervised!(ctx.consumer_supervisor) + end + + @tag with_pure_file_storage_opts: [flush_period: 50] + test "skipped fragments during recovery still notify flush boundary", ctx do + {:via, Registry, {name, key}} = Electric.Postgres.ReplicationClient.name(ctx.stack_id) + Registry.register(name, key, nil) + + %{storage: storage} = ctx + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn1 = Lsn.from_integer(100) + lsn2 = Lsn.from_integer(200) + + # First transaction - will be processed + txn1 = + transaction(50, lsn1, [ + 
%Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn1, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + assert_receive {:flush_boundary_updated, 100}, @receive_timeout + + # Second transaction + txn2 = + transaction(51, lsn2, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn2, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + assert_receive {:flush_boundary_updated, 200}, @receive_timeout + + # Now send txn1 again (simulating recovery replay) - should be skipped + assert :ok = ShapeLogCollector.handle_event(txn1, ctx.stack_id) + + # Should NOT get new_changes for already processed transaction + refute_receive {^ref, :new_changes, _}, 200 + + # Verify storage still has only 2 records + shape_storage = Storage.for_shape(shape_handle, storage) + + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert length(records) == 2 + + stop_supervised!(ctx.consumer_supervisor) + end + + test "shapes with subquery dependencies use TransactionBuilder (not fragment-direct)", ctx do + # This test verifies that shapes with dependencies don't use fragment-direct mode + # by checking that inner shapes correctly track materializer subscriptions + + {shape_handle, _} = + ShapeCache.get_or_create_shape_handle(@shape_with_subquery, ctx.stack_id) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + {:ok, shape} = Electric.Shapes.fetch_shape_by_handle(ctx.stack_id, shape_handle) + [dep_handle] = shape.shape_dependencies_handles + + # The dependency (inner) shape should exist + dep_consumer = Consumer.whereis(ctx.stack_id, dep_handle) + assert is_pid(dep_consumer) + + # The outer shape should exist + outer_consumer = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(outer_consumer) + + _ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + + # Send a transaction to the dependency table + txn = + transaction(50, lsn, [ + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # The inner shape should process it (it tracks other_table) + # We can't easily verify fragment-direct vs TransactionBuilder internally, + # but we can verify the shapes work correctly together + # The outer shape won't get changes until move-in processing completes + + stop_supervised!(ctx.consumer_supervisor) + end + + @tag pg_snapshot: {10, 15, [12]}, with_pure_file_storage_opts: [flush_period: 50] + test "fragment-direct mode disabled during initial filtering phase", ctx do + {:via, Registry, {name, key}} = Electric.Postgres.ReplicationClient.name(ctx.stack_id) + Registry.register(name, key, nil) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + # Transaction with xid=2 (< xmin=10) should be considered "already visible" + # and filtered out during initial filtering phase + lsn1 = Lsn.from_integer(100) + lsn2 = Lsn.from_integer(101) + + txn1 = + transaction(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", 
"test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + # Transaction with xid=20 (> xmax=15) should be processed normally + txn2 = + transaction(20, lsn2, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn2, 0) + } + ]) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + assert :ok = ShapeLogCollector.handle_event(txn1, ctx.stack_id) + # txn1 is filtered (xid < xmin), so should notify flush but no new_changes + assert_receive {:flush_boundary_updated, 100}, @receive_timeout + refute_receive {^ref, :new_changes, _}, 100 + + assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) + # txn2 should be processed normally + assert_receive {^ref, :new_changes, _}, @receive_timeout + assert_receive {:flush_boundary_updated, 101}, @receive_timeout + + stop_supervised!(ctx.consumer_supervisor) + end + + test "fragment with changes for different table is filtered out", ctx do + {:via, Registry, {name, key}} = Electric.Postgres.ReplicationClient.name(ctx.stack_id) + Registry.register(name, key, nil) + + %{storage: storage} = ctx + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + + # Transaction with changes for a different table (other_table, not test_table) + txn = + transaction(50, lsn, [ + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Should NOT receive new_changes since changes are for different table + refute_receive {^ref, :new_changes, _}, 200 + + # But should receive flush boundary notification + assert_receive {:flush_boundary_updated, 100}, @receive_timeout + + # Verify no records were written + shape_storage = Storage.for_shape(shape_handle, storage) + + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert records == [] + + stop_supervised!(ctx.consumer_supervisor) + end + + test "mixed fragment with some relevant and some irrelevant changes", ctx do + %{storage: storage} = ctx + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + + # Transaction with mixed changes - some for test_table, some for other_table + txn = %TransactionFragment{ + xid: 50, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 2), + has_begin?: true, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "other_table"}, + record: %{"id" => "999"}, + log_offset: LogOffset.new(lsn, 1) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 2) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}, {"public", "other_table"}]) + } + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Should receive new_changes for the relevant changes + assert_receive {^ref, :new_changes, offset}, 
@receive_timeout + # The offset should be from the last relevant change (id=2 at offset 2) + assert offset == LogOffset.new(lsn, 2) + + # Verify only relevant records were written (2 records for test_table) + shape_storage = Storage.for_shape(shape_handle, storage) + + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert length(records) == 2 + ids = Enum.map(records, & &1["value"]["id"]) + assert ids == ["1", "2"] + + stop_supervised!(ctx.consumer_supervisor) + end + + # Use a pg_snapshot where filtering is disabled (xid > xmax for all transactions) + # so that fragment-direct mode is used + @tag pg_snapshot: {10, 11, []} + test "interleaved begin fragments raise an error", ctx do + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + # First, send a transaction with high xid to complete the initial filtering phase + # This is needed because filtering? starts as true and is only disabled after + # processing a transaction with xid > xmax + lsn_init = Lsn.from_integer(50) + + init_txn = + transaction(100, lsn_init, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "init"}, + log_offset: LogOffset.new(lsn_init, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(init_txn, ctx.stack_id) + + # Wait for processing + Process.sleep(50) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + consumer_ref = Process.monitor(consumer_pid) + + lsn1 = Lsn.from_integer(100) + lsn2 = Lsn.from_integer(200) + + # First fragment with begin for xid=50 + fragment1 = %TransactionFragment{ + xid: 50, + lsn: lsn1, + last_log_offset: LogOffset.new(lsn1, 0), + has_begin?: true, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn1, 0) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Second fragment with begin for different xid=60 while xid=50 is pending + fragment2 = %TransactionFragment{ + xid: 60, + lsn: lsn2, + last_log_offset: LogOffset.new(lsn2, 0), + has_begin?: true, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn2, 0) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Send first fragment + assert :ok = ShapeLogCollector.handle_event(fragment1, ctx.stack_id) + + # Consumer should still be alive + assert Process.alive?(consumer_pid) + + # Send second fragment with different xid - should cause crash + assert :ok = ShapeLogCollector.handle_event(fragment2, ctx.stack_id) + + # Consumer should crash with the interleaved error + assert_receive {:DOWN, ^consumer_ref, :process, ^consumer_pid, reason}, @receive_timeout + + assert {%RuntimeError{message: message}, _stacktrace} = reason + assert message =~ "unexpected_interleaved_txns" + assert message =~ "xid=60" + assert message =~ "xid=50" + + stop_supervised!(ctx.consumer_supervisor) + end + + @tag with_pure_file_storage_opts: [flush_period: 10] + test "crash/restart with partial fragments persisted recovers correctly", ctx do + %{storage: storage} = ctx + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + # Small delay to ensure writer is fully initialized + Process.sleep(20) + + 
lsn = Lsn.from_integer(100) + xid = 50 + + # Get the initial offset after snapshot (this will be the snapshot end offset) + shape_storage = Storage.for_shape(shape_handle, storage) + {:ok, initial_offset} = Storage.fetch_latest_offset(shape_storage) + + # The initial offset should be the snapshot end (LogOffset.new(0, 0) for empty snapshot) + # since no transactions have been committed yet + assert initial_offset == LogOffset.new(0, 0) + + # First fragment with begin, no commit + fragment1 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 1), + has_begin?: true, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 1) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Send the first fragment + assert :ok = ShapeLogCollector.handle_event(fragment1, ctx.stack_id) + + # Wait for flush (short flush period) + Process.sleep(50) + + # Check that fetch_latest_offset returns the last COMMITTED offset + # not the fragment's offset, since the transaction is not yet committed + {:ok, latest_offset} = Storage.fetch_latest_offset(shape_storage) + + # The latest offset should still be the initial offset (snapshot end) + # since no transaction has committed yet + assert latest_offset == initial_offset + + # Now send the commit fragment + fragment2 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 2), + has_begin?: false, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "3"}, + log_offset: LogOffset.new(lsn, 2) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + assert :ok = ShapeLogCollector.handle_event(fragment2, ctx.stack_id) + + # Wait for flush + Process.sleep(50) + + # Now the latest offset should be updated to the committed transaction + {:ok, latest_offset_after_commit} = Storage.fetch_latest_offset(shape_storage) + assert latest_offset_after_commit == LogOffset.new(lsn, 2) + + # Verify all records were written + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert length(records) == 3 + ids = Enum.map(records, & &1["value"]["id"]) + assert ids == ["1", "2", "3"] + + stop_supervised!(ctx.consumer_supervisor) + end + + @tag with_pure_file_storage_opts: [flush_period: 50] + test "commit-only fragment with no relevant changes still signals commit", ctx do + {:via, Registry, {name, key}} = Electric.Postgres.ReplicationClient.name(ctx.stack_id) + Registry.register(name, key, nil) + + # Use shape3 which has where: "id = 1" - only matches id=1 + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape3, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + xid = 50 + + # First fragment with relevant change (id=1 matches where clause) + fragment1 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 0), + has_begin?: true, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + 
# Second fragment with commit but irrelevant change (id=999 doesn't match) + fragment2 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 1), + has_begin?: false, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "999"}, + log_offset: LogOffset.new(lsn, 1) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Send first fragment - should not notify yet + assert :ok = ShapeLogCollector.handle_event(fragment1, ctx.stack_id) + refute_receive {^ref, :new_changes, _}, 100 + + # Send commit fragment - should now notify + assert :ok = ShapeLogCollector.handle_event(fragment2, ctx.stack_id) + assert_receive {^ref, :new_changes, offset}, @receive_timeout + assert offset == LogOffset.new(lsn, 0) + + # Flush boundary should be updated + assert_receive {:flush_boundary_updated, 100}, @receive_timeout + + stop_supervised!(ctx.consumer_supervisor) + end + + @tag with_pure_file_storage_opts: [flush_period: 10] + test "flush-before-commit does not advance flush boundary beyond last committed offset", + ctx do + {:via, Registry, {name, key}} = Electric.Postgres.ReplicationClient.name(ctx.stack_id) + Registry.register(name, key, nil) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + lsn = Lsn.from_integer(100) + xid = 50 + + # First fragment with begin, no commit + fragment1 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 1), + has_begin?: true, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 1) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Send the first fragment + assert :ok = ShapeLogCollector.handle_event(fragment1, ctx.stack_id) + + # Wait for flush to happen (short flush period) + Process.sleep(50) + + # Should NOT receive flush_boundary_updated with the fragment's offset + # because the transaction is not yet committed + refute_receive {:flush_boundary_updated, 100}, 100 + + # Now send the commit fragment + fragment2 = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 2), + has_begin?: false, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "3"}, + log_offset: LogOffset.new(lsn, 2) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + assert :ok = ShapeLogCollector.handle_event(fragment2, ctx.stack_id) + + # Now we should receive the flush boundary update + assert_receive {:flush_boundary_updated, 100}, @receive_timeout + + stop_supervised!(ctx.consumer_supervisor) + end + + # P1.1: Test pending_txn: nil edge cases + # These simulate recovery scenarios where the consumer restarts mid-transaction + # Use pg_snapshot with empty xip_list so filtering can be disabled + + @tag pg_snapshot: {10, 11, []} + test "mid-fragment without prior begin creates pending_txn on-the-fly", ctx do + # This simulates a recovery scenario where: + # 1. Begin fragment was processed before crash + # 2. 
Consumer restarts and receives middle fragment + # The consumer should create a pending_txn on-the-fly and process normally + + %{storage: storage} = ctx + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + # First, send a transaction with high xid to complete the initial filtering phase + # This is needed because filtering? starts as true and is only disabled after + # processing a transaction with xid > xmax + lsn_init = Lsn.from_integer(50) + + init_txn = + transaction(100, lsn_init, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "init"}, + log_offset: LogOffset.new(lsn_init, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(init_txn, ctx.stack_id) + Process.sleep(50) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + xid = 200 + + # Send a middle fragment directly (no begin) - simulates recovery after begin was processed + mid_fragment = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 1), + has_begin?: false, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 1) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Should process without error + assert :ok = ShapeLogCollector.handle_event(mid_fragment, ctx.stack_id) + + # No notification yet - transaction not committed + refute_receive {^ref, :new_changes, _}, 100 + + # Now send commit fragment + commit_fragment = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 2), + has_begin?: false, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "3"}, + log_offset: LogOffset.new(lsn, 2) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + assert :ok = ShapeLogCollector.handle_event(commit_fragment, ctx.stack_id) + + # Now we should get notification + assert_receive {^ref, :new_changes, offset}, @receive_timeout + assert offset == LogOffset.new(lsn, 2) + + # Verify all 3 records were written (plus 1 from init_txn) + shape_storage = Storage.for_shape(shape_handle, storage) + + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert length(records) == 4 + ids = Enum.map(records, & &1["value"]["id"]) + assert ids == ["init", "1", "2", "3"] + + stop_supervised!(ctx.consumer_supervisor) + end + + @tag pg_snapshot: {10, 11, []} + test "commit-only fragment with no prior fragments processes correctly", ctx do + # This simulates a recovery scenario where: + # 1. All prior fragments were processed before crash + # 2. 
Consumer restarts and receives only the commit fragment + # The consumer should create a pending_txn on-the-fly and complete the transaction + + %{storage: storage} = ctx + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + # First, send a transaction with high xid to complete the initial filtering phase + lsn_init = Lsn.from_integer(50) + + init_txn = + transaction(100, lsn_init, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "init"}, + log_offset: LogOffset.new(lsn_init, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(init_txn, ctx.stack_id) + Process.sleep(50) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + lsn = Lsn.from_integer(100) + xid = 200 + + # Send only a commit fragment (no begin, has commit) with changes + commit_only_fragment = %TransactionFragment{ + xid: xid, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 1), + has_begin?: false, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2"}, + log_offset: LogOffset.new(lsn, 1) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + # Should process without error and complete the transaction + assert :ok = ShapeLogCollector.handle_event(commit_only_fragment, ctx.stack_id) + + # Should get notification since transaction is complete + assert_receive {^ref, :new_changes, offset}, @receive_timeout + assert offset == LogOffset.new(lsn, 1) + + # Verify records were written (plus 1 from init_txn) + shape_storage = Storage.for_shape(shape_handle, storage) + + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert length(records) == 3 + ids = Enum.map(records, & &1["value"]["id"]) + assert ids == ["init", "1", "2"] + + stop_supervised!(ctx.consumer_supervisor) + end + + # P1.2: Test that uncommitted fragments don't advance the durable offset + # This verifies the core invariant for crash recovery safety + + @tag pg_snapshot: {10, 11, []}, with_pure_file_storage_opts: [flush_period: 10] + test "uncommitted fragments flushed to disk do not advance latest_offset", ctx do + # This test verifies the key invariant for crash safety: + # 1. Uncommitted fragment data can be written and flushed to disk + # 2. fetch_latest_offset still returns the last committed offset + # 3. 
Completing the transaction advances the offset correctly + + %{storage: storage} = ctx + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + # First, send a transaction with high xid to complete the initial filtering phase + lsn_init = Lsn.from_integer(50) + + init_txn = + transaction(100, lsn_init, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "committed-first"}, + log_offset: LogOffset.new(lsn_init, 0) + } + ]) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + assert :ok = ShapeLogCollector.handle_event(init_txn, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + # Wait for flush + Process.sleep(50) + + # Get the offset after the committed transaction + shape_storage = Storage.for_shape(shape_handle, storage) + {:ok, offset_after_committed} = Storage.fetch_latest_offset(shape_storage) + assert offset_after_committed == LogOffset.new(lsn_init, 0) + + # Now send an uncommitted fragment (begin, no commit) + lsn_uncommitted = Lsn.from_integer(100) + xid_uncommitted = 200 + + uncommitted_fragment = %TransactionFragment{ + xid: xid_uncommitted, + lsn: lsn_uncommitted, + last_log_offset: LogOffset.new(lsn_uncommitted, 1), + has_begin?: true, + commit: nil, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "uncommitted-1"}, + log_offset: LogOffset.new(lsn_uncommitted, 0) + }, + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "uncommitted-2"}, + log_offset: LogOffset.new(lsn_uncommitted, 1) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + assert :ok = ShapeLogCollector.handle_event(uncommitted_fragment, ctx.stack_id) + + # No notification - transaction not committed + refute_receive {^ref, :new_changes, _}, 100 + + # Wait for flush to disk (short flush_period ensures this happens) + Process.sleep(50) + + # KEY ASSERTION: fetch_latest_offset still returns the committed offset + # even though uncommitted fragment data has been flushed to disk + {:ok, offset_after_flush} = Storage.fetch_latest_offset(shape_storage) + + assert offset_after_flush == offset_after_committed, + "fetch_latest_offset should return #{inspect(offset_after_committed)} but got #{inspect(offset_after_flush)}" + + # Now send the commit fragment to complete the transaction + commit_fragment = %TransactionFragment{ + xid: xid_uncommitted, + lsn: lsn_uncommitted, + last_log_offset: LogOffset.new(lsn_uncommitted, 2), + has_begin?: false, + commit: %Commit{}, + changes: [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "now-committed"}, + log_offset: LogOffset.new(lsn_uncommitted, 2) + } + ], + affected_relations: MapSet.new([{"public", "test_table"}]) + } + + assert :ok = ShapeLogCollector.handle_event(commit_fragment, ctx.stack_id) + + # Now we should get notification + assert_receive {^ref, :new_changes, offset}, @receive_timeout + assert offset == LogOffset.new(lsn_uncommitted, 2) + + # Wait for flush + Process.sleep(50) + + # After commit, fetch_latest_offset should return the new committed offset + {:ok, offset_after_commit} = Storage.fetch_latest_offset(shape_storage) + assert offset_after_commit == LogOffset.new(lsn_uncommitted, 2) + + # Verify all records are in storage + records = + Storage.get_log_stream(LogOffset.last_before_real_offsets(), shape_storage) + |> Enum.map(&Jason.decode!/1) + + assert 
length(records) == 4 + ids = Enum.map(records, & &1["value"]["id"]) + assert ids == ["committed-first", "uncommitted-1", "uncommitted-2", "now-committed"] + + stop_supervised!(ctx.consumer_supervisor) + end + end + defp transaction(xid, lsn, changes) do [%{log_offset: last_log_offset} | _] = Enum.reverse(changes) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index a52fce7731..d40f5f369d 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -109,7 +109,7 @@ defmodule Support.ComponentSetup do inspector: Map.get(ctx, :inspector, nil), shape_changes_registry: Map.get(ctx, :registry, Electric.StackSupervisor.registry_name(stack_id)), - shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 1_000), + shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 100_000), shape_enable_suspend?: Map.get(ctx, :suspend, false), feature_flags: Electric.Config.get_env(:feature_flags), process_spawn_opts: Map.get(ctx, :process_spawn_opts, %{}) diff --git a/packages/sync-service/test/support/test_storage.ex b/packages/sync-service/test/support/test_storage.ex index 206f6c8db2..a023adcd97 100644 --- a/packages/sync-service/test/support/test_storage.ex +++ b/packages/sync-service/test/support/test_storage.ex @@ -150,6 +150,20 @@ defmodule Support.TestStorage do {parent, shape_handle, data, storage} end + @impl Electric.ShapeCache.Storage + def append_fragment_to_log!(log_items, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :append_fragment_to_log!, shape_handle, log_items}) + storage = Storage.append_fragment_to_log!(log_items, storage) + {parent, shape_handle, data, storage} + end + + @impl Electric.ShapeCache.Storage + def signal_txn_commit!(xid, {parent, shape_handle, data, storage}) do + send(parent, {__MODULE__, :signal_txn_commit!, shape_handle, xid}) + storage = Storage.signal_txn_commit!(xid, storage) + {parent, shape_handle, data, storage} + end + @impl Electric.ShapeCache.Storage def append_move_in_snapshot_to_log!(name, {parent, shape_handle, data, storage}) do send(parent, {__MODULE__, :append_move_in_snapshot_to_log!, shape_handle, name})