fix: Handle replication connection loss (PGRES_COPY_BOTH) and reduce keepalive overhead #749
Open
ksohail22 wants to merge 8 commits into MeltanoLabs:main from
Conversation
Commits:
- Fix: LOG_BASED replication bookmark not advancing between syncs
- Fix InvalidStreamSortException in LOG_BASED replication
- …LOG_BASED streams
- Fix: Unbounded WAL retention and premature replication loop exit for LOG_BASED streams
- …keepalive overhead
- PGRES_COPY_BOTH) and reduce keepalive overhead
Problem
Two issues observed in production after the long-lived replication loop was deployed:
1. `PGRES_COPY_BOTH` crashes terminate the entire sync

When the PostgreSQL server terminates the WAL sender mid-session (due to `wal_sender_timeout`, a network interruption, an RDS failover, or server-side load), `read_message()` raises an exception. This unhandled exception crashes the tap, losing all progress for the current stream and every subsequent stream in the job. The replication slot is not advanced, so the next run must re-scan the same WAL.
2. Excessive keepalive overhead

`status_interval` was set to 10 seconds, causing the client to send standby status feedback 6 times per minute. This is unnecessary overhead: PostgreSQL's default `wal_sender_timeout` is 60 seconds, so feedback every 30 seconds (2/min) is sufficient to keep the connection alive.

Solution
Connection-loss resilience

The `read_message()` call is now wrapped in `try/except psycopg2.DatabaseError`. When the replication connection dies:

- a `connection_lost` flag is set;
- `_advance_slot_via_new_connection()` opens a fresh replication connection, queries `pg_current_wal_flush_lsn()` for the current WAL tip, calls `send_feedback` to advance the slot, and updates the stream bookmark;
- `close()` calls are wrapped in `try/except` to prevent secondary exceptions.

This ensures that even when a connection is lost, the records read so far are kept and the slot still advances.
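The guarded read path can be sketched as follows. This is illustrative, not the PR's exact code: `read_with_recovery` and the `recover` callback are hypothetical names, with `recover` standing in for `_advance_slot_via_new_connection()`.

```python
try:
    from psycopg2 import DatabaseError
except ImportError:  # allow running this sketch without psycopg2 installed
    class DatabaseError(Exception):
        pass


def read_with_recovery(cursor, recover):
    """Read one replication message; on connection loss, run the recovery path.

    `cursor` is a replication cursor exposing read_message(); `recover`
    stands in for _advance_slot_via_new_connection().
    """
    connection_lost = False
    msg = None
    try:
        msg = cursor.read_message()
    except DatabaseError:
        # The server closed the COPY_BOTH session; the original connection
        # is unusable, so advance the slot via a fresh connection instead.
        connection_lost = True
    if connection_lost:
        recover()
    return msg, connection_lost
```

With a dead connection this returns `(None, True)` after invoking the recovery callback, instead of propagating the exception and killing the whole job.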
Reduced keepalive frequency

`status_interval` increased from 10 to 30 seconds. This cuts the feedback message rate to a third (6/min to 2/min) while maintaining a comfortable 30-second margin below the default 60-second `wal_sender_timeout`.

Changes
`tap_postgres/client.py`, `PostgresLogBasedStream`:

- `get_records()`:
  - `status_interval` raised from 10 → 30
  - `read_message()` wrapped in `try/except psycopg2.DatabaseError`
  - a `connection_lost` flag controls which slot-advancement path is taken
  - the `try/except` around `select.select` now also catches `OSError` (broken socket)
- `_advance_slot_via_new_connection()`: new method. Recovery path when the original replication connection is dead. Opens a fresh replication session at the WAL tip, sends feedback to advance the slot, and updates the bookmark.

Error handling flow
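A hedged sketch of the recovery path described above. The function name, DSN handling, and slot-name parameter are illustrative assumptions; the real method also updates the stream bookmark. Only the psycopg2 calls (`start_replication`, `send_feedback`) and the LSN arithmetic mirror the description.

```python
def lsn_to_int(lsn_text: str) -> int:
    """Convert a textual LSN such as '0/16B3740' into the integer form
    that psycopg2's send_feedback() expects."""
    hi, lo = lsn_text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def advance_slot_via_new_connection(dsn: str, slot_name: str) -> int:
    """Open a fresh replication session at the WAL tip and advance the slot.

    Sketch only: recovery path used when the original replication
    connection is dead.
    """
    import psycopg2
    from psycopg2.extras import LogicalReplicationConnection

    conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection)
    try:
        cur = conn.cursor()
        # A logical replication connection also accepts plain SQL, so we can
        # ask the server for the current WAL tip.
        cur.execute("SELECT pg_current_wal_flush_lsn()")
        wal_tip = lsn_to_int(cur.fetchone()[0])
        # Re-attach to the slot and report the tip as flushed so the server
        # can recycle the WAL the crashed session left behind.
        cur.start_replication(slot_name=slot_name, decode=True)
        cur.send_feedback(flush_lsn=wal_tip, force=True)
        return wal_tip
    finally:
        try:
            conn.close()  # secondary failures on close are not fatal
        except Exception:
            pass
```

`lsn_to_int` follows PostgreSQL's `high32/low32` hexadecimal LSN notation, e.g. `lsn_to_int("0/16B3740")` equals `0x16B3740`.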
Risk assessment
- A `PGRES_COPY_BOTH` error now results in a partial sync (records read before the crash are kept) rather than a full job failure. The slot is still advanced. This is strictly better than the previous behavior.
- `status_interval` at 30s is still well within the default `wal_sender_timeout` of 60 seconds. For servers with a custom lower timeout, the interval can be further adjusted in a future config option.

Test plan
- Kill the replication connection mid-sync (`pg_terminate_backend` on the WAL sender PID) and verify that the tap logs a warning, advances the slot via the recovery path, and the job does not crash
- Verify that `status_interval=30` keeps the connection alive under normal operation (no `wal_sender_timeout` errors)
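The first test can be driven from `psql`; a sketch assuming the slot is named `tap_postgres` (substitute your configured slot name):

```sql
-- Terminate the WAL sender serving the tap's slot while a sync is running.
SELECT pg_terminate_backend(active_pid)
FROM pg_replication_slots
WHERE slot_name = 'tap_postgres' AND active;
```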