Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,13 @@ defmodule Electric.Postgres.ReplicationClient do

defp acknowledge_transaction(%TransactionFragment{lsn: lsn, commit: commit}, state) do
if Sampler.sample_metrics?() do
alias Electric.Replication.Changes.Commit

OpenTelemetry.execute(
[:electric, :postgres, :replication, :transaction_received],
%{
monotonic_time: System.monotonic_time(),
receive_lag: DateTime.diff(DateTime.utc_now(), commit.commit_timestamp, :millisecond),
receive_lag: Commit.calculate_final_receive_lag(commit, System.monotonic_time()),
bytes: commit.transaction_size,
count: 1,
operations: commit.txn_change_count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,15 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
end

def convert(%LR.Commit{} = msg, %__MODULE__{txn_fragment: fragment} = state) do
now_mono = System.monotonic_time()
initial_lag = Commit.calculate_initial_receive_lag(msg.commit_timestamp, DateTime.utc_now())

commit = %Commit{
commit_timestamp: msg.commit_timestamp,
transaction_size: state.tx_size,
txn_change_count: state.tx_change_count
txn_change_count: state.tx_change_count,
received_at_mono: now_mono,
initial_receive_lag: initial_lag
}

last_log_offset = state.last_log_offset || LogOffset.new(Lsn.to_integer(fragment.lsn), 0)
Expand Down
54 changes: 52 additions & 2 deletions packages/sync-service/lib/electric/replication/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,60 @@ defmodule Electric.Replication.Changes do
@type t() :: %__MODULE__{
commit_timestamp: DateTime.t() | nil,
transaction_size: non_neg_integer(),
txn_change_count: non_neg_integer()
txn_change_count: non_neg_integer(),
received_at_mono: integer() | nil,
initial_receive_lag: non_neg_integer() | nil
}

defstruct [:commit_timestamp, transaction_size: 0, txn_change_count: 0]
defstruct [
:commit_timestamp,
:received_at_mono,
:initial_receive_lag,
transaction_size: 0,
txn_change_count: 0
]

@doc """
Calculate the initial receive lag in milliseconds, clamped to >= 0.

This handles clock skew between Postgres and Electric by ensuring
the lag is never negative even if the commit timestamp appears to
be in the future from Electric's perspective.

Note: When clocks are skewed such that the commit timestamp is ahead
of Electric's clock, this function clamps the result to 0. This means
information about actual network/replication lag is lost, and the
final receive lag reported by `calculate_final_receive_lag/2` will
only reflect Electric's internal processing time, not the true
end-to-end lag from Postgres commit to Electric receipt.
"""
@spec calculate_initial_receive_lag(DateTime.t(), DateTime.t()) :: non_neg_integer()
def calculate_initial_receive_lag(commit_timestamp, current_time) do
max(0, DateTime.diff(current_time, commit_timestamp, :millisecond))
end

@doc """
Calculate the final receive lag in milliseconds.

Combines the initial receive lag (captured when the commit was received)
with the time elapsed within Electric (measured using monotonic time).

Note: If the initial receive lag was clamped to 0 due to clock skew
(see `calculate_initial_receive_lag/2`), the value returned here
represents only Electric's internal processing time, not the true
end-to-end lag from Postgres commit to acknowledgement.
"""
@spec calculate_final_receive_lag(t(), integer()) :: non_neg_integer()
def calculate_final_receive_lag(%__MODULE__{} = commit, current_mono) do
elapsed_in_electric =
System.convert_time_unit(
current_mono - commit.received_at_mono,
:native,
:millisecond
)

commit.initial_receive_lag + elapsed_in_electric
end
end

defmodule TransactionFragment do
Expand Down
79 changes: 79 additions & 0 deletions packages/sync-service/test/electric/replication/changes_test.exs
Original file line number Diff line number Diff line change
@@ -1,12 +1,91 @@
defmodule Electric.Replication.ChangesTest do
use ExUnit.Case, async: true

alias Electric.Replication.Changes.Commit
alias Electric.Replication.Changes.NewRecord
alias Electric.Replication.Changes.UpdatedRecord
alias Electric.Replication.Changes.DeletedRecord

doctest Electric.Replication.Changes, import: true

describe "Commit.calculate_initial_receive_lag/2" do
test "returns positive lag when commit timestamp is in the past" do
commit_timestamp = ~U[2024-01-01 12:00:00.000Z]
current_time = ~U[2024-01-01 12:00:00.500Z]

lag = Commit.calculate_initial_receive_lag(commit_timestamp, current_time)

assert lag == 500
end

test "returns zero when commit timestamp equals current time" do
timestamp = ~U[2024-01-01 12:00:00.000Z]

lag = Commit.calculate_initial_receive_lag(timestamp, timestamp)

assert lag == 0
end

test "clamps to zero when commit timestamp is in the future (clock skew)" do
commit_timestamp = ~U[2024-01-01 12:00:01.000Z]
current_time = ~U[2024-01-01 12:00:00.000Z]

lag = Commit.calculate_initial_receive_lag(commit_timestamp, current_time)

assert lag == 0
end
end

describe "Commit.calculate_final_receive_lag/2" do
test "returns initial lag plus elapsed time in Electric" do
received_at_mono = System.monotonic_time()
initial_lag = 100
elapsed_ms = 50

commit = %Commit{
commit_timestamp: ~U[2024-01-01 12:00:00.000Z],
received_at_mono: received_at_mono,
initial_receive_lag: initial_lag
}

current_mono =
received_at_mono + System.convert_time_unit(elapsed_ms, :millisecond, :native)

lag = Commit.calculate_final_receive_lag(commit, current_mono)

assert lag == initial_lag + elapsed_ms
end

test "returns initial lag when no time has elapsed" do
mono_time = System.monotonic_time()
initial_lag = 250

commit = %Commit{
commit_timestamp: ~U[2024-01-01 12:00:00.000Z],
received_at_mono: mono_time,
initial_receive_lag: initial_lag
}

lag = Commit.calculate_final_receive_lag(commit, mono_time)

assert lag == initial_lag
end

test "never returns negative values even with zero initial lag" do
mono_time = System.monotonic_time()

commit = %Commit{
commit_timestamp: ~U[2024-01-01 12:00:00.000Z],
received_at_mono: mono_time,
initial_receive_lag: 0
}

lag = Commit.calculate_final_receive_lag(commit, mono_time)

assert lag >= 0
end
end

describe "UpdatedRecord.changed_columns" do
test "is empty when old_record is nil" do
changed_columns = MapSet.new([])
Expand Down