From 234bfb3dcde1fe1473b88c1bdeed2fe3ba2d3a2c Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 25 Jun 2026 16:04:12 -0700 Subject: [PATCH] [SLOP(claude-opus-4-8-high)] perf(universaldb): batch leader apply and fold follower commit round-trips --- Cargo.lock | 2 + engine/packages/universaldb/Cargo.toml | 2 +- .../universaldb/src/driver/postgres/commit.rs | 206 +++++++-- .../src/driver/postgres/resolver/apply.rs | 239 +++++++--- .../driver/postgres/resolver/apply_tests.rs | 299 +++++++++++++ .../src/driver/postgres/resolver/mod.rs | 411 +++++++++++++----- .../universaldb/src/driver/postgres/shared.rs | 15 + .../compose/template/src/docker-compose.ts | 1 + self-host/dev-multinode/docker-compose.yml | 3 + 9 files changed, 958 insertions(+), 220 deletions(-) create mode 100644 engine/packages/universaldb/src/driver/postgres/resolver/apply_tests.rs diff --git a/Cargo.lock b/Cargo.lock index f2f1a190d1..07798092d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7762,6 +7762,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.15.4", "pin-project-lite", "tokio", ] diff --git a/engine/packages/universaldb/Cargo.toml b/engine/packages/universaldb/Cargo.toml index 57dba4ab33..6781c7cde1 100644 --- a/engine/packages/universaldb/Cargo.toml +++ b/engine/packages/universaldb/Cargo.toml @@ -27,7 +27,7 @@ tempfile.workspace = true thiserror.workspace = true tokio-postgres-rustls.workspace = true tokio-postgres.workspace = true -tokio-util.workspace = true +tokio-util = { workspace = true, features = ["rt"] } tokio.workspace = true tracing.workspace = true url.workspace = true diff --git a/engine/packages/universaldb/src/driver/postgres/commit.rs b/engine/packages/universaldb/src/driver/postgres/commit.rs index e61714dd0e..d191c815a3 100644 --- a/engine/packages/universaldb/src/driver/postgres/commit.rs +++ b/engine/packages/universaldb/src/driver/postgres/commit.rs @@ -52,36 +52,43 @@ pub async fn submit( .await .context("failed to get connection for commit submit")?; + // Enqueue the request and wake the leader's drain loop in one round-trip. The autocommit + // statement durably inserts the row and fires the NOTIFY together, so there is no separate + // notify round-trip or second pool acquire. let id: i64 = conn .query_one( - "INSERT INTO udb_commit_requests (epoch, read_version, payload, reply_channel) - VALUES ($1, $2, $3, $4) - RETURNING id", - &[&lease.epoch, &read_version, &payload, &reply_channel], + "WITH ins AS ( + INSERT INTO udb_commit_requests (epoch, read_version, payload, reply_channel) + VALUES ($1, $2, $3, $4) + RETURNING id + ) + SELECT pg_notify($5, id::text), id FROM ins", + &[ + &lease.epoch, + &read_version, + &payload, + &reply_channel, + &commit_channel(&lease.leader_addr), + ], ) .await - .context("failed to enqueue commit request")? - .get(0); - - // Wake the leader's drain loop. - if let Err(err) = conn - .execute( - "SELECT pg_notify($1, $2)", - &[&commit_channel(&lease.leader_addr), &id.to_string()], - ) - .await - { - tracing::debug!( - ?err, - "failed to notify leader; relying on its poll backstop" - ); - } + .context("failed to enqueue and notify commit request")? + .get(1); // Release the connection before waiting so a long wait does not pin a pool slot. The request // row is durable, so await_result re-acquires a connection per poll. drop(conn); - await_result(shared, id, lease.epoch, &mut reply_rx).await + let submit_start = Instant::now(); + let result = await_result(shared, id, lease.epoch, &mut reply_rx).await; + tracing::debug!( + id, + epoch = lease.epoch, + wait_ms = submit_start.elapsed().as_millis() as u64, + ok = result.is_ok(), + "udb commit submit completed" + ); + result } /// Wait for a known leader, returning a retryable error if none is elected in time. @@ -98,53 +105,166 @@ async fn wait_for_leader(shared: &Arc) -> Result { } } -/// Poll the request row until it reaches a terminal status, woken by reply NOTIFYs with a polling -/// backstop. Bails as retryable if the leader epoch advances (our request is now orphaned and will -/// never be applied, so it is definitively not committed). +/// Wait for the commit result, resolved directly from the leader's reply NOTIFY payload on the happy +/// path. A polling `read_status` backstop covers a missed/lagged NOTIFY, and an epoch advance orphans +/// the request (it will never be applied, so it is definitively not committed). async fn await_result( shared: &Arc, id: i64, submit_epoch: i64, reply_rx: &mut tokio::sync::broadcast::Receiver, ) -> Result<()> { + let start = Instant::now(); + // Diagnostics: count how the waiter is driven so we can tell whether the reply NOTIFY is doing + // its job or whether commits are riding the slow poll backstop / getting orphaned by failover. + let mut status_reads = 0u32; + let mut notify_wakes = 0u32; + let mut poll_wakes = 0u32; + + // The backstop runs on a fixed-cadence interval, not a per-iteration sleep: under a flood of + // other commits' replies on this node's shared reply channel (which we skip past), a fresh + // per-iteration sleep would keep resetting and never fire, starving the backstop if our own + // NOTIFY was lost. An interval ticks on wall-clock cadence regardless of loop churn. + let mut poll_interval = tokio::time::interval(RESULT_POLL_INTERVAL); + poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // Consume the immediate first tick so the first backstop is one interval out, after the reply + // has had a chance to arrive. + poll_interval.tick().await; + loop { - // Re-acquire a connection per poll: the request row is durable, so a transient pool/query - // error just means we retry the poll rather than failing a possibly-applied commit. + // Wait for our reply NOTIFY, falling back to a status read on a poll tick or a lagged + // broadcast. The happy path resolves straight from the payload with no status SELECT. + tokio::select! { + res = reply_rx.recv() => { + match res { + Ok(payload) => { + notify_wakes += 1; + match parse_reply(&payload, id) { + Some(ReplyOutcome::Committed) => { + tracing::debug!( + id, + wait_ms = start.elapsed().as_millis() as u64, + status_reads, + notify_wakes, + poll_wakes, + "udb commit resolved: committed" + ); + return Ok(()); + } + Some(ReplyOutcome::Conflict) => { + tracing::debug!( + id, + wait_ms = start.elapsed().as_millis() as u64, + status_reads, + notify_wakes, + poll_wakes, + "udb commit resolved: conflict" + ); + return Err(DatabaseError::NotCommitted.into()); + } + // Reply for another waiter on the shared channel; keep waiting for ours. + None => continue, + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + // We may have missed our own reply; fall through to the status backstop. + notify_wakes += 1; + tracing::debug!(id, lagged = n, "udb reply broadcast lagged"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + tracing::warn!(id, "udb reply broadcast closed; re-subscribing"); + *reply_rx = shared + .listener + .listen(&reply_channel(&shared.node_id)) + .await; + continue; + } + } + } + _ = poll_interval.tick() => { + poll_wakes += 1; + } + } + + // Backstop path (poll tick or lagged broadcast): re-acquire a connection and read the durable + // status. A transient pool/query error just means we retry rather than failing a + // possibly-applied commit. + status_reads += 1; match read_status(shared, id).await { - Ok(Some(Status::Committed)) => return Ok(()), - Ok(Some(Status::Conflict)) => return Err(DatabaseError::NotCommitted.into()), + Ok(Some(Status::Committed)) => { + tracing::debug!( + id, + wait_ms = start.elapsed().as_millis() as u64, + status_reads, + notify_wakes, + poll_wakes, + "udb commit resolved: committed (backstop)" + ); + return Ok(()); + } + Ok(Some(Status::Conflict)) => { + tracing::debug!( + id, + wait_ms = start.elapsed().as_millis() as u64, + status_reads, + notify_wakes, + poll_wakes, + "udb commit resolved: conflict (backstop)" + ); + return Err(DatabaseError::NotCommitted.into()); + } Ok(Some(Status::Pending)) => {} Ok(None) => { // The row was GC'd before we observed a terminal status. Treat as not committed // and let the retry loop resubmit. + tracing::warn!( + id, + wait_ms = start.elapsed().as_millis() as u64, + status_reads, + "udb commit row missing before terminal status (gc'd); treating as not committed" + ); return Err(DatabaseError::NotCommitted.into()); } Err(err) => { - tracing::debug!(?err, "transient error polling commit status, retrying"); + tracing::debug!(?err, id, "transient error polling commit status, retrying"); } } // If a new leader took over, our old-epoch request will never be claimed. if let Some(current) = shared.current_lease() { if current.epoch != submit_epoch { + tracing::warn!( + id, + submit_epoch, + current_epoch = current.epoch, + wait_ms = start.elapsed().as_millis() as u64, + notify_wakes, + poll_wakes, + "udb commit orphaned by leader failover; treating as not committed" + ); return Err(DatabaseError::NotCommitted.into()); } } + } +} - tokio::select! { - res = reply_rx.recv() => { - match res { - Ok(_) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - *reply_rx = shared - .listener - .listen(&reply_channel(&shared.node_id)) - .await; - } - } - } - _ = tokio::time::sleep(RESULT_POLL_INTERVAL) => {} - } +enum ReplyOutcome { + Committed, + Conflict, +} + +/// Parse a leader reply payload (`":committed:"` or `":conflict"`). Returns +/// `None` when the payload is for a different waiter on the shared reply channel or is unparseable. +fn parse_reply(payload: &str, id: i64) -> Option { + let mut parts = payload.split(':'); + let reply_id: i64 = parts.next()?.parse().ok()?; + if reply_id != id { + return None; + } + match parts.next()? { + "committed" => Some(ReplyOutcome::Committed), + "conflict" => Some(ReplyOutcome::Conflict), + _ => None, } } diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/apply.rs b/engine/packages/universaldb/src/driver/postgres/resolver/apply.rs index d9729de884..278bb70fe7 100644 --- a/engine/packages/universaldb/src/driver/postgres/resolver/apply.rs +++ b/engine/packages/universaldb/src/driver/postgres/resolver/apply.rs @@ -1,66 +1,142 @@ +use std::collections::{BTreeMap, HashMap}; + use anyhow::{Context, Result}; -use deadpool_postgres::Transaction; use crate::{ atomic::apply_atomic_op, options::MutationType, tuple::Versionstamp, tx_ops::Operation, versionstamp::substitute_raw_versionstamp, }; -/// Apply a winning transaction's operations to `kv` inside the leader's batch txn. -/// -/// `commit_version` is the Postgres-resolved version assigned to this commit (`nextval`). It is -/// substituted into the 8-byte committed-version slot of any versionstamped key/value so -/// versionstamps are globally monotonic with commit order across all follower processes. -pub async fn apply( - txn: &Transaction<'_>, - operations: Vec, - commit_version: u64, -) -> Result<()> { - // Distinguishes multiple versionstamped operations within a single commit so their 10-byte - // stamps stay unique (8-byte version shared, 2-byte counter incremented). - let mut versionstamp_counter: u16 = 0; - - for op in operations { - match op { - Operation::SetValue { key, value } => { - upsert(txn, &key, &value).await?; - } - Operation::Clear { key } => { - txn.execute("DELETE FROM kv WHERE key = $1", &[&key]) - .await - .context("failed to clear key")?; - } - Operation::ClearRange { begin, end } => { - txn.execute( - "DELETE FROM kv WHERE key >= $1 AND key < $2", - &[&begin, &end], - ) - .await - .context("failed to clear range")?; +/// A winning commit's Postgres-resolved version and its decoded operations. Winners are folded in id +/// order. +pub struct Winner { + pub commit_version: u64, + pub operations: Vec, +} + +/// The materialized result of folding a batch of winners over the current `kv` state. Each distinct +/// key appears at most once across `upserts` and `point_deletes`, so the batch's point writes +/// collapse to a fixed number of statements regardless of batch size. +pub struct WriteSet { + pub upserts: Vec<(Vec, Vec)>, + pub point_deletes: Vec>, + pub range_deletes: Vec<(Vec, Vec)>, +} + +/// In-memory working state layered over the pre-batch `kv` snapshot. Folding the batch's winners +/// through this overlay in id order reproduces the exact serial semantics of applying each commit +/// one at a time: a later commit's atomic read observes an earlier commit's write because the +/// overlay is the live working state. +struct Overlay<'a> { + /// Pre-batch values for every key read by an atomic op in the batch. The only base read. + base: &'a HashMap, Vec>, + /// Point writes layered over the base. `Some` is a set, `None` is a point tombstone. + points: BTreeMap, Option>>, + /// Range tombstones in fold order. A key with no overlaying point write that falls in any range + /// reads as absent. + ranges: Vec<(Vec, Vec)>, +} + +impl<'a> Overlay<'a> { + fn new(base: &'a HashMap, Vec>) -> Self { + Overlay { + base, + points: BTreeMap::new(), + ranges: Vec::new(), + } + } + + /// Read-through lookup: an overlaying point write wins, then a range tombstone, then the base. + fn get(&self, key: &[u8]) -> Option> { + if let Some(value) = self.points.get(key) { + return value.clone(); + } + if self + .ranges + .iter() + .any(|(begin, end)| key >= begin.as_slice() && key < end.as_slice()) + { + return None; + } + self.base.get(key).cloned() + } + + fn set(&mut self, key: Vec, value: Vec) { + self.points.insert(key, Some(value)); + } + + fn clear(&mut self, key: Vec) { + self.points.insert(key, None); + } + + fn clear_range(&mut self, begin: Vec, end: Vec) { + // Drop any point writes inside the range; the range delete subsumes them, and a later set of + // a key in the range re-adds a point write that wins on read-through again. + let covered: Vec> = self + .points + .range(begin.clone()..end.clone()) + .map(|(key, _)| key.clone()) + .collect(); + for key in covered { + self.points.remove(&key); + } + self.ranges.push((begin, end)); + } + + fn into_write_set(self) -> WriteSet { + let mut upserts = Vec::new(); + let mut point_deletes = Vec::new(); + for (key, value) in self.points { + match value { + Some(value) => upserts.push((key, value)), + None => point_deletes.push(key), } - Operation::AtomicOp { - key, - param, - op_type, - } => { - apply_atomic( - txn, + } + WriteSet { + upserts, + point_deletes, + range_deletes: self.ranges, + } + } +} + +/// Fold every winner's operations, in id order, into a single materialized write-set over the +/// pre-batch `base` snapshot. `base` must contain the current value of every key returned by +/// [`atomic_read_keys`]. +pub fn fold_winners(winners: Vec, base: &HashMap, Vec>) -> Result { + let mut overlay = Overlay::new(base); + + for winner in winners { + // Distinguishes multiple versionstamped operations within a single commit so their 10-byte + // stamps stay unique (8-byte version shared, 2-byte counter incremented). Resets per winner. + let mut versionstamp_counter: u16 = 0; + + for op in winner.operations { + match op { + Operation::SetValue { key, value } => overlay.set(key, value), + Operation::Clear { key } => overlay.clear(key), + Operation::ClearRange { begin, end } => overlay.clear_range(begin, end), + Operation::AtomicOp { key, param, op_type, - commit_version, + } => fold_atomic( + &mut overlay, + key, + param, + op_type, + winner.commit_version, &mut versionstamp_counter, - ) - .await?; + )?, } } } - Ok(()) + Ok(overlay.into_write_set()) } -async fn apply_atomic( - txn: &Transaction<'_>, +fn fold_atomic( + overlay: &mut Overlay<'_>, key: Vec, param: Vec, op_type: MutationType, @@ -73,17 +149,17 @@ async fn apply_atomic( let key = substitute_raw_versionstamp(key, &versionstamp) .map_err(anyhow::Error::msg) .context("failed substituting versionstamped key")?; - upsert(txn, &key, ¶m).await?; + overlay.set(key, param); } MutationType::SetVersionstampedValue => { let versionstamp = build_versionstamp(commit_version, versionstamp_counter); let value = substitute_raw_versionstamp(param, &versionstamp) .map_err(anyhow::Error::msg) .context("failed substituting versionstamped value")?; - upsert(txn, &key, &value).await?; + overlay.set(key, value); } - // Read-modify-write atomics: the leader is the single writer, so reading the live value - // inside the apply txn and writing the result is serializable with no lost update. + // Read-modify-write atomics: the leader is the single writer, so reading the overlay's + // working value and writing the result is serializable with no lost update. MutationType::Add | MutationType::And | MutationType::BitAnd @@ -97,20 +173,10 @@ async fn apply_atomic( | MutationType::ByteMin | MutationType::ByteMax | MutationType::CompareAndClear => { - let current = txn - .query_opt("SELECT value FROM kv WHERE key = $1", &[&key]) - .await - .context("failed to read current value for atomic op")? - .map(|row| row.get::<_, Vec>(0)); - - let new_value = apply_atomic_op(current.as_deref(), ¶m, op_type); - - if let Some(new_value) = new_value { - upsert(txn, &key, &new_value).await?; - } else { - txn.execute("DELETE FROM kv WHERE key = $1", &[&key]) - .await - .context("failed to clear key after atomic op")?; + let current = overlay.get(&key); + match apply_atomic_op(current.as_deref(), ¶m, op_type) { + Some(new_value) => overlay.set(key, new_value), + None => overlay.clear(key), } } } @@ -118,14 +184,39 @@ async fn apply_atomic( Ok(()) } -async fn upsert(txn: &Transaction<'_>, key: &[u8], value: &[u8]) -> Result<()> { - txn.execute( - "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2", - &[&key, &value], - ) - .await - .context("failed to upsert kv")?; - Ok(()) +/// Every key a winner's atomic op reads, so the leader can fetch them all in one bulk query before +/// folding. Versionstamped ops do not read, so their keys are skipped. +pub fn atomic_read_keys(winners: &[Winner]) -> Vec> { + let mut keys = Vec::new(); + for winner in winners { + for op in &winner.operations { + if let Operation::AtomicOp { key, op_type, .. } = op { + if reads_current_value(*op_type) { + keys.push(key.clone()); + } + } + } + } + keys +} + +fn reads_current_value(op_type: MutationType) -> bool { + match op_type { + MutationType::SetVersionstampedKey | MutationType::SetVersionstampedValue => false, + MutationType::Add + | MutationType::And + | MutationType::BitAnd + | MutationType::Or + | MutationType::BitOr + | MutationType::Xor + | MutationType::BitXor + | MutationType::AppendIfFits + | MutationType::Max + | MutationType::Min + | MutationType::ByteMin + | MutationType::ByteMax + | MutationType::CompareAndClear => true, + } } /// Build a 10-byte versionstamp (plus the 2 user-version bytes the substitution helper ignores) @@ -137,3 +228,9 @@ fn build_versionstamp(commit_version: u64, counter: &mut u16) -> Versionstamp { *counter = counter.wrapping_add(1); Versionstamp::from(bytes) } + +// The fold operates on private types (`Overlay`, `Winner`, `WriteSet`) that integration tests under +// `tests/` cannot reach, so its unit tests live in a source-owned sibling file via this shim. +#[cfg(test)] +#[path = "apply_tests.rs"] +mod tests; diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/apply_tests.rs b/engine/packages/universaldb/src/driver/postgres/resolver/apply_tests.rs new file mode 100644 index 0000000000..95d991176d --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/resolver/apply_tests.rs @@ -0,0 +1,299 @@ +use std::collections::HashMap; + +use super::*; + +fn k(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +fn add_param(n: i64) -> Vec { + n.to_le_bytes().to_vec() +} + +fn read_int(value: &[u8]) -> i64 { + let mut buf = [0u8; 8]; + let len = value.len().min(8); + buf[..len].copy_from_slice(&value[..len]); + i64::from_le_bytes(buf) +} + +fn set(key: &str, value: &str) -> Operation { + Operation::SetValue { + key: k(key), + value: k(value), + } +} + +fn clear(key: &str) -> Operation { + Operation::Clear { key: k(key) } +} + +fn clear_range(begin: &str, end: &str) -> Operation { + Operation::ClearRange { + begin: k(begin), + end: k(end), + } +} + +fn add(key: &str, n: i64) -> Operation { + Operation::AtomicOp { + key: k(key), + param: add_param(n), + op_type: MutationType::Add, + } +} + +fn winner(commit_version: u64, operations: Vec) -> Winner { + Winner { + commit_version, + operations, + } +} + +/// Materialize a write-set over a base map to inspect the resulting `kv` state. +fn materialize(base: &HashMap, Vec>, write_set: WriteSet) -> HashMap, Vec> { + let mut state = base.clone(); + for (begin, end) in &write_set.range_deletes { + state.retain(|key, _| { + !(key.as_slice() >= begin.as_slice() && key.as_slice() < end.as_slice()) + }); + } + for key in &write_set.point_deletes { + state.remove(key); + } + for (key, value) in write_set.upserts { + state.insert(key, value); + } + state +} + +/// Reference oracle: apply each winner's operations one at a time directly to a working state, the +/// exact serial semantics the fold must reproduce. +fn reference(base: &HashMap, Vec>, winners: &[Winner]) -> HashMap, Vec> { + let mut state = base.clone(); + for w in winners { + let mut counter: u16 = 0; + for op in &w.operations { + match op { + Operation::SetValue { key, value } => { + state.insert(key.clone(), value.clone()); + } + Operation::Clear { key } => { + state.remove(key); + } + Operation::ClearRange { begin, end } => { + state.retain(|key, _| { + !(key.as_slice() >= begin.as_slice() && key.as_slice() < end.as_slice()) + }); + } + Operation::AtomicOp { + key, + param, + op_type, + } => match op_type { + MutationType::SetVersionstampedKey => { + let vs = build_versionstamp(w.commit_version, &mut counter); + let new_key = substitute_raw_versionstamp(key.clone(), &vs).unwrap(); + state.insert(new_key, param.clone()); + } + MutationType::SetVersionstampedValue => { + let vs = build_versionstamp(w.commit_version, &mut counter); + let new_value = substitute_raw_versionstamp(param.clone(), &vs).unwrap(); + state.insert(key.clone(), new_value); + } + _ => { + let current = state.get(key).map(|v| v.as_slice()); + match apply_atomic_op(current, param, *op_type) { + Some(v) => { + state.insert(key.clone(), v); + } + None => { + state.remove(key); + } + } + } + }, + } + } + } + state +} + +/// Build a 4-byte-offset-trailed buffer that `substitute_raw_versionstamp` can stamp at `offset`. +fn stampable(prefix: &[u8], offset: u32) -> Vec { + let mut buf = Vec::new(); + buf.extend_from_slice(prefix); + buf.extend_from_slice(&[0u8; 10]); + buf.extend_from_slice(&offset.to_le_bytes()); + buf +} + +/// `fold_winners` consumes its input, so clone for tests that also run the oracle on the same data. +fn fold_winners_clone(winners: &[Winner], base: &HashMap, Vec>) -> WriteSet { + let cloned: Vec = winners + .iter() + .map(|w| Winner { + commit_version: w.commit_version, + operations: w.operations.clone(), + }) + .collect(); + fold_winners(cloned, base).unwrap() +} + +#[test] +fn same_key_set_later_id_wins() { + let winners = vec![ + winner(1, vec![set("a", "first")]), + winner(2, vec![set("a", "second")]), + ]; + let base = HashMap::new(); + let ws = fold_winners_clone(&winners, &base); + let state = materialize(&base, ws); + assert_eq!( + state.get(&k("a")).map(|v| v.as_slice()), + Some(b"second".as_slice()) + ); + assert_eq!(state, reference(&base, &winners)); +} + +#[test] +fn two_adds_same_key_fold_sequentially() { + // 5 -> 6 -> 7 across two commits in one batch. + let mut base = HashMap::new(); + base.insert(k("n"), add_param(5)); + let winners = vec![winner(1, vec![add("n", 1)]), winner(2, vec![add("n", 1)])]; + let ws = fold_winners_clone(&winners, &base); + let state = materialize(&base, ws); + assert_eq!(read_int(state.get(&k("n")).unwrap()), 7); + assert_eq!(state, reference(&base, &winners)); +} + +#[test] +fn set_then_add_atomic_sees_the_set() { + let base = HashMap::new(); + let winners = vec![ + winner(1, vec![set("n", "\x0a\0\0\0\0\0\0\0")]), + winner(2, vec![add("n", 1)]), + ]; + let ws = fold_winners_clone(&winners, &base); + let state = materialize(&base, ws); + assert_eq!(read_int(state.get(&k("n")).unwrap()), 11); + assert_eq!(state, reference(&base, &winners)); +} + +#[test] +fn clear_range_then_add_sees_none() { + let mut base = HashMap::new(); + base.insert(k("r/x"), add_param(100)); + let winners = vec![ + winner(1, vec![clear_range("r/", "r0")]), + winner(2, vec![add("r/x", 1)]), + ]; + let ws = fold_winners_clone(&winners, &base); + let state = materialize(&base, ws); + // The range clear wiped the base 100, so the add starts from absent/0 -> 1. + assert_eq!(read_int(state.get(&k("r/x")).unwrap()), 1); + assert_eq!(state, reference(&base, &winners)); +} + +#[test] +fn set_inside_cleared_range_is_reinserted() { + let mut base = HashMap::new(); + base.insert(k("r/x"), k("old")); + let winners = vec![winner(1, vec![clear_range("r/", "r0"), set("r/x", "new")])]; + let ws = fold_winners_clone(&winners, &base); + let state = materialize(&base, ws); + assert_eq!( + state.get(&k("r/x")).map(|v| v.as_slice()), + Some(b"new".as_slice()) + ); + assert_eq!(state, reference(&base, &winners)); +} + +#[test] +fn versionstamped_key_and_value_distinct_stamps() { + let base = HashMap::new(); + let winners = vec![winner( + 42, + vec![ + Operation::AtomicOp { + key: stampable(b"vk/", 3), + param: k("v1"), + op_type: MutationType::SetVersionstampedKey, + }, + Operation::AtomicOp { + key: k("vv/"), + param: stampable(b"", 0), + op_type: MutationType::SetVersionstampedValue, + }, + ], + )]; + let ws = fold_winners_clone(&winners, &base); + let state = materialize(&base, ws); + assert_eq!(state, reference(&base, &winners)); + + // The versionstamped key embeds commit_version 42 with per-commit counter 0. + let stamped_key: Vec = { + let mut key = b"vk/".to_vec(); + key.extend_from_slice(&42u64.to_be_bytes()); + key.extend_from_slice(&0u16.to_be_bytes()); + key + }; + assert_eq!( + state.get(&stamped_key).map(|v| v.as_slice()), + Some(b"v1".as_slice()) + ); + + // The versionstamped value uses the same commit_version but the next counter (1). + let stamped_value = state.get(&k("vv/")).unwrap(); + assert_eq!(&stamped_value[0..8], &42u64.to_be_bytes()); + assert_eq!(&stamped_value[8..10], &1u16.to_be_bytes()); +} + +#[test] +fn point_delete_and_upsert_disjoint() { + let mut base = HashMap::new(); + base.insert(k("keep"), k("base")); + base.insert(k("drop"), k("base")); + let winners = vec![winner(1, vec![clear("drop"), set("keep", "new")])]; + let ws = fold_winners_clone(&winners, &base); + assert_eq!(ws.point_deletes, vec![k("drop")]); + assert_eq!(ws.upserts, vec![(k("keep"), k("new"))]); + let state = materialize(&base, ws); + assert_eq!(state, reference(&base, &winners)); +} + +/// A multi-op, multi-winner batch mixing every operation kind must equal the serial oracle. +#[test] +fn mixed_batch_matches_oracle() { + let mut base = HashMap::new(); + base.insert(k("c1"), add_param(10)); + base.insert(k("c2"), add_param(20)); + base.insert(k("old"), k("x")); + base.insert(k("range/a"), k("ra")); + base.insert(k("range/b"), k("rb")); + + let winners = vec![ + winner(1, vec![set("c1", "\x01\0\0\0\0\0\0\0"), add("c1", 4)]), + winner(2, vec![clear("old"), add("c2", 5)]), + winner( + 3, + vec![clear_range("range/", "range0"), set("range/a", "back")], + ), + winner(4, vec![add("c2", 100), add("c1", 1)]), + ]; + + let ws = fold_winners_clone(&winners, &base); + let state = materialize(&base, ws); + assert_eq!(state, reference(&base, &winners)); + + // Spot checks of the folded result. + assert_eq!(read_int(state.get(&k("c1")).unwrap()), 6); // 1 +4 +1 + assert_eq!(read_int(state.get(&k("c2")).unwrap()), 125); // 20 +5 +100 + assert!(state.get(&k("old")).is_none()); + assert_eq!( + state.get(&k("range/a")).map(|v| v.as_slice()), + Some(b"back".as_slice()) + ); + assert!(state.get(&k("range/b")).is_none()); +} diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs b/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs index f9b6c45461..e07304180c 100644 --- a/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs +++ b/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs @@ -2,15 +2,19 @@ mod apply; mod lease; use std::{ + collections::HashMap, sync::Arc, time::{Duration, Instant}, }; use anyhow::{Context, Result}; use tokio::sync::broadcast; +use tokio_util::task::AbortOnDropHandle; use crate::{conflict_tracker::TransactionConflictTracker, transaction::TXN_TIMEOUT}; +use lease::LEASE_TTL_SECS; + use super::shared::{ ELECTION_CHANNEL, LEASE_ID, LeaseInfo, PostgresShared, WATERMARK_CHANNEL, commit_channel, }; @@ -28,13 +32,6 @@ const POLL_BACKSTOP: Duration = Duration::from_millis(50); /// How long a candidate waits before retrying election when another node holds the lease. const ELECTION_RETRY: Duration = Duration::from_secs(2); -enum DrainOutcome { - /// Processed zero or more requests; still leader. - Drained, - /// Lost the lease (epoch bumped by a new leader). Step down. - LostLease, -} - /// Spawn the per-process resolver task. Every node runs this; only the elected leader drains the /// commit queue. The returned handle is aborted when the owning driver drops, which stops lease /// renewal so the lease expires and another node can take over (node-death / failover path). @@ -119,7 +116,12 @@ async fn notify_election(shared: &Arc) { } } -/// Leader main loop: hold the lease, drain the commit queue on wake or poll, and renew the lease. +/// Leader entry point: publish our lease, compute the cold-window floor, then run renewal and +/// draining as two sibling tasks. They coordinate purely by completion: when either returns (lease +/// lost or error), the other is aborted and the leader steps down. Both operations are safe to +/// hard-abort, so no explicit cancellation signalling is needed. A renew is a single fenced +/// `UPDATE`; a drain batch runs in one Postgres transaction that rolls back cleanly when dropped, +/// leaving its claimed requests `pending` for the next leader. async fn lead(shared: &Arc, epoch: i64) -> Result<()> { // Publish our own lease into the cache immediately so our local commits route to us. shared.set_lease(LeaseInfo { @@ -133,97 +135,191 @@ async fn lead(shared: &Arc, epoch: i64) -> Result<()> { let recovery_version = recovery_floor(shared).await?; let recovery_deadline = Instant::now() + TXN_TIMEOUT; - let tracker = TransactionConflictTracker::new(); + tracing::info!( + epoch, + recovery_version, + cold_window_ms = TXN_TIMEOUT.as_millis() as u64, + "udb leader entering lead loop" + ); + + // Renewal runs in its own task so the drain loop can never starve it: if renewal shared the + // drain loop, a single long drain under sustained load would block renewal past the lease TTL + // and the lease would be lost mid-drain, thrashing leadership. Both are held in abort-on-drop + // handles so a hard abort of this `run` task (driver drop / node death) tears them down. A leaked + // renew task would keep this dead leader's lease alive and block failover. + let mut renew = AbortOnDropHandle::new(tokio::spawn(renew_loop(shared.clone(), epoch))); + let mut drain = AbortOnDropHandle::new(tokio::spawn(drain_loop( + shared.clone(), + epoch, + recovery_version, + recovery_deadline, + ))); - let mut wake_rx = shared - .listener - .listen(&commit_channel(&shared.node_id)) - .await; + // Whichever task returns first (lease lost or error), step down; the other is aborted when its + // handle drops at the end of this scope. A clean exit yields its inner result; a panic surfaces + // through `?` as a join error. + tokio::select! { + res = &mut renew => res?, + res = &mut drain => res?, + } +} - let mut renew_interval = tokio::time::interval(RENEW_INTERVAL); - renew_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut poll_interval = tokio::time::interval(POLL_BACKSTOP); - poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); +/// Lease-renewal loop. Runs on its own task and pool connection so it cannot be starved by drain +/// work. Returns when the lease is definitively gone (epoch bumped by another node, or renewal +/// failing for the whole lease TTL), which causes [`lead`] to abort the drain task and step down. +async fn renew_loop(shared: Arc, epoch: i64) -> Result<()> { + let mut interval = tokio::time::interval(RENEW_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // The lease was just acquired/renewed (expires_at = now + TTL), so consume the immediate first + // tick and renew after one interval. + interval.tick().await; - // Drain anything already queued before our first wake. - if matches!( - drain(shared, epoch, &tracker, recovery_version, recovery_deadline).await?, - DrainOutcome::LostLease - ) { - return Ok(()); - } + let mut last_renew = Instant::now(); loop { - tokio::select! { - _ = renew_interval.tick() => { - if !lease::renew(&shared.pool, &shared.node_id, epoch).await? { - tracing::warn!(epoch, "lost udb lease on renew"); - return Ok(()); + interval.tick().await; + + let gap_ms = last_renew.elapsed().as_millis() as u64; + let renew_start = Instant::now(); + match lease::renew(&shared.pool, &shared.node_id, epoch).await { + Ok(true) => { + last_renew = Instant::now(); + let renew_query_ms = renew_start.elapsed().as_millis() as u64; + // With renewal on its own task this gap should track RENEW_INTERVAL closely; a large + // gap now points at pool or Postgres contention. + if gap_ms > RENEW_INTERVAL.as_millis() as u64 * 2 { + tracing::warn!( + epoch, + gap_since_last_renew_ms = gap_ms, + renew_query_ms, + "udb leader renew was delayed (pool or postgres contention)" + ); + } else { + tracing::debug!( + epoch, + gap_since_last_renew_ms = gap_ms, + renew_query_ms, + "udb leader renewed lease" + ); } } - res = wake_rx.recv() => { - match res { - Ok(_) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - wake_rx = shared.listener.listen(&commit_channel(&shared.node_id)).await; - } - } - if matches!( - drain(shared, epoch, &tracker, recovery_version, recovery_deadline).await?, - DrainOutcome::LostLease - ) { - return Ok(()); - } + Ok(false) => { + tracing::warn!( + epoch, + gap_since_last_renew_ms = gap_ms, + "udb leader lost lease on renew (epoch bumped by another node); stepping down" + ); + return Ok(()); } - _ = poll_interval.tick() => { - if matches!( - drain(shared, epoch, &tracker, recovery_version, recovery_deadline).await?, - DrainOutcome::LostLease - ) { + Err(err) => { + // A transient renew error is tolerable within the TTL; keep retrying. Only give up + // if we have been unable to renew for the whole lease TTL, at which point we can no + // longer assume we hold the lease. + if last_renew.elapsed() >= Duration::from_secs(LEASE_TTL_SECS as u64) { + tracing::warn!( + ?err, + epoch, + "udb leader renew failing past lease TTL; assuming lease lost and stepping down" + ); return Ok(()); } + tracing::warn!(?err, epoch, "udb leader lease renew errored; will retry"); } } } } -/// The version floor a freshly elected leader continues from: the higher of the durable watermark -/// and the sequence high-water. The LOGGED `udb_version_seq` is crash-safe, so this never regresses. -async fn recovery_floor(shared: &Arc) -> Result { - let durable = lease::current_durable_version(&shared.pool).await?; - - let conn = shared - .pool - .get() - .await - .context("failed to get connection for recovery floor")?; - let seq_high: i64 = conn - .query_one("SELECT last_value FROM udb_version_seq", &[]) - .await - .context("failed to read sequence high water")? - .get(0); - - Ok(durable.max(seq_high).max(0) as u64) -} - -/// Drain pending commit requests in id-ordered batches until none remain. Each batch resolves and -/// applies inside a single Postgres transaction (group commit), fenced on the leader's epoch. -async fn drain( - shared: &Arc, +/// Drain loop. Processes one batch per iteration. Draining until the queue emptied in a single call +/// could run for many seconds under sustained load; processing one batch at a time keeps the loop +/// at a clean await point between batches. A non-empty queue still drains back-to-back with no idle +/// wait, so throughput is unchanged. It blocks on `select!` (wake NOTIFY or poll backstop) only +/// when the queue is empty. Returns when this leader's epoch is fenced out; otherwise [`lead`] +/// aborts it when renewal reports the lease is lost. +async fn drain_loop( + shared: Arc, epoch: i64, - tracker: &TransactionConflictTracker, recovery_version: u64, recovery_deadline: Instant, -) -> Result { +) -> Result<()> { + let tracker = TransactionConflictTracker::new(); + + let mut wake_rx = shared + .listener + .listen(&commit_channel(&shared.node_id)) + .await; + + let mut poll_interval = tokio::time::interval(POLL_BACKSTOP); + poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { - match drain_batch(shared, epoch, tracker, recovery_version, recovery_deadline).await? { - BatchOutcome::Empty => return Ok(DrainOutcome::Drained), - BatchOutcome::Processed => {} - BatchOutcome::LostLease => return Ok(DrainOutcome::LostLease), + match drain_batch( + &shared, + epoch, + &tracker, + recovery_version, + recovery_deadline, + ) + .await? + { + BatchOutcome::LostLease => { + tracing::warn!( + epoch, + "udb leader stepping down: lost lease during drain (epoch fenced on watermark)" + ); + return Ok(()); + } + // More work may be pending; loop immediately to keep throughput up. + BatchOutcome::Processed => continue, + BatchOutcome::Empty => { + tokio::select! { + res = wake_rx.recv() => { + match res { + Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => { + wake_rx = shared + .listener + .listen(&commit_channel(&shared.node_id)) + .await; + } + } + } + _ = poll_interval.tick() => {} + } + } } } } +/// The cold-window rejection floor for a freshly elected leader: the durable watermark +/// (`udb_lease.durable_version`) at election time. +/// +/// Reasoning (do NOT change this back to `max(durable, seq_high)`): +/// +/// A new leader starts with an empty conflict tracker, so it cannot detect a read-write conflict +/// against any committed write it does not already know about. The writes it is missing are exactly +/// the previous leader's winners, and every winner's write is applied to `kv` AND its +/// `commit_version` folded into `durable_version` in the SAME apply transaction. So every missing +/// write has `commit_version <= durable_version`. A committing transaction `T` is therefore safe +/// iff `T.read_version >= durable_version`: every write above its read_version was committed by THIS +/// leader and is in the tracker. Only `T.read_version < durable_version` can race a missing winner, +/// so that is the exact set the cold window must reject. +/// +/// `udb_version_seq.last_value` (the sequence high-water) is NOT a valid floor. Every drained +/// request consumes a `nextval` BEFORE the conflict check, including conflicts and cold rejects, so +/// the sequence races far ahead of `durable_version` with versions that never produced any write. +/// Using `max(durable, seq_high)` rejects essentially every commit for the whole cold window +/// (followers read at `durable_version`, which is always `< seq_high`), turning each failover into +/// a 5s mass-reject storm. The gap `(durable_version, seq_high]` holds only thrown-away loser +/// versions, so nothing in it is a missing write to guard against. +/// +/// Version ASSIGNMENT is unaffected: commit versions still come from `nextval('udb_version_seq')` +/// in `drain_batch`, which is always above the sequence high-water, so uniqueness and monotonicity +/// across failover are preserved independently of this floor. +async fn recovery_floor(shared: &Arc) -> Result { + let durable = lease::current_durable_version(&shared.pool).await?; + Ok(durable.max(0) as u64) +} + enum BatchOutcome { Empty, Processed, @@ -232,7 +328,9 @@ enum BatchOutcome { struct Reply { channel: String, - id: i64, + /// The follower's reply payload, encoding the outcome so the waiter resolves without a status + /// SELECT: `":committed:"` or `":conflict"`. + payload: String, } async fn drain_batch( @@ -273,30 +371,55 @@ async fn drain_batch( return Ok(BatchOutcome::Empty); } + let batch_start = Instant::now(); let cold_window = Instant::now() < recovery_deadline; + let batch_len = rows.len(); let mut max_winner_cv: i64 = 0; - let mut replies = Vec::with_capacity(rows.len()); - - for row in &rows { + let mut replies = Vec::with_capacity(batch_len); + let mut committed_count = 0u32; + let mut conflict_count = 0u32; + let mut cold_reject_count = 0u32; + + // Allocate all commit versions for the batch in one round-trip instead of a `nextval` per row. + // They are assigned to rows in id order (winners and losers alike; losers' versions are + // harmlessly skipped), so versionstamps stay monotonic with commit order. The defensive sort + // keeps assignment monotonic regardless of how Postgres orders the per-row `nextval` evaluation. + let mut versions: Vec = txn + .query( + "SELECT nextval('udb_version_seq') FROM generate_series(1, $1::bigint)", + &[&(batch_len as i64)], + ) + .await + .context("failed to allocate commit versions")? + .iter() + .map(|row| row.get::<_, i64>(0)) + .collect(); + versions.sort_unstable(); + + // Resolve every request in memory in id order. Winners are collected with their version and + // operations for the fold; the bulk status stamp is built for all rows at once. + let mut winners: Vec = Vec::new(); + let mut stamp_ids: Vec = Vec::with_capacity(batch_len); + let mut stamp_statuses: Vec<&str> = Vec::with_capacity(batch_len); + let mut stamp_versions: Vec> = Vec::with_capacity(batch_len); + + for (i, row) in rows.iter().enumerate() { let id: i64 = row.get(0); let read_version: i64 = row.get(1); let payload: Vec = row.get(2); let reply_channel: String = row.get(3); + let commit_version = versions[i]; let decoded = super::codec::decode_commit_request(&payload) .context("failed to decode commit payload")?; - let commit_version: i64 = txn - .query_one("SELECT nextval('udb_version_seq')", &[]) - .await - .context("failed to get next commit version")? - .get(0); - let start_version = read_version.max(0) as u64; // Cold-window guard: a commit whose read_version predates the recovery floor cannot be // safely resolved against this leader's empty window. Reject it as retryable. - let conflicted = if cold_window && start_version < recovery_version { + let cold_rejected = cold_window && start_version < recovery_version; + let conflicted = if cold_rejected { + cold_reject_count += 1; true } else { tracker @@ -308,32 +431,98 @@ async fn drain_batch( .await }; + stamp_ids.push(id); if conflicted { - txn.execute( - "UPDATE udb_commit_requests SET status = 'conflict' WHERE id = $1", - &[&id], - ) - .await - .context("failed to stamp conflict")?; + if !cold_rejected { + conflict_count += 1; + } + stamp_statuses.push("conflict"); + stamp_versions.push(None); } else { - apply::apply(&txn, decoded.operations, commit_version.max(0) as u64) - .await - .context("failed to apply commit")?; - txn.execute( - "UPDATE udb_commit_requests SET status = 'committed', commit_version = $1 WHERE id = $2", - &[&commit_version, &id], - ) - .await - .context("failed to stamp committed")?; + committed_count += 1; + stamp_statuses.push("committed"); + stamp_versions.push(Some(commit_version)); max_winner_cv = max_winner_cv.max(commit_version); + winners.push(apply::Winner { + commit_version: commit_version.max(0) as u64, + operations: decoded.operations, + }); } + let reply_payload = if conflicted { + format!("{id}:conflict") + } else { + format!("{id}:committed:{commit_version}") + }; replies.push(Reply { channel: reply_channel, - id, + payload: reply_payload, }); } + // Bulk-read the pre-batch value of every key a winner's atomic op reads in one query, then fold + // all winners into a single materialized write-set in memory. This collapses the per-row apply + // round-trips to a fixed count independent of batch size. + let atomic_keys = apply::atomic_read_keys(&winners); + let base = if atomic_keys.is_empty() { + HashMap::new() + } else { + txn.query( + "SELECT key, value FROM kv WHERE key = ANY($1::bytea[])", + &[&atomic_keys], + ) + .await + .context("failed to bulk-read atomic op keys")? + .into_iter() + .map(|row| (row.get::<_, Vec>(0), row.get::<_, Vec>(1))) + .collect() + }; + + let apply::WriteSet { + upserts, + point_deletes, + range_deletes, + } = apply::fold_winners(winners, &base).context("failed to fold batch winners")?; + + // Materialize the write-set in O(1) statements per kind. Range deletes run first so a key whose + // final state is a set but that fell inside an earlier range clear is re-inserted by the upsert, + // not removed. + for (begin, end) in &range_deletes { + txn.execute("DELETE FROM kv WHERE key >= $1 AND key < $2", &[begin, end]) + .await + .context("failed to clear range")?; + } + if !point_deletes.is_empty() { + txn.execute( + "DELETE FROM kv WHERE key = ANY($1::bytea[])", + &[&point_deletes], + ) + .await + .context("failed to bulk-delete cleared keys")?; + } + if !upserts.is_empty() { + let (keys, values): (Vec>, Vec>) = upserts.into_iter().unzip(); + txn.execute( + "INSERT INTO kv (key, value) + SELECT * FROM unnest($1::bytea[], $2::bytea[]) + ON CONFLICT (key) DO UPDATE SET value = excluded.value", + &[&keys, &values], + ) + .await + .context("failed to bulk-upsert kv")?; + } + + // Stamp every request's terminal status in one statement instead of a per-row UPDATE. + txn.execute( + "UPDATE udb_commit_requests AS r + SET status = b.status, commit_version = b.cv + FROM unnest($1::bigint[], $2::text[], $3::bigint[]) AS b(id, status, cv) + WHERE r.id = b.id", + &[&stamp_ids, &stamp_statuses, &stamp_versions], + ) + .await + .context("failed to stamp commit statuses")?; + // Advance the watermark, fenced on our epoch. A zombie old leader whose epoch was bumped sees // zero rows updated and must step down before any of its writes become visible. let new_durable: i64 = match txn @@ -362,6 +551,18 @@ async fn drain_batch( notify_after_commit(&conn, new_durable, &replies).await; + tracing::info!( + epoch, + batch_len, + committed = committed_count, + conflict = conflict_count, + cold_reject = cold_reject_count, + cold_window, + new_durable, + batch_ms = batch_start.elapsed().as_millis() as u64, + "udb leader processed commit batch" + ); + Ok(BatchOutcome::Processed) } @@ -383,11 +584,11 @@ async fn notify_after_commit( } let channels: Vec<&str> = replies.iter().map(|r| r.channel.as_str()).collect(); - let ids: Vec = replies.iter().map(|r| r.id.to_string()).collect(); + let payloads: Vec<&str> = replies.iter().map(|r| r.payload.as_str()).collect(); if let Err(err) = conn .execute( "SELECT pg_notify(c, p) FROM unnest($1::text[], $2::text[]) AS t(c, p)", - &[&channels, &ids], + &[&channels, &payloads], ) .await { diff --git a/engine/packages/universaldb/src/driver/postgres/shared.rs b/engine/packages/universaldb/src/driver/postgres/shared.rs index 5b14c4df08..b26f14a361 100644 --- a/engine/packages/universaldb/src/driver/postgres/shared.rs +++ b/engine/packages/universaldb/src/driver/postgres/shared.rs @@ -97,6 +97,21 @@ impl PostgresShared { /// Publish a freshly observed/elected lease into the cache. pub fn set_lease(&self, lease: LeaseInfo) { + let changed = self + .lease_rx + .borrow() + .as_ref() + .map(|prev| prev.epoch != lease.epoch || prev.leader_addr != lease.leader_addr) + .unwrap_or(true); + if changed { + tracing::info!( + epoch = lease.epoch, + leader_addr = %lease.leader_addr, + self_node = %self.node_id, + is_self = (lease.leader_addr == self.node_id), + "udb follower observed leader lease change" + ); + } let _ = self.lease_tx.send(Some(lease)); } diff --git a/self-host/compose/template/src/docker-compose.ts b/self-host/compose/template/src/docker-compose.ts index 4e3781aab2..2245a31c75 100644 --- a/self-host/compose/template/src/docker-compose.ts +++ b/self-host/compose/template/src/docker-compose.ts @@ -292,6 +292,7 @@ export function generateDockerCompose(context: TemplateContext) { restart: "unless-stopped", environment: [ "RUST_LOG_ANSI_COLOR=1", + "RUST_LOG=universaldb::driver::postgres=debug", "RIVET_OTEL_ENABLED=1", "RIVET_OTEL_SAMPLER_RATIO=1", `RIVET_OTEL_GRPC_ENDPOINT=http://${context.getServiceHost("otel-collector", datacenter.name)}:4317`, diff --git a/self-host/dev-multinode/docker-compose.yml b/self-host/dev-multinode/docker-compose.yml index 389510f7b3..28234658d2 100644 --- a/self-host/dev-multinode/docker-compose.yml +++ b/self-host/dev-multinode/docker-compose.yml @@ -179,6 +179,7 @@ services: restart: unless-stopped environment: - RUST_LOG_ANSI_COLOR=1 + - RUST_LOG=universaldb::driver::postgres=debug - RIVET_OTEL_ENABLED=1 - RIVET_OTEL_SAMPLER_RATIO=1 - RIVET_OTEL_GRPC_ENDPOINT=http://otel-collector:4317 @@ -219,6 +220,7 @@ services: restart: unless-stopped environment: - RUST_LOG_ANSI_COLOR=1 + - RUST_LOG=universaldb::driver::postgres=debug - RIVET_OTEL_ENABLED=1 - RIVET_OTEL_SAMPLER_RATIO=1 - RIVET_OTEL_GRPC_ENDPOINT=http://otel-collector:4317 @@ -257,6 +259,7 @@ services: restart: unless-stopped environment: - RUST_LOG_ANSI_COLOR=1 + - RUST_LOG=universaldb::driver::postgres=debug - RIVET_OTEL_ENABLED=1 - RIVET_OTEL_SAMPLER_RATIO=1 - RIVET_OTEL_GRPC_ENDPOINT=http://otel-collector:4317