diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex
index 296a1b915b..a0e76eb476 100644
--- a/packages/sync-service/lib/electric/postgres/replication_client.ex
+++ b/packages/sync-service/lib/electric/postgres/replication_client.ex
@@ -417,11 +417,13 @@ defmodule Electric.Postgres.ReplicationClient do
 
   defp acknowledge_transaction(%TransactionFragment{lsn: lsn, commit: commit}, state) do
     if Sampler.sample_metrics?() do
+      alias Electric.Replication.Changes.Commit
+
       OpenTelemetry.execute(
         [:electric, :postgres, :replication, :transaction_received],
         %{
           monotonic_time: System.monotonic_time(),
-          receive_lag: DateTime.diff(DateTime.utc_now(), commit.commit_timestamp, :millisecond),
+          receive_lag: Commit.calculate_final_receive_lag(commit, System.monotonic_time()),
           bytes: commit.transaction_size,
           count: 1,
           operations: commit.txn_change_count
diff --git a/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex b/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
index 8bd9eb05d7..1249b2be7b 100644
--- a/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
+++ b/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
@@ -221,10 +221,15 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
   end
 
   def convert(%LR.Commit{} = msg, %__MODULE__{txn_fragment: fragment} = state) do
+    now_mono = System.monotonic_time()
+    initial_lag = Commit.calculate_initial_receive_lag(msg.commit_timestamp, DateTime.utc_now())
+
     commit = %Commit{
       commit_timestamp: msg.commit_timestamp,
       transaction_size: state.tx_size,
-      txn_change_count: state.tx_change_count
+      txn_change_count: state.tx_change_count,
+      received_at_mono: now_mono,
+      initial_receive_lag: initial_lag
     }
 
     last_log_offset = state.last_log_offset || LogOffset.new(Lsn.to_integer(fragment.lsn), 0)
diff --git a/packages/sync-service/lib/electric/replication/changes.ex b/packages/sync-service/lib/electric/replication/changes.ex
index 258437aeac..054abea963 100644
--- a/packages/sync-service/lib/electric/replication/changes.ex
+++ b/packages/sync-service/lib/electric/replication/changes.ex
@@ -38,10 +38,69 @@ defmodule Electric.Replication.Changes do
     @type t() :: %__MODULE__{
             commit_timestamp: DateTime.t() | nil,
             transaction_size: non_neg_integer(),
-            txn_change_count: non_neg_integer()
+            txn_change_count: non_neg_integer(),
+            received_at_mono: integer() | nil,
+            initial_receive_lag: non_neg_integer() | nil
           }
 
-    defstruct [:commit_timestamp, transaction_size: 0, txn_change_count: 0]
+    defstruct [
+      :commit_timestamp,
+      :received_at_mono,
+      :initial_receive_lag,
+      transaction_size: 0,
+      txn_change_count: 0
+    ]
+
+    @doc """
+    Calculate the initial receive lag in milliseconds, clamped to >= 0.
+
+    This handles clock skew between Postgres and Electric by ensuring
+    the lag is never negative even if the commit timestamp appears to
+    be in the future from Electric's perspective.
+
+    Note: When clocks are skewed such that the commit timestamp is ahead
+    of Electric's clock, this function clamps the result to 0. This means
+    information about actual network/replication lag is lost, and the
+    final receive lag reported by `calculate_final_receive_lag/2` will
+    only reflect Electric's internal processing time, not the true
+    end-to-end lag from Postgres commit to Electric receipt.
+    """
+    @spec calculate_initial_receive_lag(DateTime.t(), DateTime.t()) :: non_neg_integer()
+    def calculate_initial_receive_lag(commit_timestamp, current_time) do
+      max(0, DateTime.diff(current_time, commit_timestamp, :millisecond))
+    end
+
+    @doc """
+    Calculate the final receive lag in milliseconds, clamped to >= 0.
+
+    Combines the initial receive lag (captured when the commit was received)
+    with the time elapsed within Electric (measured using monotonic time).
+
+    Note: If the initial receive lag was clamped to 0 due to clock skew
+    (see `calculate_initial_receive_lag/2`), the value returned here
+    represents only Electric's internal processing time, not the true
+    end-to-end lag from Postgres commit to acknowledgement.
+
+    Commits built without `received_at_mono`/`initial_receive_lag` (both
+    default to `nil`) fall back to the wall-clock difference between now
+    and `commit_timestamp`.
+    """
+    @spec calculate_final_receive_lag(t(), integer()) :: non_neg_integer()
+    def calculate_final_receive_lag(
+          %__MODULE__{received_at_mono: received_at_mono, initial_receive_lag: initial_lag},
+          current_mono
+        )
+        when is_integer(received_at_mono) and is_integer(initial_lag) do
+      elapsed_in_electric =
+        System.convert_time_unit(current_mono - received_at_mono, :native, :millisecond)
+
+      # Clamp so the result always honours the non_neg_integer() contract.
+      max(0, initial_lag + elapsed_in_electric)
+    end
+
+    def calculate_final_receive_lag(%__MODULE__{commit_timestamp: %DateTime{} = ts}, _mono) do
+      calculate_initial_receive_lag(ts, DateTime.utc_now())
+    end
   end
 
   defmodule TransactionFragment do
diff --git a/packages/sync-service/test/electric/replication/changes_test.exs b/packages/sync-service/test/electric/replication/changes_test.exs
index 73e3db1f8f..6677b8852b 100644
--- a/packages/sync-service/test/electric/replication/changes_test.exs
+++ b/packages/sync-service/test/electric/replication/changes_test.exs
@@ -1,12 +1,111 @@
 defmodule Electric.Replication.ChangesTest do
   use ExUnit.Case, async: true
 
+  alias Electric.Replication.Changes.Commit
   alias Electric.Replication.Changes.NewRecord
   alias Electric.Replication.Changes.UpdatedRecord
   alias Electric.Replication.Changes.DeletedRecord
 
   doctest Electric.Replication.Changes, import: true
 
+  describe "Commit.calculate_initial_receive_lag/2" do
+    test "returns positive lag when commit timestamp is in the past" do
+      commit_timestamp = ~U[2024-01-01 12:00:00.000Z]
+      current_time = ~U[2024-01-01 12:00:00.500Z]
+
+      lag = Commit.calculate_initial_receive_lag(commit_timestamp, current_time)
+
+      assert lag == 500
+    end
+
+    test "returns zero when commit timestamp equals current time" do
+      timestamp = ~U[2024-01-01 12:00:00.000Z]
+
+      lag = Commit.calculate_initial_receive_lag(timestamp, timestamp)
+
+      assert lag == 0
+    end
+
+    test "clamps to zero when commit timestamp is in the future (clock skew)" do
+      commit_timestamp = ~U[2024-01-01 12:00:01.000Z]
+      current_time = ~U[2024-01-01 12:00:00.000Z]
+
+      lag = Commit.calculate_initial_receive_lag(commit_timestamp, current_time)
+
+      assert lag == 0
+    end
+  end
+
+  describe "Commit.calculate_final_receive_lag/2" do
+    test "returns initial lag plus elapsed time in Electric" do
+      received_at_mono = System.monotonic_time()
+      initial_lag = 100
+      elapsed_ms = 50
+
+      commit = %Commit{
+        commit_timestamp: ~U[2024-01-01 12:00:00.000Z],
+        received_at_mono: received_at_mono,
+        initial_receive_lag: initial_lag
+      }
+
+      current_mono =
+        received_at_mono + System.convert_time_unit(elapsed_ms, :millisecond, :native)
+
+      lag = Commit.calculate_final_receive_lag(commit, current_mono)
+
+      assert lag == initial_lag + elapsed_ms
+    end
+
+    test "returns initial lag when no time has elapsed" do
+      mono_time = System.monotonic_time()
+      initial_lag = 250
+
+      commit = %Commit{
+        commit_timestamp: ~U[2024-01-01 12:00:00.000Z],
+        received_at_mono: mono_time,
+        initial_receive_lag: initial_lag
+      }
+
+      lag = Commit.calculate_final_receive_lag(commit, mono_time)
+
+      assert lag == initial_lag
+    end
+
+    test "never returns negative values even with zero initial lag" do
+      mono_time = System.monotonic_time()
+
+      commit = %Commit{
+        commit_timestamp: ~U[2024-01-01 12:00:00.000Z],
+        received_at_mono: mono_time,
+        initial_receive_lag: 0
+      }
+
+      lag = Commit.calculate_final_receive_lag(commit, mono_time)
+
+      assert lag >= 0
+    end
+
+    test "clamps to zero when current monotonic time precedes receipt" do
+      mono_time = System.monotonic_time()
+
+      commit = %Commit{
+        commit_timestamp: ~U[2024-01-01 12:00:00.000Z],
+        received_at_mono: mono_time,
+        initial_receive_lag: 0
+      }
+
+      earlier_mono = mono_time - System.convert_time_unit(50, :millisecond, :native)
+
+      assert Commit.calculate_final_receive_lag(commit, earlier_mono) == 0
+    end
+
+    test "falls back to wall-clock lag when monotonic fields are missing" do
+      commit = %Commit{commit_timestamp: DateTime.add(DateTime.utc_now(), -1, :second)}
+
+      assert Commit.calculate_final_receive_lag(commit, System.monotonic_time()) >= 1000
+    end
+  end
+
   describe "UpdatedRecord.changed_columns" do
     test "is empty when old_record is nil" do
       changed_columns = MapSet.new([])