diff --git a/engine/packages/service-manager/src/lib.rs b/engine/packages/service-manager/src/lib.rs index a60854f3e0..0b84fbae82 100644 --- a/engine/packages/service-manager/src/lib.rs +++ b/engine/packages/service-manager/src/lib.rs @@ -393,7 +393,7 @@ pub async fn start( if abort { // Give time for services to handle final abort tokio::time::sleep(Duration::from_millis(50)).await; - rivet_runtime::shutdown().await; // TODO: Fix `JoinHandle polled after completion` error + rivet_runtime::shutdown().await; break; } @@ -401,6 +401,9 @@ pub async fn start( } } + // Shut down udb + pools.udb()?.shutdown().await; + // Stops term signal handler bg task rivet_runtime::TermSignal::stop(); diff --git a/engine/packages/universaldb/src/database.rs b/engine/packages/universaldb/src/database.rs index c2f95983a4..a5c65dd0f5 100644 --- a/engine/packages/universaldb/src/database.rs +++ b/engine/packages/universaldb/src/database.rs @@ -106,4 +106,9 @@ impl Database { pub fn checkpoint(&self, path: &Path) -> Result<()> { self.driver.checkpoint(path) } + + /// Gracefully release process-wide driver resources before shutdown. + pub async fn shutdown(&self) { + self.driver.shutdown().await; + } } diff --git a/engine/packages/universaldb/src/driver/mod.rs b/engine/packages/universaldb/src/driver/mod.rs index 01f7c22b2c..7b19e90fef 100644 --- a/engine/packages/universaldb/src/driver/mod.rs +++ b/engine/packages/universaldb/src/driver/mod.rs @@ -34,6 +34,13 @@ pub trait DatabaseDriver: Send + Sync { fn checkpoint(&self, _path: &Path) -> Result<()> { bail!("checkpoint not supported by this database driver") } + + /// Gracefully release any process-wide resources before shutdown. The Postgres driver hands off + /// its leader lease here so a standby node takes over immediately instead of waiting out the + /// lease TTL. Default is a no-op. + fn shutdown<'a>(&'a self) -> BoxFut<'a, ()> { + Box::pin(async {}) + } } pub trait TransactionDriver: Send + Sync { diff --git a/engine/packages/universaldb/src/driver/postgres/database.rs b/engine/packages/universaldb/src/driver/postgres/database.rs index 6cded0f320..3ff891011f 100644 --- a/engine/packages/universaldb/src/driver/postgres/database.rs +++ b/engine/packages/universaldb/src/driver/postgres/database.rs @@ -290,6 +290,17 @@ impl DatabaseDriver for PostgresDatabaseDriver { self.max_retries.store(limit, Ordering::SeqCst); Ok(()) } + + fn shutdown<'a>(&'a self) -> BoxFut<'a, ()> { + Box::pin(async move { + // Stop renewing the lease before releasing it so a racing renew cannot re-extend it. + self.resolver_handle.abort(); + self.gc_handle.abort(); + + // Hand off leadership immediately if we hold it, instead of waiting out the lease TTL. + resolver::handoff(&self.shared).await; + }) + } } impl Drop for PostgresDatabaseDriver { diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/lease.rs b/engine/packages/universaldb/src/driver/postgres/resolver/lease.rs index a17b6c7759..93f7f7884e 100644 --- a/engine/packages/universaldb/src/driver/postgres/resolver/lease.rs +++ b/engine/packages/universaldb/src/driver/postgres/resolver/lease.rs @@ -62,6 +62,30 @@ pub async fn renew(pool: &Pool, node_id: &str, epoch: i64) -> Result { Ok(updated == 1) } +/// Gracefully release the lease so a standby node can take over immediately instead of waiting out +/// the TTL. Expires the lease in place, fenced on this node's address so it never clobbers a +/// successor that already took over. Returns `true` if our lease was released (i.e. we were the +/// leader); `false` is the normal no-op when this node is a follower. Renewal must already be +/// stopped before calling this, otherwise a racing renew could re-extend the lease. +pub async fn release(pool: &Pool, node_id: &str) -> Result { + let conn = pool + .get() + .await + .context("failed to get connection for lease release")?; + + let updated = conn + .execute( + "UPDATE udb_lease + SET expires_at = now() + WHERE id = $1 AND leader_addr = $2", + &[&LEASE_ID, &node_id], + ) + .await + .context("failed to release lease")?; + + Ok(updated == 1) +} + /// Read the current durable version (`udb_lease.durable_version`). Used by a freshly elected leader /// to learn the watermark floor it must continue from. pub async fn current_durable_version(pool: &Pool) -> Result { diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs b/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs index 3dfac50fce..f9b6c45461 100644 --- a/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs +++ b/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs @@ -7,10 +7,13 @@ use std::{ }; use anyhow::{Context, Result}; +use tokio::sync::broadcast; use crate::{conflict_tracker::TransactionConflictTracker, transaction::TXN_TIMEOUT}; -use super::shared::{LEASE_ID, LeaseInfo, PostgresShared, WATERMARK_CHANNEL, commit_channel}; +use super::shared::{ + ELECTION_CHANNEL, LEASE_ID, LeaseInfo, PostgresShared, WATERMARK_CHANNEL, commit_channel, +}; /// Max commits resolved+applied per batch (group commit). Amortizes the resolver, Postgres /// round-trips, and fsync across the batch. @@ -40,6 +43,10 @@ pub fn spawn(shared: Arc) -> tokio::task::JoinHandle<()> { } async fn run(shared: Arc) { + // A departing leader NOTIFYs this channel after releasing its lease so we elect immediately + // rather than waiting out the full `ELECTION_RETRY` tick. + let mut election_rx = shared.listener.listen(ELECTION_CHANNEL).await; + loop { match lease::try_acquire(&shared.pool, &shared.node_id).await { Ok(Some(acquired)) => { @@ -50,16 +57,68 @@ async fn run(shared: Arc) { tracing::info!(epoch = acquired.epoch, "stepped down from udb leader"); } Ok(None) => { - tokio::time::sleep(ELECTION_RETRY).await; + wait_for_election_retry(&shared, &mut election_rx).await; } Err(err) => { tracing::warn!(?err, "failed udb lease acquire attempt"); - tokio::time::sleep(ELECTION_RETRY).await; + wait_for_election_retry(&shared, &mut election_rx).await; + } + } + } +} + +/// Wait before retrying the election: either the `ELECTION_RETRY` backstop elapses, or a departing +/// leader wakes us via `ELECTION_CHANNEL` so handoff is near-instant. +async fn wait_for_election_retry( + shared: &Arc, + election_rx: &mut broadcast::Receiver, +) { + tokio::select! { + _ = tokio::time::sleep(ELECTION_RETRY) => {} + res = election_rx.recv() => { + if matches!(res, Err(broadcast::error::RecvError::Closed)) { + // The listener recreates the channel on reconnect; re-subscribe. + *election_rx = shared.listener.listen(ELECTION_CHANNEL).await; } } } } +/// Best-effort graceful leadership handoff invoked on shutdown. If this node currently holds the +/// lease, expire it and wake a standby so it takes over immediately instead of waiting out the TTL. +/// Safe to call on a follower: the fenced release matches no row and nothing is notified. The +/// caller must already have stopped lease renewal before calling this. +pub async fn handoff(shared: &Arc) { + match lease::release(&shared.pool, &shared.node_id).await { + Ok(true) => { + tracing::info!(node_id = %shared.node_id, "released udb leader lease for graceful handoff"); + notify_election(shared).await; + } + Ok(false) => {} + Err(err) => { + tracing::warn!(?err, "failed to release udb lease on shutdown"); + } + } +} + +/// Wake standby candidates so the next election fires immediately after a graceful release. +async fn notify_election(shared: &Arc) { + let conn = match shared.pool.get().await { + Ok(conn) => conn, + Err(err) => { + tracing::debug!(?err, "failed to get connection for election notify"); + return; + } + }; + + if let Err(err) = conn + .execute("SELECT pg_notify($1, '')", &[&ELECTION_CHANNEL]) + .await + { + tracing::debug!(?err, "failed to notify election channel"); + } +} + /// Leader main loop: hold the lease, drain the commit queue on wake or poll, and renew the lease. async fn lead(shared: &Arc, epoch: i64) -> Result<()> { // Publish our own lease into the cache immediately so our local commits route to us. diff --git a/engine/packages/universaldb/src/driver/postgres/shared.rs b/engine/packages/universaldb/src/driver/postgres/shared.rs index 0ab2963158..5b14c4df08 100644 --- a/engine/packages/universaldb/src/driver/postgres/shared.rs +++ b/engine/packages/universaldb/src/driver/postgres/shared.rs @@ -32,6 +32,10 @@ pub fn reply_channel(node_id: &str) -> String { /// Channel the leader NOTIFYs on every watermark advance; all nodes LISTEN. pub const WATERMARK_CHANNEL: &str = "udb_watermark"; +/// Channel a departing leader NOTIFYs after releasing its lease so a standby candidate elects +/// immediately instead of waiting out `ELECTION_RETRY`. All non-leader candidates LISTEN. +pub const ELECTION_CHANNEL: &str = "udb_election"; + /// Cached view of the current leader lease, as seen by a follower. #[derive(Clone, Debug)] pub struct LeaseInfo { diff --git a/engine/packages/universaldb/tests/failover.rs b/engine/packages/universaldb/tests/failover.rs index 7d122ca781..5fb394453c 100644 --- a/engine/packages/universaldb/tests/failover.rs +++ b/engine/packages/universaldb/tests/failover.rs @@ -217,3 +217,73 @@ async fn test_postgres_leader_failover() { drop(db2); } + +/// Exercises graceful leader handoff: a leader that is shut down cleanly (SIGTERM path) releases its +/// lease immediately instead of letting it expire, so a standby takes over well within the lease TTL +/// rather than after it. This is what turns a rolling deploy from a ~TTL commit stall into a +/// near-instant handoff. +#[tokio::test] +async fn test_postgres_graceful_handoff() { + let _ = tracing_subscriber::fmt() + .with_env_filter("info") + .with_test_writer() + .try_init(); + + let (db_config, docker_config) = TestDatabase::Postgres + .config(Uuid::new_v4(), 1) + .await + .unwrap(); + let mut docker_config = docker_config.unwrap(); + docker_config.start().await.unwrap(); + + tokio::time::sleep(Duration::from_secs(4)).await; + + let rivet_config::config::Database::Postgres(postgres_config) = db_config else { + unreachable!(); + }; + let connection_string = postgres_config.url.read().clone(); + + let raw = connect_raw(&connection_string).await; + + // Node 1 wins the first election; node 2 joins as a follower. + let db1 = make_db(&connection_string).await; + let lease1 = wait_for_lease(&raw, Duration::from_secs(15), |l| l.epoch == 1).await; + let leader1_addr = lease1.leader_addr.clone(); + let db2 = make_db(&connection_string).await; + + write_key(&db1, ALPHA_KEY, b"1").await; + let lease_before = read_lease(&raw).await.unwrap(); + + // Gracefully shut down the leader. Unlike a hard drop, this releases the lease in place and + // wakes the standby, so takeover must complete in well under the 10s TTL. + let handoff_start = tokio::time::Instant::now(); + db1.shutdown().await; + + // The lease TTL is 10s; a graceful handoff must take over well under that. The 5s deadline here + // is itself below the TTL, so reaching this line already proves the lease was not waited out. + let lease_after = wait_for_lease(&raw, Duration::from_secs(5), |l| { + l.epoch > lease_before.epoch + }) + .await; + let handoff_elapsed = handoff_start.elapsed(); + assert!( + handoff_elapsed < Duration::from_secs(8), + "graceful handoff must beat the lease TTL (took {handoff_elapsed:?})" + ); + assert_ne!( + lease_after.leader_addr, leader1_addr, + "the standby must become the new leader after a graceful handoff" + ); + + // The new leader serves the old leader's data and accepts fresh commits. + assert_eq!( + read_key(&db2, ALPHA_KEY).await, + Some(b"1".to_vec()), + "committed data must survive graceful handoff" + ); + write_key(&db2, BETA_KEY, b"2").await; + assert_eq!(read_key(&db2, BETA_KEY).await, Some(b"2".to_vec())); + + drop(db1); + drop(db2); +}