From 7de2f884323946c9b6bd555cb91b63cb7bf7fa67 Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Wed, 28 Jan 2026 16:36:11 +0000 Subject: [PATCH] Prevent negative replication lag statistics Detect clock skew that presents as transaction commit timestamps being after the current system time and use the diff in the final replication lag calculation. As noted this removes information but in the presence of detectable clock skew this information is unhelpful. Clock skew make a mockery of our idea that the replication lag calculation in any way represents the actual time between a commit in pg and a write in electric. With this at least we get an end to end time for the processing of a transaction and can monitor trends. --- .../electric/postgres/replication_client.ex | 4 +- .../replication_client/message_converter.ex | 7 +- .../lib/electric/replication/changes.ex | 54 ++++++++++++- .../electric/replication/changes_test.exs | 79 +++++++++++++++++++ 4 files changed, 140 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 296a1b915b..a0e76eb476 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -417,11 +417,13 @@ defmodule Electric.Postgres.ReplicationClient do defp acknowledge_transaction(%TransactionFragment{lsn: lsn, commit: commit}, state) do if Sampler.sample_metrics?() do + alias Electric.Replication.Changes.Commit + OpenTelemetry.execute( [:electric, :postgres, :replication, :transaction_received], %{ monotonic_time: System.monotonic_time(), - receive_lag: DateTime.diff(DateTime.utc_now(), commit.commit_timestamp, :millisecond), + receive_lag: Commit.calculate_final_receive_lag(commit, System.monotonic_time()), bytes: commit.transaction_size, count: 1, operations: commit.txn_change_count diff --git a/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex b/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex index 8bd9eb05d7..1249b2be7b 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex @@ -221,10 +221,15 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do end def convert(%LR.Commit{} = msg, %__MODULE__{txn_fragment: fragment} = state) do + now_mono = System.monotonic_time() + initial_lag = Commit.calculate_initial_receive_lag(msg.commit_timestamp, DateTime.utc_now()) + commit = %Commit{ commit_timestamp: msg.commit_timestamp, transaction_size: state.tx_size, - txn_change_count: state.tx_change_count + txn_change_count: state.tx_change_count, + received_at_mono: now_mono, + initial_receive_lag: initial_lag } last_log_offset = state.last_log_offset || LogOffset.new(Lsn.to_integer(fragment.lsn), 0) diff --git a/packages/sync-service/lib/electric/replication/changes.ex b/packages/sync-service/lib/electric/replication/changes.ex index 258437aeac..054abea963 100644 --- a/packages/sync-service/lib/electric/replication/changes.ex +++ b/packages/sync-service/lib/electric/replication/changes.ex @@ -38,10 +38,60 @@ defmodule Electric.Replication.Changes do @type t() :: %__MODULE__{ commit_timestamp: DateTime.t() | nil, transaction_size: non_neg_integer(), - txn_change_count: non_neg_integer() + txn_change_count: non_neg_integer(), + received_at_mono: integer() | nil, + initial_receive_lag: non_neg_integer() | nil } - defstruct [:commit_timestamp, transaction_size: 0, txn_change_count: 0] + defstruct [ + :commit_timestamp, + :received_at_mono, + :initial_receive_lag, + transaction_size: 0, + txn_change_count: 0 + ] + + @doc """ + Calculate the initial receive lag in milliseconds, clamped to >= 0. + + This handles clock skew between Postgres and Electric by ensuring + the lag is never negative even if the commit timestamp appears to + be in the future from Electric's perspective. + + Note: When clocks are skewed such that the commit timestamp is ahead + of Electric's clock, this function clamps the result to 0. This means + information about actual network/replication lag is lost, and the + final receive lag reported by `calculate_final_receive_lag/2` will + only reflect Electric's internal processing time, not the true + end-to-end lag from Postgres commit to Electric receipt. + """ + @spec calculate_initial_receive_lag(DateTime.t(), DateTime.t()) :: non_neg_integer() + def calculate_initial_receive_lag(commit_timestamp, current_time) do + max(0, DateTime.diff(current_time, commit_timestamp, :millisecond)) + end + + @doc """ + Calculate the final receive lag in milliseconds. + + Combines the initial receive lag (captured when the commit was received) + with the time elapsed within Electric (measured using monotonic time). + + Note: If the initial receive lag was clamped to 0 due to clock skew + (see `calculate_initial_receive_lag/2`), the value returned here + represents only Electric's internal processing time, not the true + end-to-end lag from Postgres commit to acknowledgement. + """ + @spec calculate_final_receive_lag(t(), integer()) :: non_neg_integer() + def calculate_final_receive_lag(%__MODULE__{} = commit, current_mono) do + elapsed_in_electric = + System.convert_time_unit( + current_mono - commit.received_at_mono, + :native, + :millisecond + ) + + commit.initial_receive_lag + elapsed_in_electric + end end defmodule TransactionFragment do diff --git a/packages/sync-service/test/electric/replication/changes_test.exs b/packages/sync-service/test/electric/replication/changes_test.exs index 73e3db1f8f..6677b8852b 100644 --- a/packages/sync-service/test/electric/replication/changes_test.exs +++ b/packages/sync-service/test/electric/replication/changes_test.exs @@ -1,12 +1,91 @@ defmodule Electric.Replication.ChangesTest do use ExUnit.Case, async: true + alias Electric.Replication.Changes.Commit alias Electric.Replication.Changes.NewRecord alias Electric.Replication.Changes.UpdatedRecord alias Electric.Replication.Changes.DeletedRecord doctest Electric.Replication.Changes, import: true + describe "Commit.calculate_initial_receive_lag/2" do + test "returns positive lag when commit timestamp is in the past" do + commit_timestamp = ~U[2024-01-01 12:00:00.000Z] + current_time = ~U[2024-01-01 12:00:00.500Z] + + lag = Commit.calculate_initial_receive_lag(commit_timestamp, current_time) + + assert lag == 500 + end + + test "returns zero when commit timestamp equals current time" do + timestamp = ~U[2024-01-01 12:00:00.000Z] + + lag = Commit.calculate_initial_receive_lag(timestamp, timestamp) + + assert lag == 0 + end + + test "clamps to zero when commit timestamp is in the future (clock skew)" do + commit_timestamp = ~U[2024-01-01 12:00:01.000Z] + current_time = ~U[2024-01-01 12:00:00.000Z] + + lag = Commit.calculate_initial_receive_lag(commit_timestamp, current_time) + + assert lag == 0 + end + end + + describe "Commit.calculate_final_receive_lag/2" do + test "returns initial lag plus elapsed time in Electric" do + received_at_mono = System.monotonic_time() + initial_lag = 100 + elapsed_ms = 50 + + commit = %Commit{ + commit_timestamp: ~U[2024-01-01 12:00:00.000Z], + received_at_mono: received_at_mono, + initial_receive_lag: initial_lag + } + + current_mono = + received_at_mono + System.convert_time_unit(elapsed_ms, :millisecond, :native) + + lag = Commit.calculate_final_receive_lag(commit, current_mono) + + assert lag == initial_lag + elapsed_ms + end + + test "returns initial lag when no time has elapsed" do + mono_time = System.monotonic_time() + initial_lag = 250 + + commit = %Commit{ + commit_timestamp: ~U[2024-01-01 12:00:00.000Z], + received_at_mono: mono_time, + initial_receive_lag: initial_lag + } + + lag = Commit.calculate_final_receive_lag(commit, mono_time) + + assert lag == initial_lag + end + + test "never returns negative values even with zero initial lag" do + mono_time = System.monotonic_time() + + commit = %Commit{ + commit_timestamp: ~U[2024-01-01 12:00:00.000Z], + received_at_mono: mono_time, + initial_receive_lag: 0 + } + + lag = Commit.calculate_final_receive_lag(commit, mono_time) + + assert lag >= 0 + end + end + describe "UpdatedRecord.changed_columns" do test "is empty when old_record is nil" do changed_columns = MapSet.new([])