Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ENVIRONMENT.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Environment Configuration Settings
- **ENABLE_WAL_PATH_COMPAT**: old Spilo images were generating wal path in the backup store using the following template ``/spilo/{WAL_BUCKET_SCOPE_PREFIX}{SCOPE}{WAL_BUCKET_SCOPE_SUFFIX}/wal/``, while new images adding one additional directory (``{PGVERSION}``) to the end. In order to avoid (unlikely) issues with restoring WALs (from S3/GC/and so on) when switching to ``spilo-13`` please set the ``ENABLE_WAL_PATH_COMPAT=true`` when deploying old cluster with ``spilo-13`` for the first time. After that the environment variable could be removed. Change of the WAL path also means that backups stored in the old location will not be cleaned up automatically.
- **WALG_DISABLE_S3_SSE** or **WALE_DISABLE_S3_SSE**: by default wal-g is configured to encrypt files uploaded to S3. In order to disable it you can set this environment variable to ``true``.
- **USE_OLD_LOCALES**: whether to use old locales from Ubuntu 18.04 in the Ubuntu 22.04-based image. Default is false.
- **USE_APPLICATION_NAME_IN_UPGRADE**: whether to use the application name in the upgrade script. Default is false. Useful when running behind a service mesh (e.g. proxy sidecars that change the client address seen by the primary).

wal-g
-----
Expand Down
31 changes: 23 additions & 8 deletions postgres-appliance/major_upgrade/inplace_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,17 @@ def ensure_replicas_state(self, cluster):
to all of them and puts into the `self.replica_connections` dict for a future usage.
"""
self.replica_connections = {}
streaming = {a: l for a, l in self.postgresql.query(
("SELECT client_addr, pg_catalog.pg_{0}_{1}_diff(pg_catalog.pg_current_{0}_{1}(),"
streaming = {(addr, name): lag for addr, name, lag in self.postgresql.query(
("SELECT client_addr, application_name, pg_catalog.pg_{0}_{1}_diff(pg_catalog.pg_current_{0}_{1}(),"
" COALESCE(replay_{1}, '0/0'))::bigint FROM pg_catalog.pg_stat_replication")
.format(self.postgresql.wal_name, self.postgresql.lsn_name))}

def ensure_replica_state(member):
ip = member.conn_kwargs().get('host')
lag = streaming.get(ip)
lag = streaming.get((ip, member.name))
if lag is None and os.getenv('USE_APPLICATION_NAME_IN_UPGRADE') == 'true':
            # Fall back to matching on application_name alone, ignoring client_addr
            # (the address seen by the primary may differ behind a proxy sidecar)
lag = next((lag for (_, app_name), lag in streaming.items() if app_name == member.name), None)
if lag is None:
return logger.error('Member %s is not streaming from the primary', member.name)
if lag > 16*1024*1024:
Expand All @@ -197,7 +200,13 @@ def ensure_replica_state(member):
cur.execute('SELECT pg_catalog.pg_is_in_recovery()')
if not cur.fetchone()[0]:
return logger.error('Member %s is not running as replica!', member.name)
self.replica_connections[member.name] = (ip, cur)

# determine the "client_ip" seen from leader
# differs from "ip" when using proxy sidecars (service mesh e.g. istio)
client_ip = next((addr for (addr, app_name), _ in streaming.items()
if app_name == member.name), None)

self.replica_connections[member.name] = (ip, cur, client_ip)
return True

return all(ensure_replica_state(member) for member in cluster.members if member.name != self.postgresql.name)
Expand Down Expand Up @@ -243,7 +252,7 @@ def wait_for_replicas(self, checkpoint_lsn):

for _ in polling_loop(60):
synced = True
for name, (_, cur) in self.replica_connections.items():
for name, (_, cur, _) in self.replica_connections.items():
prev = status.get(name)
if prev and prev >= checkpoint_lsn:
continue
Expand Down Expand Up @@ -276,8 +285,14 @@ def create_rsyncd_configs(self):
secrets_file = os.path.join(self.rsyncd_conf_dir, 'rsyncd.secrets')

auth_users = ','.join(self.replica_connections.keys())
replica_ips = ','.join(str(v[0]) for v in self.replica_connections.values())

# Collect both host IP and the client IP in case of proxy sidecars
replica_ips = {str(v[0]) for v in self.replica_connections.values()} # Connection IPs
replica_ips.update(str(v[2]) for v in self.replica_connections.values() if v[2]) # Streaming IPs

# Filter out None values and join IPs
replica_ips = ','.join(filter(None, replica_ips))

with open(self.rsyncd_conf, 'w') as f:
f.write("""port = {0}
use chroot = false
Expand Down Expand Up @@ -321,7 +336,7 @@ def stop_rsyncd(self):
logger.error('Failed to remove %s: %r', self.rsyncd_conf_dir, e)

def checkpoint(self, member):
name, (_, cur) = member
name, (_, cur, _) = member
try:
cur.execute('CHECKPOINT')
return name, True
Expand All @@ -335,7 +350,7 @@ def rsync_replicas(self, primary_ip):
logger.info('Notifying replicas %s to start rsync', ','.join(self.replica_connections.keys()))
ret = True
status = {}
for name, (ip, cur) in self.replica_connections.items():
for name, (ip, cur, _) in self.replica_connections.items():
try:
cur.execute("SELECT pg_catalog.pg_backend_pid()")
pid = cur.fetchone()[0]
Expand Down
Loading