Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/packages/universaldb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tempfile.workspace = true
thiserror.workspace = true
tokio-postgres-rustls.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true
tokio-util = { workspace = true, features = ["rt"] }
tokio.workspace = true
tracing.workspace = true
url.workspace = true
Expand Down
206 changes: 163 additions & 43 deletions engine/packages/universaldb/src/driver/postgres/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,43 @@ pub async fn submit(
.await
.context("failed to get connection for commit submit")?;

// Enqueue the request and wake the leader's drain loop in one round-trip. The autocommit
// statement durably inserts the row and fires the NOTIFY together, so there is no separate
// notify round-trip or second pool acquire.
let id: i64 = conn
.query_one(
"INSERT INTO udb_commit_requests (epoch, read_version, payload, reply_channel)
VALUES ($1, $2, $3, $4)
RETURNING id",
&[&lease.epoch, &read_version, &payload, &reply_channel],
"WITH ins AS (
INSERT INTO udb_commit_requests (epoch, read_version, payload, reply_channel)
VALUES ($1, $2, $3, $4)
RETURNING id
)
SELECT pg_notify($5, id::text), id FROM ins",
&[
&lease.epoch,
&read_version,
&payload,
&reply_channel,
&commit_channel(&lease.leader_addr),
],
)
.await
.context("failed to enqueue commit request")?
.get(0);

// Wake the leader's drain loop.
if let Err(err) = conn
.execute(
"SELECT pg_notify($1, $2)",
&[&commit_channel(&lease.leader_addr), &id.to_string()],
)
.await
{
tracing::debug!(
?err,
"failed to notify leader; relying on its poll backstop"
);
}
.context("failed to enqueue and notify commit request")?
.get(1);

// Release the connection before waiting so a long wait does not pin a pool slot. The request
// row is durable, so await_result re-acquires a connection per poll.
drop(conn);

await_result(shared, id, lease.epoch, &mut reply_rx).await
let submit_start = Instant::now();
let result = await_result(shared, id, lease.epoch, &mut reply_rx).await;
tracing::debug!(
id,
epoch = lease.epoch,
wait_ms = submit_start.elapsed().as_millis() as u64,
ok = result.is_ok(),
"udb commit submit completed"
);
result
}

/// Wait for a known leader, returning a retryable error if none is elected in time.
Expand All @@ -98,53 +105,166 @@ async fn wait_for_leader(shared: &Arc<PostgresShared>) -> Result<LeaseInfo> {
}
}

/// Poll the request row until it reaches a terminal status, woken by reply NOTIFYs with a polling
/// backstop. Bails as retryable if the leader epoch advances (our request is now orphaned and will
/// never be applied, so it is definitively not committed).
/// Wait for the commit result, resolved directly from the leader's reply NOTIFY payload on the happy
/// path. A polling `read_status` backstop covers a missed/lagged NOTIFY, and an epoch advance orphans
/// the request (it will never be applied, so it is definitively not committed).
async fn await_result(
shared: &Arc<PostgresShared>,
id: i64,
submit_epoch: i64,
reply_rx: &mut tokio::sync::broadcast::Receiver<String>,
) -> Result<()> {
let start = Instant::now();
// Diagnostics: count how the waiter is driven so we can tell whether the reply NOTIFY is doing
// its job or whether commits are riding the slow poll backstop / getting orphaned by failover.
let mut status_reads = 0u32;
let mut notify_wakes = 0u32;
let mut poll_wakes = 0u32;

// The backstop runs on a fixed-cadence interval, not a per-iteration sleep: under a flood of
// other commits' replies on this node's shared reply channel (which we skip past), a fresh
// per-iteration sleep would keep resetting and never fire, starving the backstop if our own
// NOTIFY was lost. An interval ticks on wall-clock cadence regardless of loop churn.
let mut poll_interval = tokio::time::interval(RESULT_POLL_INTERVAL);
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// Consume the immediate first tick so the first backstop is one interval out, after the reply
// has had a chance to arrive.
poll_interval.tick().await;

loop {
// Re-acquire a connection per poll: the request row is durable, so a transient pool/query
// error just means we retry the poll rather than failing a possibly-applied commit.
// Wait for our reply NOTIFY, falling back to a status read on a poll tick or a lagged
// broadcast. The happy path resolves straight from the payload with no status SELECT.
tokio::select! {
res = reply_rx.recv() => {
match res {
Ok(payload) => {
notify_wakes += 1;
match parse_reply(&payload, id) {
Some(ReplyOutcome::Committed) => {
tracing::debug!(
id,
wait_ms = start.elapsed().as_millis() as u64,
status_reads,
notify_wakes,
poll_wakes,
"udb commit resolved: committed"
);
return Ok(());
}
Some(ReplyOutcome::Conflict) => {
tracing::debug!(
id,
wait_ms = start.elapsed().as_millis() as u64,
status_reads,
notify_wakes,
poll_wakes,
"udb commit resolved: conflict"
);
return Err(DatabaseError::NotCommitted.into());
}
// Reply for another waiter on the shared channel; keep waiting for ours.
None => continue,
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
// We may have missed our own reply; fall through to the status backstop.
notify_wakes += 1;
tracing::debug!(id, lagged = n, "udb reply broadcast lagged");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::warn!(id, "udb reply broadcast closed; re-subscribing");
*reply_rx = shared
.listener
.listen(&reply_channel(&shared.node_id))
.await;
continue;
}
}
}
_ = poll_interval.tick() => {
poll_wakes += 1;
}
}

// Backstop path (poll tick or lagged broadcast): re-acquire a connection and read the durable
// status. A transient pool/query error just means we retry rather than failing a
// possibly-applied commit.
status_reads += 1;
match read_status(shared, id).await {
Ok(Some(Status::Committed)) => return Ok(()),
Ok(Some(Status::Conflict)) => return Err(DatabaseError::NotCommitted.into()),
Ok(Some(Status::Committed)) => {
tracing::debug!(
id,
wait_ms = start.elapsed().as_millis() as u64,
status_reads,
notify_wakes,
poll_wakes,
"udb commit resolved: committed (backstop)"
);
return Ok(());
}
Ok(Some(Status::Conflict)) => {
tracing::debug!(
id,
wait_ms = start.elapsed().as_millis() as u64,
status_reads,
notify_wakes,
poll_wakes,
"udb commit resolved: conflict (backstop)"
);
return Err(DatabaseError::NotCommitted.into());
}
Ok(Some(Status::Pending)) => {}
Ok(None) => {
// The row was GC'd before we observed a terminal status. Treat as not committed
// and let the retry loop resubmit.
tracing::warn!(
id,
wait_ms = start.elapsed().as_millis() as u64,
status_reads,
"udb commit row missing before terminal status (gc'd); treating as not committed"
);
return Err(DatabaseError::NotCommitted.into());
}
Err(err) => {
tracing::debug!(?err, "transient error polling commit status, retrying");
tracing::debug!(?err, id, "transient error polling commit status, retrying");
}
}

// If a new leader took over, our old-epoch request will never be claimed.
if let Some(current) = shared.current_lease() {
if current.epoch != submit_epoch {
tracing::warn!(
id,
submit_epoch,
current_epoch = current.epoch,
wait_ms = start.elapsed().as_millis() as u64,
notify_wakes,
poll_wakes,
"udb commit orphaned by leader failover; treating as not committed"
);
return Err(DatabaseError::NotCommitted.into());
}
}
}
}

tokio::select! {
res = reply_rx.recv() => {
match res {
Ok(_) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
*reply_rx = shared
.listener
.listen(&reply_channel(&shared.node_id))
.await;
}
}
}
_ = tokio::time::sleep(RESULT_POLL_INTERVAL) => {}
}
enum ReplyOutcome {
Committed,
Conflict,
}

/// Parse a leader reply payload (`"<id>:committed:<commit_version>"` or `"<id>:conflict"`). Returns
/// `None` when the payload is for a different waiter on the shared reply channel or is unparseable.
fn parse_reply(payload: &str, id: i64) -> Option<ReplyOutcome> {
let mut parts = payload.split(':');
let reply_id: i64 = parts.next()?.parse().ok()?;
if reply_id != id {
return None;
}
match parts.next()? {
"committed" => Some(ReplyOutcome::Committed),
"conflict" => Some(ReplyOutcome::Conflict),
_ => None,
}
}

Expand Down
Loading
Loading