Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c19c68b
Clarify the explanatory comment on Xid.after_snapshot?()
alco Jan 27, 2026
92f4c31
WIP Consumer handle_txn_fragment
alco Jan 27, 2026
678a154
Buffer transaction fragments directly, not waiting for a complete tra…
alco Jan 27, 2026
6828bbe
wip Truly write invididual txn fragments to storage
alco Jan 27, 2026
d9708df
Augment Storage API with the ability to write txn fragments
alco Jan 27, 2026
958c4a6
Implement txn fragment handling in PureFileStorage
alco Jan 27, 2026
c0ae18a
WIP Add fragment awareness to test storage
alco Jan 27, 2026
5c4968a
WIP Txn fragments for inmemorystorage which is used in consumer tests
alco Jan 27, 2026
5d65145
Forgot that a txn fragment with has_begin? has a list of changes
alco Jan 27, 2026
75b68a8
Do not forget to reset state.pending_txn when skipping the whole tran…
alco Jan 27, 2026
e4afc93
WIP don't forget to reset pending_txn when fragment_direct?=false
alco Jan 27, 2026
a67d863
Remove continuation tags from handle_txn since it's no longer called …
alco Jan 27, 2026
a1d8cb7
Pass persisted fragment changes to the pending txn completion function
alco Jan 27, 2026
dc4bb9b
Remove unused CrashingFileStorage module and related config
alco Jan 12, 2026
376fbd4
Reset pending_txn when a full transaction is assembled in shape with …
alco Jan 27, 2026
26b0c7c
Log txn fragment and txn reception separately in Consumer
alco Jan 27, 2026
bfa3a69
Short-circuit handling of txn fragments that contain entire transactions
alco Jan 28, 2026
39f76f6
Update consumer tests to cover fragment-aware storage function calls
alco Jan 28, 2026
1c882c2
wip assert_storage_calls_for_txn_fragment
alco Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration-tests/tests/crash-recovery.lux
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
# slower than the pure-ets version (~50ms vs ~5ms) which means that this request
# arrives before the stack has received the pending operation.
# To mitigate this we ensure the txn has been processed.
??[debug] Txn received in Shapes.Consumer
??[debug] Txn fragment received in Shapes.Consumer

# Client should be able to continue same shape
[shell client]
Expand Down
7 changes: 0 additions & 7 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,6 @@ storage_spec =
"fast_file" ->
{Electric.ShapeCache.PureFileStorage, storage_dir: shape_path}

"crashing_file" ->
num_calls_until_crash =
env!("CRASHING_FILE_ELECTRIC_STORAGE__NUM_CALLS_UNTIL_CRASH", :integer)

{Electric.ShapeCache.CrashingFileStorage,
storage_dir: shape_path, num_calls_until_crash: num_calls_until_crash}

_ ->
raise Dotenvy.Error, message: "storage must be one of: MEMORY, FAST_FILE, LEGACY_FILE"
end
Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/postgres/xid.ex
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ defmodule Electric.Postgres.Xid do
@type pg_snapshot() :: {anyxid, anyxid, [anyxid]}

@doc """
Check if a transaction is after the end of a snapshot - if it's xid is over xmax
Check if a transaction is in the future from the POV of the snapshot. In other words, if its
xid is >= xmax, its changes are definitely *not* visible and *won't become* visible in this snapshot.
"""
@spec after_snapshot?(anyxid, pg_snapshot()) :: boolean()
def after_snapshot?(xid, {_, xmax, _}) when not is_lt(xid, xmax), do: true
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,58 @@ defmodule Electric.ShapeCache.InMemoryStorage do
opts
end

@impl Electric.ShapeCache.Storage
def append_fragment_to_log!(log_items, %MS{} = opts) do
# For in-memory storage, fragment writes are similar to full transaction writes
# but we do NOT update @latest_offset_key until commit, and we do NOT send
# the :flushed message. This ensures fetch_latest_offset returns the last
# committed transaction offset, not a mid-transaction offset.
log_table = opts.log_table
chunk_checkpoint_table = opts.chunk_checkpoint_table

{processed_log_items, _last_offset} =
Enum.map_reduce(log_items, nil, fn
{:chunk_boundary, offset}, curr ->
{{storage_offset(offset), :checkpoint}, curr}

{offset, _key, _op_type, json_log_item}, _ ->
{{{:offset, storage_offset(offset)}, json_log_item}, offset}
end)

processed_log_items
|> Enum.split_with(fn item -> match?({_, :checkpoint}, item) end)
|> then(fn {checkpoints, log_items} ->
:ets.insert(chunk_checkpoint_table, checkpoints)
:ets.insert(log_table, log_items)
# Note: NOT updating @latest_offset_key until signal_txn_commit!
end)

opts
end

@impl Electric.ShapeCache.Storage
def signal_txn_commit!(_xid, %MS{} = opts) do
# For in-memory storage, the commit signal updates @latest_offset_key
# to the last written offset and sends the :flushed message
log_table = opts.log_table

# Find the last written offset in the log table
case :ets.last(log_table) do
:"$end_of_table" ->
opts

{:offset, offset_tuple} ->
last_offset = LogOffset.new(offset_tuple)
:ets.insert(opts.snapshot_table, {@latest_offset_key, last_offset})
send(self(), {Storage, :flushed, last_offset})
opts

_ ->
# Other keys (like :movein) - no update needed
opts
end
end

@impl Electric.ShapeCache.Storage
def write_move_in_snapshot!(stream, name, %MS{log_table: log_table}) do
stream
Expand Down
56 changes: 46 additions & 10 deletions packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ defmodule Electric.ShapeCache.PureFileStorage do
end

defp shape_data_dir(%__MODULE__{stack_id: stack_id, shape_handle: shape_handle}, suffix) do
{__MODULE__, stack_opts} = Storage.for_stack(stack_id)
shape_data_dir(stack_opts.base_path, shape_handle, suffix)
base_path = Storage.opt_for_stack(stack_id, :base_path)
shape_data_dir(base_path, shape_handle, suffix)
end

defp shape_data_dir(base_path, shape_handle, suffix \\ [])
Expand All @@ -167,10 +167,7 @@ defmodule Electric.ShapeCache.PureFileStorage do
defp shape_snapshot_dir(opts), do: shape_data_dir(opts, [@snapshot_dir])
defp shape_snapshot_path(opts, filename), do: shape_data_dir(opts, [@snapshot_dir, filename])

defp tmp_dir(%__MODULE__{} = opts) do
{__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id)
stack_opts.tmp_dir
end
defp tmp_dir(%__MODULE__{} = opts), do: Storage.opt_for_stack(opts.stack_id, :tmp_dir)

def stack_start_link(opts) do
Supervisor.start_link(
Expand Down Expand Up @@ -537,8 +534,9 @@ defmodule Electric.ShapeCache.PureFileStorage do
)

if shape_definition.storage.compaction == :enabled do
{__MODULE__, stack_opts} = Storage.for_stack(shape_opts.stack_id)
schedule_compaction(stack_opts.compaction_config)
shape_opts.stack_id
|> Storage.opt_for_stack(:compaction_config)
|> schedule_compaction()
end

writer_state(
Expand Down Expand Up @@ -1306,6 +1304,44 @@ defmodule Electric.ShapeCache.PureFileStorage do
end
end

@doc """
Append log items from a transaction fragment.

Unlike `append_to_log!/2`, this does NOT advance `last_seen_txn_offset` or
call `register_complete_txn`. Transaction completion should be signaled
separately via `signal_txn_commit!/2`.

This ensures that on crash/recovery, `fetch_latest_offset` returns the
last committed transaction offset, not a mid-transaction offset.
"""
def append_fragment_to_log!(fragment_lines, writer_state(writer_acc: acc) = state) do
fragment_lines
|> normalize_log_stream()
|> WriteLoop.append_fragment_to_log!(acc, state)
|> case do
{acc, cancel_flush_timer: true} ->
timer_ref = writer_state(state, :write_timer)
if not is_nil(timer_ref), do: Process.cancel_timer(timer_ref)
writer_state(state, writer_acc: acc, write_timer: nil)

{acc, schedule_flush: times_flushed} ->
writer_state(state, writer_acc: acc)
|> schedule_flush(times_flushed)
end
end

@doc """
Signal that a transaction has committed.

Updates `last_seen_txn_offset` and persists metadata to mark the transaction
as complete. Should be called after all fragments have been written via
`append_fragment_to_log!/2`.
"""
def signal_txn_commit!(_xid, writer_state(writer_acc: acc) = state) do
acc = WriteLoop.signal_txn_commit(acc, state)
writer_state(state, writer_acc: acc)
end

def update_chunk_boundaries_cache(opts, boundaries) do
:ets.update_element(
opts.stack_ets,
Expand All @@ -1320,13 +1356,13 @@ defmodule Electric.ShapeCache.PureFileStorage do
if WriteLoop.has_flushed_since?(acc, old) or is_nil(timer) do
if not is_nil(timer), do: Process.cancel_timer(timer)

{__MODULE__, stack_opts} = Storage.for_stack(opts.stack_id)
flush_period = Storage.opt_for_stack(opts.stack_id, :flush_period)

ref =
Process.send_after(
self(),
{Storage, {__MODULE__, :perform_scheduled_flush, [WriteLoop.times_flushed(acc)]}},
stack_opts.flush_period
flush_period
)

writer_state(state, write_timer: ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,55 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do
end
end

@doc """
Append log lines from a transaction fragment.

Unlike `append_to_log!/3`, this does NOT advance `last_seen_txn_offset` or
call `register_complete_txn`. Transaction completion should be signaled
separately via `register_complete_txn/2` after the commit is received.

This ensures that on crash/recovery, `fetch_latest_offset` returns the
last committed transaction offset, not a mid-transaction offset.
"""
def append_fragment_to_log!(
fragment_lines,
writer_acc(times_flushed: times_flushed) = acc,
state
) do
acc = ensure_json_file_open(acc, state)

fragment_lines
|> Enum.reduce(acc, fn
{offset, _, _, _, _, _, _}, writer_acc(last_seen_txn_offset: min_offset) = acc
when is_log_offset_lte(offset, min_offset) ->
# Line already persisted, no-op
acc

{offset, _, _, _, _, _, _} = line, acc ->
acc
|> maybe_write_opening_chunk_boundary(state, offset)
|> add_to_buffer(line)
|> maybe_write_closing_chunk_boundary(state)
|> maybe_flush_buffer(state)
end)
|> case do
# If the buffer has been fully flushed, no need to schedule more flushes
# Note: We do NOT update last_seen_txn_offset or call register_complete_txn
# since the transaction is not yet complete
writer_acc(buffer_size: 0) = acc ->
acc = close_chunk_file(acc)
{acc, cancel_flush_timer: true}

acc ->
acc =
acc
|> store_lines_in_ets(state)
|> close_chunk_file()

{acc, schedule_flush: times_flushed}
end
end

### Working with the buffer

defp add_to_buffer(
Expand Down Expand Up @@ -374,4 +423,15 @@ defmodule Electric.ShapeCache.PureFileStorage.WriteLoop do
end
|> update_persistance_metadata(state, prev_persisted_txn)
end

@doc """
Signal that a transaction has been committed.

This updates `last_seen_txn_offset` and potentially `last_persisted_txn_offset`
to mark the transaction as complete. Should be called after all fragments
have been written via `append_fragment_to_log!/3`.
"""
def signal_txn_commit(acc, state) do
register_complete_txn(acc, state)
end
end
Loading
Loading