Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion engine/packages/service-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,17 @@ pub async fn start(
if abort {
// Give time for services to handle final abort
tokio::time::sleep(Duration::from_millis(50)).await;
rivet_runtime::shutdown().await; // TODO: Fix `JoinHandle polled after completion` error
rivet_runtime::shutdown().await;

break;
}
}
}
}

// Shut down udb
pools.udb()?.shutdown().await;

// Stops term signal handler bg task
rivet_runtime::TermSignal::stop();

Expand Down
5 changes: 5 additions & 0 deletions engine/packages/universaldb/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,9 @@ impl Database {
pub fn checkpoint(&self, path: &Path) -> Result<()> {
self.driver.checkpoint(path)
}

/// Gracefully release process-wide driver resources before shutdown.
pub async fn shutdown(&self) {
self.driver.shutdown().await;
}
}
7 changes: 7 additions & 0 deletions engine/packages/universaldb/src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub trait DatabaseDriver: Send + Sync {
fn checkpoint(&self, _path: &Path) -> Result<()> {
bail!("checkpoint not supported by this database driver")
}

/// Gracefully release any process-wide resources before shutdown. The Postgres driver hands off
/// its leader lease here so a standby node takes over immediately instead of waiting out the
/// lease TTL. Default is a no-op.
fn shutdown<'a>(&'a self) -> BoxFut<'a, ()> {
Box::pin(async {})
}
}

pub trait TransactionDriver: Send + Sync {
Expand Down
11 changes: 11 additions & 0 deletions engine/packages/universaldb/src/driver/postgres/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,17 @@ impl DatabaseDriver for PostgresDatabaseDriver {
self.max_retries.store(limit, Ordering::SeqCst);
Ok(())
}

fn shutdown<'a>(&'a self) -> BoxFut<'a, ()> {
Box::pin(async move {
// Stop renewing the lease before releasing it so a racing renew cannot re-extend it.
self.resolver_handle.abort();
self.gc_handle.abort();

// Hand off leadership immediately if we hold it, instead of waiting out the lease TTL.
resolver::handoff(&self.shared).await;
})
}
}

impl Drop for PostgresDatabaseDriver {
Expand Down
24 changes: 24 additions & 0 deletions engine/packages/universaldb/src/driver/postgres/resolver/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ pub async fn renew(pool: &Pool, node_id: &str, epoch: i64) -> Result<bool> {
Ok(updated == 1)
}

/// Gracefully release the lease so a standby node can take over immediately instead of waiting out
/// the TTL. Expires the lease in place, fenced on this node's address so it never clobbers a
/// successor that already took over. Returns `true` if our lease was released (i.e. we were the
/// leader); `false` is the normal no-op when this node is a follower. Renewal must already be
/// stopped before calling this, otherwise a racing renew could re-extend the lease.
pub async fn release(pool: &Pool, node_id: &str) -> Result<bool> {
let conn = pool
.get()
.await
.context("failed to get connection for lease release")?;

let updated = conn
.execute(
"UPDATE udb_lease
SET expires_at = now()
WHERE id = $1 AND leader_addr = $2",
&[&LEASE_ID, &node_id],
)
.await
.context("failed to release lease")?;

Ok(updated == 1)
}

/// Read the current durable version (`udb_lease.durable_version`). Used by a freshly elected leader
/// to learn the watermark floor it must continue from.
pub async fn current_durable_version(pool: &Pool) -> Result<i64> {
Expand Down
65 changes: 62 additions & 3 deletions engine/packages/universaldb/src/driver/postgres/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use std::{
};

use anyhow::{Context, Result};
use tokio::sync::broadcast;

use crate::{conflict_tracker::TransactionConflictTracker, transaction::TXN_TIMEOUT};

use super::shared::{LEASE_ID, LeaseInfo, PostgresShared, WATERMARK_CHANNEL, commit_channel};
use super::shared::{
ELECTION_CHANNEL, LEASE_ID, LeaseInfo, PostgresShared, WATERMARK_CHANNEL, commit_channel,
};

/// Max commits resolved+applied per batch (group commit). Amortizes the resolver, Postgres
/// round-trips, and fsync across the batch.
Expand Down Expand Up @@ -40,6 +43,10 @@ pub fn spawn(shared: Arc<PostgresShared>) -> tokio::task::JoinHandle<()> {
}

async fn run(shared: Arc<PostgresShared>) {
// A departing leader NOTIFYs this channel after releasing its lease so we elect immediately
// rather than waiting out the full `ELECTION_RETRY` tick.
let mut election_rx = shared.listener.listen(ELECTION_CHANNEL).await;

loop {
match lease::try_acquire(&shared.pool, &shared.node_id).await {
Ok(Some(acquired)) => {
Expand All @@ -50,16 +57,68 @@ async fn run(shared: Arc<PostgresShared>) {
tracing::info!(epoch = acquired.epoch, "stepped down from udb leader");
}
Ok(None) => {
tokio::time::sleep(ELECTION_RETRY).await;
wait_for_election_retry(&shared, &mut election_rx).await;
}
Err(err) => {
tracing::warn!(?err, "failed udb lease acquire attempt");
tokio::time::sleep(ELECTION_RETRY).await;
wait_for_election_retry(&shared, &mut election_rx).await;
}
}
}
}

/// Wait before retrying the election: either the `ELECTION_RETRY` backstop elapses, or a departing
/// leader wakes us via `ELECTION_CHANNEL` so handoff is near-instant.
async fn wait_for_election_retry(
shared: &Arc<PostgresShared>,
election_rx: &mut broadcast::Receiver<String>,
) {
tokio::select! {
_ = tokio::time::sleep(ELECTION_RETRY) => {}
res = election_rx.recv() => {
if matches!(res, Err(broadcast::error::RecvError::Closed)) {
// The listener recreates the channel on reconnect; re-subscribe.
*election_rx = shared.listener.listen(ELECTION_CHANNEL).await;
}
}
}
}

/// Best-effort graceful leadership handoff invoked on shutdown. If this node currently holds the
/// lease, expire it and wake a standby so it takes over immediately instead of waiting out the TTL.
/// Safe to call on a follower: the fenced release matches no row and nothing is notified. The
/// caller must already have stopped lease renewal before calling this.
pub async fn handoff(shared: &Arc<PostgresShared>) {
match lease::release(&shared.pool, &shared.node_id).await {
Ok(true) => {
tracing::info!(node_id = %shared.node_id, "released udb leader lease for graceful handoff");
notify_election(shared).await;
}
Ok(false) => {}
Err(err) => {
tracing::warn!(?err, "failed to release udb lease on shutdown");
}
}
}

/// Wake standby candidates so the next election fires immediately after a graceful release.
async fn notify_election(shared: &Arc<PostgresShared>) {
let conn = match shared.pool.get().await {
Ok(conn) => conn,
Err(err) => {
tracing::debug!(?err, "failed to get connection for election notify");
return;
}
};

if let Err(err) = conn
.execute("SELECT pg_notify($1, '')", &[&ELECTION_CHANNEL])
.await
{
tracing::debug!(?err, "failed to notify election channel");
}
}

/// Leader main loop: hold the lease, drain the commit queue on wake or poll, and renew the lease.
async fn lead(shared: &Arc<PostgresShared>, epoch: i64) -> Result<()> {
// Publish our own lease into the cache immediately so our local commits route to us.
Expand Down
4 changes: 4 additions & 0 deletions engine/packages/universaldb/src/driver/postgres/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub fn reply_channel(node_id: &str) -> String {
/// Channel the leader NOTIFYs on every watermark advance; all nodes LISTEN.
pub const WATERMARK_CHANNEL: &str = "udb_watermark";

/// Channel a departing leader NOTIFYs after releasing its lease so a standby candidate elects
/// immediately instead of waiting out `ELECTION_RETRY`. All non-leader candidates LISTEN.
pub const ELECTION_CHANNEL: &str = "udb_election";

/// Cached view of the current leader lease, as seen by a follower.
#[derive(Clone, Debug)]
pub struct LeaseInfo {
Expand Down
70 changes: 70 additions & 0 deletions engine/packages/universaldb/tests/failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,73 @@ async fn test_postgres_leader_failover() {

drop(db2);
}

/// Exercises graceful leader handoff: a leader that is shut down cleanly (SIGTERM path) releases its
/// lease immediately instead of letting it expire, so a standby takes over well within the lease TTL
/// rather than after it. This is what turns a rolling deploy from a ~TTL commit stall into a
/// near-instant handoff.
#[tokio::test]
async fn test_postgres_graceful_handoff() {
let _ = tracing_subscriber::fmt()
.with_env_filter("info")
.with_test_writer()
.try_init();

let (db_config, docker_config) = TestDatabase::Postgres
.config(Uuid::new_v4(), 1)
.await
.unwrap();
let mut docker_config = docker_config.unwrap();
docker_config.start().await.unwrap();

tokio::time::sleep(Duration::from_secs(4)).await;

let rivet_config::config::Database::Postgres(postgres_config) = db_config else {
unreachable!();
};
let connection_string = postgres_config.url.read().clone();

let raw = connect_raw(&connection_string).await;

// Node 1 wins the first election; node 2 joins as a follower.
let db1 = make_db(&connection_string).await;
let lease1 = wait_for_lease(&raw, Duration::from_secs(15), |l| l.epoch == 1).await;
let leader1_addr = lease1.leader_addr.clone();
let db2 = make_db(&connection_string).await;

write_key(&db1, ALPHA_KEY, b"1").await;
let lease_before = read_lease(&raw).await.unwrap();

// Gracefully shut down the leader. Unlike a hard drop, this releases the lease in place and
// wakes the standby, so takeover must complete in well under the 10s TTL.
let handoff_start = tokio::time::Instant::now();
db1.shutdown().await;

// The lease TTL is 10s; a graceful handoff must take over well under that. The 5s deadline here
// is itself below the TTL, so reaching this line already proves the lease was not waited out.
let lease_after = wait_for_lease(&raw, Duration::from_secs(5), |l| {
l.epoch > lease_before.epoch
})
.await;
let handoff_elapsed = handoff_start.elapsed();
assert!(
handoff_elapsed < Duration::from_secs(8),
"graceful handoff must beat the lease TTL (took {handoff_elapsed:?})"
);
assert_ne!(
lease_after.leader_addr, leader1_addr,
"the standby must become the new leader after a graceful handoff"
);

// The new leader serves the old leader's data and accepts fresh commits.
assert_eq!(
read_key(&db2, ALPHA_KEY).await,
Some(b"1".to_vec()),
"committed data must survive graceful handoff"
);
write_key(&db2, BETA_KEY, b"2").await;
assert_eq!(read_key(&db2, BETA_KEY).await, Some(b"2".to_vec()));

drop(db1);
drop(db2);
}
Loading