Skip to content
Open
6 changes: 6 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ plugins:
- name: ssl_client_private_key
kind: password
sensitive: true
- name: replication_max_run_seconds
kind: integer
description: Max seconds the LOG_BASED replication loop will run before exiting (default 600).
- name: replication_idle_exit_seconds
kind: integer
description: Exit LOG_BASED replication if no data messages arrive for this many seconds (default 60).
config:
sqlalchemy_url: postgresql://postgres:postgres@localhost:5432/postgres
select:
Expand Down
255 changes: 225 additions & 30 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,24 +307,45 @@ def _increment_stream_state(

@override
def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
"""Return a generator of row-type dictionary objects."""
status_interval = 5 # if no records in 5 seconds the tap can exit
"""Return a generator of row-type dictionary objects.

Runs a long-lived replication session (up to
``replication_max_run_seconds``, default 600 s) so the tap can drain
large WAL backlogs in a single sync. Sends periodic flush feedback
while yielding records so the slot releases retained WAL incrementally.

Uses the server's ``wal_end`` keepalive signal to detect when the
server has finished scanning WAL for this table, allowing fast exit
(typically 5-35 s) instead of waiting the full idle timeout. The
``replication_idle_exit_seconds`` (default 60 s) acts as a safety net
when keepalive detection is unavailable.
"""
status_interval = 30
max_run_seconds = int(
self.config.get("replication_max_run_seconds", 600),
)
idle_exit_seconds = int(
self.config.get("replication_idle_exit_seconds", 60),
)
fast_exit_data_seconds = 5
feedback_interval = 30

start_lsn = self.get_starting_replication_key_value(context=context)
if start_lsn is None:
start_lsn = 0

logical_replication_connection = self.logical_replication_connection()
logical_replication_cursor = logical_replication_connection.cursor()

# Flush logs from the previous sync. send_feedback() will only flush LSNs before
# the value of flush_lsn, not including the value of flush_lsn, so this is safe
# even though we still want logs with an LSN == start_lsn.
logical_replication_cursor.send_feedback(flush_lsn=start_lsn)

# get the slot name from the configuration or use the default value
replication_slot_name = self.config.get("replication_slot_name", "tappostgres")
replication_slot_name = self.config.get(
"replication_slot_name",
"tappostgres",
)

logical_replication_cursor.start_replication(
slot_name=replication_slot_name, # use slot name
slot_name=replication_slot_name,
decode=True,
start_lsn=start_lsn,
status_interval=status_interval,
Expand All @@ -335,34 +356,208 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
},
)

# Using scaffolding layout from:
# https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor
run_start = datetime.datetime.now()
last_data_message = run_start
last_feedback_time = run_start
records_yielded = 0
connection_lost = False

prev_wal_end = 0
wal_end_seen = False
last_wal_end_change = run_start

while True:
message = logical_replication_cursor.read_message()
now = datetime.datetime.now()
elapsed = (now - run_start).total_seconds()
if elapsed > max_run_seconds:
self.logger.info(
"Reached max run time of %d seconds (%d records yielded)",
max_run_seconds,
records_yielded,
)
break

try:
message = logical_replication_cursor.read_message()
except psycopg2.DatabaseError as exc:
self.logger.warning(
"Replication connection lost after %d records in %.0f seconds: %s",
records_yielded,
elapsed,
exc,
)
connection_lost = True
break

current_wal_end = getattr(logical_replication_cursor, "wal_end", 0) or 0
if current_wal_end != prev_wal_end:
if current_wal_end > 0:
wal_end_seen = True
last_wal_end_change = datetime.datetime.now()
prev_wal_end = current_wal_end

if message:
last_data_message = datetime.datetime.now()
row = self.consume(message, logical_replication_cursor)
if row:
records_yielded += 1
yield row
else:
timeout = (
status_interval
- (
datetime.datetime.now() - logical_replication_cursor.feedback_timestamp
).total_seconds()
)
try:
# If the timeout has passed and the cursor still has no new
# messages, the sync has completed.
if (
select.select([logical_replication_cursor], [], [], max(0, timeout))[0]
== []
):
break
except InterruptedError:
pass

logical_replication_cursor.close()
logical_replication_connection.close()
datetime.datetime.now() - last_feedback_time
).total_seconds() >= feedback_interval:
try:
logical_replication_cursor.send_feedback(
flush_lsn=message.data_start,
)
last_feedback_time = datetime.datetime.now()
except Exception:
pass
continue

try:
ready = select.select(
[logical_replication_cursor],
[],
[],
1.0,
)[0]
except (InterruptedError, OSError):
ready = True

if not ready:
now = datetime.datetime.now()
data_idle = (now - last_data_message).total_seconds()
wal_end_stable_for = (now - last_wal_end_change).total_seconds()

if (
wal_end_seen
and wal_end_stable_for >= fast_exit_data_seconds
and data_idle >= fast_exit_data_seconds
):
self.logger.info(
"Server caught up (wal_end stable for %.0fs, no data "
"for %.0fs), ending sync (%d records yielded in "
"%.0f seconds)",
wal_end_stable_for,
data_idle,
records_yielded,
elapsed,
)
break

if data_idle >= idle_exit_seconds:
self.logger.info(
"No data messages for %.0f seconds, ending sync "
"(%d records yielded in %.0f seconds)",
data_idle,
records_yielded,
elapsed,
)
break

if not connection_lost:
self._advance_slot_and_state(
logical_replication_cursor,
start_lsn,
context,
)
else:
self.logger.info(
"Skipping slot advancement after connection loss to avoid "
"skipping unprocessed records. Bookmark stays at the last "
"yielded record; next run will resume from there. "
"(%d records yielded before disconnect)",
records_yielded,
)

try:
logical_replication_cursor.close()
logical_replication_connection.close()
except Exception:
pass

def _advance_slot_and_state(
    self,
    replication_cursor,
    start_lsn: int,
    context: Context | None,
) -> None:
    """Advance the replication slot and the stream bookmark past ``start_lsn``.

    When ``add-tables`` filters out most WAL records, the slot's confirmed
    flush position can fall far behind the actual WAL tip, causing
    PostgreSQL to retain WAL that this stream will never consume.

    The target position is chosen as follows:

    1. The ``wal_end`` attribute of ``replication_cursor`` is used when it is
       greater than ``start_lsn``.
    2. Otherwise ``_query_current_wal_lsn()`` is consulted as a fallback.

    If neither source yields a position greater than ``start_lsn`` the method
    returns without doing anything. Otherwise it sends ``send_feedback`` with
    that position on the replication cursor and, only if that call succeeds,
    stores the position as ``replication_key_value`` in the stream state so
    the next sync resumes from there.

    Args:
        replication_cursor: Cursor of the active replication session; must
            provide ``send_feedback(flush_lsn=...)`` and may expose
            ``wal_end``.
        start_lsn: LSN the current sync started from, as an integer.
        context: Stream partition context passed to ``get_context_state``.
    """
    # NOTE(review): this moves the bookmark to the server-reported WAL end,
    # not to the last record actually yielded. If the caller stopped reading
    # while unread changes were still pending (for example because it hit its
    # maximum run time), those changes may be skipped -- confirm against the
    # caller's exit conditions.
    flush_lsn: int | None = None

    # Prefer the wal_end reported by the server during the replication
    # session (set from keepalive or data messages).
    try:
        wal_end = getattr(replication_cursor, "wal_end", 0) or 0
        if wal_end > start_lsn:
            flush_lsn = wal_end
    except Exception as exc:
        # Deliberately best-effort: fall through to the direct query below,
        # but leave a trace instead of hiding the failure completely.
        self.logger.debug(
            "Could not read wal_end from replication cursor: %s",
            exc,
        )

    # Fallback: query the server directly for the current WAL position.
    # At this point flush_lsn is either None or already > start_lsn.
    if not flush_lsn:
        flush_lsn = self._query_current_wal_lsn()

    if not flush_lsn or flush_lsn <= start_lsn:
        return

    # Keep the try body limited to the call that can actually fail.
    try:
        replication_cursor.send_feedback(flush_lsn=flush_lsn)
    except Exception as exc:
        # Do not advance the bookmark if the slot could not be advanced.
        self.logger.warning("Failed to send final slot feedback: %s", exc)
        return

    self.logger.info(
        "Advanced replication slot confirmed position from %d to %d (delta %.2f MB)",
        start_lsn,
        flush_lsn,
        (flush_lsn - start_lsn) / (1024 * 1024),
    )

    state_dict = self.get_context_state(context)
    state_dict["replication_key"] = self.replication_key
    state_dict["replication_key_value"] = flush_lsn

def _query_current_wal_lsn(self) -> int | None:
    """Return the result of ``pg_current_wal_flush_lsn()`` as an integer.

    Opens a separate psycopg2 connection built from
    ``self.connection_parameters``, runs the query in autocommit mode and
    converts the textual ``hi/lo`` hexadecimal LSN into a single integer
    (``hi`` shifted left by 32 bits plus ``lo``). The connection is always
    closed before returning.

    Returns:
        The LSN as an integer, or ``None`` when the query yields no row or
        when any error occurs (the error is logged as a warning).
    """
    try:
        dsn = self.connection_parameters.render_as_psycopg2_dsn()
        connection = psycopg2.connect(dsn)
        try:
            connection.autocommit = True
            with connection.cursor() as cursor:
                cursor.execute("SELECT pg_current_wal_flush_lsn()")
                result = cursor.fetchone()
                if result is None:
                    return None
                # Textual LSN looks like '6/4A3B2C10': high and low 32-bit
                # halves in hexadecimal, separated by a slash.
                high_part, low_part = result[0].split("/")
                return (int(high_part, 16) << 32) + int(low_part, 16)
        finally:
            connection.close()
    except Exception as error:
        # Best-effort helper: callers treat None as "position unknown".
        self.logger.warning("Could not query current WAL LSN: %s", error)
        return None

def consume(self, message, cursor) -> dict | None:
"""Ingest WAL message."""
Expand Down
Loading