Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions internal/postgresql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,32 @@ func canPauseReplication(logger *logger.LevelLog, db *Pg) (_ bool, err error) {
return true, err
}

func isReplicationPaused(logger *logger.LevelLog, db *Pg) (_ bool, err error) {
// pg_get_wal_replay_pause_state() was added in PostgreSQL 14 and returns
// 'not paused', 'pause requested', or 'paused'. On older versions,
// pg_is_{wal|xlog}_replay_paused() only returns true when fully paused.
var query string
if db.version >= 140000 {
query = "SELECT pg_get_wal_replay_pause_state() = 'paused' AND pg_is_in_recovery()"
} else {
query = fmt.Sprintf("SELECT pg_is_%s_replay_paused() AND pg_is_in_recovery()", db.xlogOrWal)
}
logger.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query)
if err != nil {
return false, fmt.Errorf("could not check replication pause state: %s", err)
}
defer helpers.WrappedClose(rows, &err)

var paused bool
for rows.Next() {
if err := rows.Scan(&paused); err != nil {
return false, fmt.Errorf("could not get row: %s", err)
}
}
return paused, err
}

func PauseReplicationWithTimeout(logger *logger.LevelLog, db *Pg, timeOut int) error {

if ok, err := canPauseReplication(logger, db); !ok {
Expand Down Expand Up @@ -736,8 +762,15 @@ func PauseReplicationWithTimeout(logger *logger.LevelLog, db *Pg, timeOut int) e
return
}
} else {
done <- true
return
// pg_wal_replay_pause() is asynchronous: the pause may not be
// in effect yet when the function returns. Loop until confirmed.
if paused, err := isReplicationPaused(logger, db); err != nil {
fail <- err
return
} else if paused {
done <- true
return
}
}

select {
Expand Down
Loading