diff --git a/crutest/src/main.rs b/crutest/src/main.rs index 001794170..8587d1c8f 100644 --- a/crutest/src/main.rs +++ b/crutest/src/main.rs @@ -2702,6 +2702,15 @@ async fn replace_before_active( // that the initial downstairs are all synced up on the same flush and // generation numbers. fill_workload(volume, di, true).await?; + + // Track which SocketAddr corresponds to which region. This shifts over + // time as the test runs, so we have to track it correctly disable 2 + // downstairs for a given region. + let mut regions = vec![]; + for i in 0..targets.len() - 1 { + regions.push(Some(i as u32 / 3)); + } + regions.push(None); let ds_total = targets.len() - 1; let mut old_ds = 0; let mut new_ds = targets.len() - 1; @@ -2719,26 +2728,42 @@ async fn replace_before_active( tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } - // Stop a downstairs, wait for dsc to confirm it is stopped. - dsc_client.dsc_stop(old_ds).await.unwrap(); - loop { - let res = dsc_client.dsc_get_ds_state(old_ds).await.unwrap(); - let state = res.into_inner(); - if state == DownstairsState::Exit { - break; + // Pick a second downstairs that's in the same region, so we can stop + // two downstairs and prevent activation. This is linear-time with the + // number of targets, but that's fine (so is writing to every block). + assert!(regions[new_ds].is_none()); + let region = regions[old_ds].unwrap(); + let (other_ds, _) = regions + .iter() + .enumerate() + .find(|(i, d)| *i != old_ds && **d == Some(region)) + .unwrap(); + + // Stop two downstairs in the same region, then wait for dsc to confirm + // they are stopped. Having two downstairs stopped blocks activation. 
+ for old_ds in [old_ds, other_ds] { + dsc_client.dsc_stop(old_ds as u32).await.unwrap(); + loop { + let res = + dsc_client.dsc_get_ds_state(old_ds as u32).await.unwrap(); + let state = res.into_inner(); + if state == DownstairsState::Exit { + break; + } + tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } - tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } info!(log, "[{c}] Request the upstairs activate"); - // Spawn a task to re-activate, this will not finish till all three - // downstairs respond. + // Spawn a task to re-activate, this will not finish until 2-3 + // downstairs respond (and we have disabled all but 1) generation += 1; let gc = volume.clone(); let handle = tokio::spawn(async move { gc.activate_with_gen(generation).await }); - // Give the activation request time to percolate in the upstairs. + // Give the activation request time to percolate in the upstairs; it + // shouldn't get anywhere because we don't have enough downstairs tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; let is_active = volume.query_is_active().await.unwrap(); info!(log, "[{c}] activate should now be waiting {:?}", is_active); @@ -2747,13 +2772,13 @@ async fn replace_before_active( info!( log, "[{c}] Replacing DS {old_ds}:{} with {new_ds}:{}", - targets[old_ds as usize], + targets[old_ds], targets[new_ds], ); match volume .replace_downstairs( Uuid::new_v4(), - targets[old_ds as usize], + targets[old_ds], targets[new_ds], ) .await @@ -2764,6 +2789,9 @@ async fn replace_before_active( } } + // At this point, we've got two Downstairs (one of which was provided + // initially, and one of which has just been replaced), so activation + // should happen! 
info!(log, "[{c}] Wait for activation after replacement"); loop { let is_active = volume.query_is_active().await.unwrap(); @@ -2787,9 +2815,12 @@ async fn replace_before_active( bail!("Requested volume verify failed: {:?}", e) } - // Start up the old downstairs so it is ready for the next loop. - let res = dsc_client.dsc_start(old_ds).await; - info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res); + // Start up all the stopped downstairs so they are ready for the next + // loop. + for old_ds in [old_ds, other_ds] { + let res = dsc_client.dsc_start(old_ds as u32).await; + info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res); + } // Wait for all IO to finish before we continue loop { @@ -2808,12 +2839,13 @@ async fn replace_before_active( tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; } - old_ds = (old_ds + 1) % (ds_total as u32 + 1); + regions.swap(old_ds, new_ds); + old_ds = (old_ds + 1) % (ds_total + 1); new_ds = (new_ds + 1) % (ds_total + 1); match wtq { WhenToQuit::Count { count } => { - if c > count { + if c >= count { break; } } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 9da0acbf3..0add0bf23 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -450,7 +450,24 @@ mod integration_tests { Ok(()) } + /// Stops the downstairs task, return a `(port, rport)` tuple + pub async fn stop(&mut self) -> Result<(u16, u16)> { + let ds = self.downstairs.take().unwrap(); + let port = ds.address().port(); + let rport = ds.repair_address().port(); + ds.stop().await?; + Ok((port, rport)) + } + pub async fn reboot_read_write(&mut self) -> Result<()> { + self.reboot_read_write_with_ports(0, 0).await + } + + pub async fn reboot_read_write_with_ports( + &mut self, + port: u16, + rport: u16, + ) -> Result<()> { let downstairs = Downstairs::new_builder(&self.path()?, false) .set_logger(csl()) .build()?; @@ -460,6 +477,8 @@ mod integration_tests { downstairs, DownstairsClientSettings { 
address: self.address, + port, + rport, ..DownstairsClientSettings::default() }, ) @@ -6271,4 +6290,156 @@ mod integration_tests { Ok(()) } + + #[tokio::test] + async fn connect_two_ds_then_deactivate() { + const BLOCK_SIZE: usize = 512; + + // Spin off three downstairs, build our Crucible struct. + let mut tds = DefaultTestDownstairsSet::small(false).await.unwrap(); + let opts = tds.opts(); + tds.downstairs1.stop().await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let (guest, io) = Guest::new(None); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + + let res = guest + .write( + BlockIndex(0), + BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()), + ) + .await; + assert!(res.is_ok()); + + guest.deactivate().await.unwrap(); + } + + #[tokio::test] + async fn connect_two_ds_then_another() { + const BLOCK_SIZE: usize = 512; + + // Spin off three downstairs, build our Crucible struct. + let mut tds = DefaultTestDownstairsSet::small(false).await.unwrap(); + let opts = tds.opts(); + let (ds1_port, ds1_rport) = tds.downstairs1.stop().await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let (guest, io) = Guest::new(None); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + + let res = guest + .write( + BlockIndex(0), + BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()), + ) + .await; + assert!(res.is_ok()); + + // Restart downstairs1, which should use live-repair to join the quorum + // + // We have to wait a while here, because there's a 10-second reconnect + // delay. + tds.downstairs1 + .reboot_read_write_with_ports(ds1_port, ds1_rport) + .await + .unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; + guest.deactivate().await.unwrap(); + + // Reconnect with only ds1 running, then confirm that it received the + // writes. 
We'll come up in read-only mode so that we can connect with + // just a single Downstairs, to make sure the reads go to DS1. + tds.downstairs1.reboot_read_only().await.unwrap(); + tds.downstairs2.stop().await.unwrap(); + tds.downstairs3.stop().await.unwrap(); + tds.crucible_opts.read_only = true; + tds.crucible_opts.target[0] = tds.downstairs1.address(); + let opts = tds.opts(); + let (guest, io) = Guest::new(None); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + let mut buf = Buffer::new(2, BLOCK_SIZE); + guest.read(BlockIndex(0), &mut buf).await.unwrap(); + + assert_eq!(buf.to_vec(), vec![0x55; BLOCK_SIZE * 2]); + } + + #[tokio::test] + async fn min_quorum_live_repair() { + const BLOCK_SIZE: usize = 512; + + // Spin off three downstairs, build our Crucible struct. + let mut tds = DefaultTestDownstairsSet::small(false).await.unwrap(); + + // Stop downstairs 1 before constructing the guest, so it won't be + // included and we'll do min-quorum reconciliation. + let (port, rport) = tds.downstairs1.stop().await.unwrap(); + + // Start the guest and do a write to ds 2 and 3. + let (guest, io) = Guest::new(None); + let opts = tds.opts(); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + guest.activate().await.unwrap(); + let res = guest + .write( + BlockIndex(0), + BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice()), + ) + .await; + assert!(res.is_ok()); + + // Deactivate the guest, all without downstairs 1 participating + guest.deactivate().await.unwrap(); + + // At this point, the data has been written to DS 2 and 3. We'll start + // up again with DS 1 and 2, so min-quorum should do reconciliation. 
+ + tds.downstairs1 + .reboot_read_write_with_ports(port, rport) + .await + .unwrap(); + tds.downstairs2.stop().await.unwrap(); + guest.activate_with_gen(2).await.unwrap(); + + let mut buf = Buffer::new(2, BLOCK_SIZE); + guest.read(BlockIndex(0), &mut buf).await.unwrap(); + + assert_eq!(buf.to_vec(), vec![0x55; BLOCK_SIZE * 2]); + } + + #[tokio::test] + async fn min_quorum_cancel() { + // Spin off three downstairs, build our Crucible struct. + let mut tds = DefaultTestDownstairsSet::small(false).await.unwrap(); + + // Stop downstairs 1 before constructing the guest, so it won't be + // included and we'll do min-quorum reconciliation. + let (port, rport) = tds.downstairs1.stop().await.unwrap(); + + // Start the guest and do a write to ds 2 and 3. + let (guest, io) = Guest::new(None); + let opts = tds.opts(); + let _join_handle = up_main(opts, 1, None, io, None).unwrap(); + let s = tokio::spawn(async move { guest.activate().await }); + + // Get into our min-quorum wait, which is 500 ms + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Stop DS2 + tds.downstairs2.stop().await.unwrap(); + + // Wait for the min-quorum timer to go off; it shouldn't panic! + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Restart DS1, we're now eligible for min-quorum negotiation again + tds.downstairs1 + .reboot_read_write_with_ports(port, rport) + .await + .unwrap(); + + s.await.unwrap().unwrap() + } } diff --git a/tools/test_repair.sh b/tools/test_repair.sh index 5f50b5451..d3564c379 100755 --- a/tools/test_repair.sh +++ b/tools/test_repair.sh @@ -213,6 +213,9 @@ while [[ $count -lt $loops ]]; do ds2_pid=$! 
fi + # Wait for it to start up + sleep 10 + cp "$verify_file" ${verify_file}.last echo "Verifying data now" echo ${ct} verify ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" --range -q -g "$generation" > "$test_log" @@ -220,7 +223,7 @@ while [[ $count -lt $loops ]]; do then echo "Exit on verify fail, loop: $count, choice: $choice" echo "Check $test_log for details" - cleanup + cleanup exit 1 fi set +o errexit diff --git a/tools/test_up.sh b/tools/test_up.sh index 807f5c292..6a55bccdf 100755 --- a/tools/test_up.sh +++ b/tools/test_up.sh @@ -289,6 +289,13 @@ if ! "$dsc" cmd start -c 2; then echo "Failed repair test part 1, starting downstairs 2" >> "$fail_log" echo fi +state=$("$dsc" cmd state -c 2) +while [[ "$state" != "Running" ]]; do + echo "downstairs 2 not restarted yet, waiting" + sleep 5 + state=$("$dsc" cmd state -c 2) +done +echo "Downstairs 2 restarted" # Put a dump test in the middle of the repair test, so we # can see both a mismatch and that dump works. diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index a2be7f48b..e10e24288 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -436,31 +436,34 @@ impl DownstairsClient { /// Sets our state to `DsStateData::Reconcile` /// /// # Panics - /// If the current state is invalid + /// If we are not currently in `WaitQuorum` pub(crate) fn begin_reconcile(&mut self) { - info!(self.log, "Transition from {:?} to Reconcile", self.state()); - let DsStateData::Connecting { state, mode } = &mut self.state else { - panic!( - "invalid state {:?} for client {}", - self.state(), - self.client_id - ); - }; + info!( + self.log, + "setting state to reconcile from {:?}", + self.state() + ); // There are two cases where reconciliation is allowed: either from a // new connection, or if all three Downstairs need live-repair - // simultaneously. - match (state.discriminant(), &mode) { - (NegotiationState::WaitQuorum, ConnectionMode::New) => { - // This is fine. 
- } - (NegotiationState::LiveRepairReady, ConnectionMode::Faulted) => { - // This is also fine, but we need to tweak our connection mode - // because we're no longer doing live-repair. - *mode = ConnectionMode::New; + // simultaneously. If this Downstairs isn't Connecting, then we ignore + // it. + if let DsStateData::Connecting { state, mode, .. } = &mut self.state { + match (state.discriminant(), &mode) { + (NegotiationState::WaitQuorum, ConnectionMode::New) => { + // This is fine. + } + ( + NegotiationState::LiveRepairReady, + ConnectionMode::Faulted, + ) => { + // This is also fine, but we need to tweak our connection mode + // because we're no longer doing live-repair. + *mode = ConnectionMode::New; + } + s => panic!("invalid (state, mode) tuple: ({s:?}"), } - s => panic!("invalid (state, mode) tuple: ({s:?}"), + *state = NegotiationStateData::Reconcile; } - *state = NegotiationStateData::Reconcile; } /// Checks whether this Downstairs is ready for the upstairs to deactivate @@ -511,7 +514,7 @@ impl DownstairsClient { // If the upstairs is already active (or trying to go active), then we // should automatically connect to the Downstairs. let auto_connect = match up_state { - UpstairsState::Active | UpstairsState::GoActive(..) => true, + UpstairsState::Active | UpstairsState::GoActive { .. } => true, UpstairsState::Disabled(..) | UpstairsState::Initializing | UpstairsState::Deactivating { .. } => false, @@ -552,7 +555,7 @@ impl DownstairsClient { match up_state { // If we haven't activated yet (or we're deactivating) then // start from New - UpstairsState::GoActive(..) + UpstairsState::GoActive { .. } | UpstairsState::Initializing | UpstairsState::Disabled(..) | UpstairsState::Deactivating { .. } => ConnectionMode::New, @@ -566,7 +569,7 @@ impl DownstairsClient { match up_state { // If we haven't activated yet (or we're deactivating), then // start from New - UpstairsState::GoActive(..) + UpstairsState::GoActive { .. 
} | UpstairsState::Initializing | UpstairsState::Disabled(..) | UpstairsState::Deactivating { .. } => ConnectionMode::New, @@ -762,17 +765,20 @@ impl DownstairsClient { /// /// This changes the subsequent path through negotiation, without restarting /// the client IO task. Doing so is safe because the faulted path is - /// a superset of the offline path. + /// a superset of all other paths. /// /// # Panics - /// If we are not in `DsStateData::Connecting { mode: ConnectionMode::Offline, - /// .. }` + /// If we are not in `DsStateData::Connecting { .. }` pub(crate) fn set_connection_mode_faulted(&mut self) { - let DsStateData::Connecting { mode, .. } = &mut self.state else { + let DsStateData::Connecting { mode, state } = &mut self.state else { panic!("not connecting"); }; - assert_eq!(*mode, ConnectionMode::Offline); - *mode = ConnectionMode::Faulted + *mode = ConnectionMode::Faulted; + if let NegotiationStateData::WaitQuorum(r) = state { + // yoink + let r = std::mem::replace(r, RegionMetadata::new(&[], &[], &[])); + *state = NegotiationStateData::LiveRepairReady(r); + } } /// Applies an [`EnqueueResult`] for the given job @@ -1459,7 +1465,7 @@ impl DownstairsClient { // downstairs here. match up_state { UpstairsState::Initializing - | UpstairsState::GoActive(_) => { + | UpstairsState::GoActive { .. } => { warn!( self.log, "Replace {} with {} before active", @@ -1592,7 +1598,7 @@ impl DownstairsClient { if matches!( up_state, UpstairsState::Initializing - | UpstairsState::GoActive(..) + | UpstairsState::GoActive { .. 
} ) => { *state = NegotiationStateData::WaitQuorum(dsr); diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index a7f1b113e..1d1ec2c83 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -307,16 +307,23 @@ pub(crate) struct ReconcileData { /// Number of extents needing repair during initial activation reconcile_repair_needed: usize, + + /// Flags indicating whether a client is participating + participating: ClientData, } impl ReconcileData { - fn new>>(task_list: V) -> Self { + fn new>>( + task_list: V, + participating: ClientData, + ) -> Self { let task_list = task_list.into(); Self { id: Uuid::new_v4(), current_work: None, reconcile_repair_needed: task_list.len(), task_list, + participating, } } } @@ -440,7 +447,10 @@ impl Downstairs { /// Helper function to set all 3x clients as active, legally #[cfg(test)] pub fn force_active(&mut self) { - let up_state = UpstairsState::GoActive(BlockRes::dummy()); + let up_state = UpstairsState::GoActive { + res: BlockRes::dummy(), + min_quorum_deadline: None, + }; for cid in ClientId::iter() { for state in [ NegotiationStateData::Start, @@ -888,14 +898,15 @@ impl Downstairs { pub(crate) fn collate(&mut self) -> Result { let r = self.check_region_metadata()?; Ok(self.start_reconciliation(r, |data| { - let DsStateData::Connecting { + if let DsStateData::Connecting { state: NegotiationStateData::WaitQuorum(r), .. } = data - else { - panic!("client is not in WaitQuorum"); - }; - r + { + Some(r) + } else { + None + } })) } @@ -913,8 +924,8 @@ impl Downstairs { * that code yet, we are making use of this loop to find our * max. 
*/ - let mut max_flush = 0; - let mut max_gen = 0; + let mut max_flush = None; + let mut max_gen = None; for (cid, rec) in ClientId::iter().filter_map(|i| { if let DsStateData::Connecting { state: NegotiationStateData::WaitQuorum(r), @@ -933,8 +944,8 @@ impl Downstairs { let mut dirty_log = Vec::with_capacity(MAX_LOG); for (i, m) in rec.iter().enumerate() { - max_flush = max_flush.max(m.flush + 1); - max_gen = max_gen.max(m.generation + 1); + max_flush = Some(max_flush.unwrap_or(0).max(m.flush + 1)); + max_gen = Some(max_gen.unwrap_or(0).max(m.generation + 1)); if i < MAX_LOG { flush_log.push(m.flush); gen_log.push(m.generation); @@ -951,6 +962,8 @@ impl Downstairs { info!(self.log, "[{cid}]R dirty{slice}: {dirty_log:?}",); } + let max_gen = max_gen.expect("no clients in WaitQuorum?"); + let max_flush = max_flush.expect("no clients in WaitQuorum?"); info!(self.log, "Max found gen is {}", max_gen); /* * Verify that the generation number that the guest has requested @@ -1014,13 +1027,13 @@ impl Downstairs { else { panic!("client is not in LiveRepairReady"); }; - r + Some(r) }) } /// Begins reconciliation, using the given collation data #[must_use] - fn start_reconciliation &RegionMetadata>( + fn start_reconciliation Option<&RegionMetadata>>( &mut self, data: CollateData, getter: G, @@ -1037,9 +1050,13 @@ impl Downstairs { * Determine what extents don't match and what to do * about that */ - if let Some(reconcile_list) = self.mismatch_list(getter) { - for c in self.clients.iter_mut() { - c.begin_reconcile(); + if let Some((reconcile_list, participating)) = + self.mismatch_list(getter) + { + for i in ClientId::iter() { + if participating[i] { + self.clients[i].begin_reconcile(); + } } let task_list = self.convert_rc_to_messages( @@ -1048,7 +1065,7 @@ impl Downstairs { max_gen, ); - let reconcile = ReconcileData::new(task_list); + let reconcile = ReconcileData::new(task_list, participating); info!( self.log, @@ -1884,8 +1901,12 @@ impl Downstairs { 
reconcile.reconcile_repair_needed, ); - for c in self.clients.iter_mut() { - c.send_next_reconciliation_req(&mut next); + for i in ClientId::iter() { + if reconcile.participating[i] { + self.clients[i].send_next_reconciliation_req(&mut next); + } else { + next.skip(i); + } } reconcile.current_work = Some(next); @@ -1932,19 +1953,24 @@ impl Downstairs { return false; }; + let Some(reconcile) = self.reconcile.as_mut() else { + unreachable!(); // checked above + }; + // Check to make sure that we're still in a repair-ready state // // If any client have dropped out of repair-readiness (e.g. due to // failed reconciliation, timeouts, etc), then we have to kick // everything else back to the beginning. - if self.clients.iter().any(|c| { - !matches!( - c.state(), - DsState::Connecting { - state: NegotiationState::Reconcile, - .. - } - ) + if ClientId::iter().any(|i| { + reconcile.participating[i] + && !matches!( + self.clients[i].state(), + DsState::Connecting { + state: NegotiationState::Reconcile, + .. + } + ) }) { // Something has changed, so abort this repair. // Mark any downstairs that have not changed as failed and disable @@ -1953,10 +1979,6 @@ impl Downstairs { return false; } - let Some(reconcile) = self.reconcile.as_mut() else { - unreachable!(); // checked above - }; - let next = reconcile.current_work.as_mut().unwrap(); if self.clients[client_id].on_reconciliation_job_done(repair_id, next) { reconcile.current_work = None; @@ -2051,13 +2073,37 @@ impl Downstairs { /// Asserts that initial reconciliation is done, and sets clients as Active /// + /// Specifically, clients that were in `NegotiationState::Reconcile` are set + /// as active; clients that are in other negotiation states are marked as + /// faulted. + /// /// # Panics /// If that isn't the case! 
pub(crate) fn on_reconciliation_done(&mut self, did_work: bool) { assert!(self.ds_active.is_empty()); - for c in self.clients.iter_mut() { - c.set_active(); + for (i, c) in self.clients.iter_mut().enumerate() { + match c.state() { + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } => { + assert!(!did_work); + c.set_active(); + } + DsState::Connecting { + state: NegotiationState::Reconcile, + .. + } => { + assert!(did_work); + c.set_active(); + } + DsState::Connecting { .. } => c.set_connection_mode_faulted(), + s => panic!( + "invalid state in on_reconciliation_done \ + for client {i}: {s:?}" + ), + } } if did_work { @@ -2072,16 +2118,20 @@ impl Downstairs { } /// Compares region metadata from all three clients and builds a mend list - fn mismatch_list &RegionMetadata>( + fn mismatch_list Option<&RegionMetadata>>( &self, getter: G, - ) -> Option { + ) -> Option<(DownstairsMend, ClientData)> { let log = self.log.new(o!("" => "mend".to_string())); let mut meta = ClientMap::new(); + let mut participating = ClientData::new(false); for i in ClientId::iter() { - meta.insert(i, getter(self.clients[i].state_data())); + if let Some(r) = getter(self.clients[i].state_data()) { + meta.insert(i, r); + participating[i] = true; + } } - DownstairsMend::new(&meta, log) + DownstairsMend::new(&meta, log).map(|m| (m, participating)) } pub(crate) fn submit_flush( @@ -4065,8 +4115,8 @@ struct DownstairsBackpressureConfig { #[cfg(test)] pub(crate) mod test { use super::{ - ClientFaultReason, ClientNegotiationFailed, ClientStopReason, - ConnectionMode, Downstairs, DsState, LiveRepairStart, + ClientData, ClientFaultReason, ClientNegotiationFailed, + ClientStopReason, ConnectionMode, Downstairs, DsState, NegotiationStateData, PendingJob, }; use crate::{ @@ -4074,7 +4124,9 @@ pub(crate) mod test { ExtentFix, ExtentRepairIDs, IOState, IOop, ImpactedBlocks, JobId, RawReadResponse, ReconcileIO, ReconcileIOState, ReconciliationId, RegionMetadata, SnapshotDetails, - 
downstairs::{LiveRepairData, LiveRepairState, ReconcileData}, + downstairs::{ + LiveRepairData, LiveRepairStart, LiveRepairState, ReconcileData, + }, live_repair::ExtentInfo, upstairs::UpstairsState, }; @@ -4165,7 +4217,10 @@ pub(crate) mod test { fn set_all_reconcile(ds: &mut Downstairs) { let mode = ConnectionMode::New; - let up_state = UpstairsState::GoActive(BlockRes::dummy()); + let up_state = UpstairsState::GoActive { + res: BlockRes::dummy(), + min_quorum_deadline: None, + }; for cid in ClientId::iter() { for state in [ NegotiationStateData::Start, @@ -5803,7 +5858,7 @@ pub(crate) mod test { // No repairs on the queue, should return None let mut ds = Downstairs::test_default(); - ds.reconcile = Some(ReconcileData::new([])); + ds.reconcile = Some(ReconcileData::new([], ClientData::new(true))); let w = ds.send_next_reconciliation_req(); assert!(w); // reconciliation is "done", because there's nothing there @@ -5821,22 +5876,25 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Put two jobs on the todo list - ds.reconcile = Some(ReconcileData::new([ - ReconcileIO::new( - close_id, - Message::ExtentClose { - repair_id: close_id, - extent_id: ExtentId(1), - }, - ), - ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - ), - ])); + ds.reconcile = Some(ReconcileData::new( + [ + ReconcileIO::new( + close_id, + Message::ExtentClose { + repair_id: close_id, + extent_id: ExtentId(1), + }, + ), + ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + ), + ], + ClientData::new(true), // all clients are participating + )); // Send the first reconciliation req assert!(!ds.send_next_reconciliation_req()); @@ -5877,13 +5935,16 @@ pub(crate) mod test { let up_state = UpstairsState::Active; let rep_id = ReconciliationId(0); - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: 
ExtentId(1), - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + )], + ClientData::new(true), // all clients are participating + )); // Send that job ds.send_next_reconciliation_req(); @@ -5916,13 +5977,16 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); let rep_id = ReconciliationId(0); - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + )], + ClientData::new(true), // all clients are participating + )); // Send that req assert!(!ds.send_next_reconciliation_req()); @@ -5938,22 +6002,25 @@ pub(crate) mod test { let close_id = ReconciliationId(0); let rep_id = ReconciliationId(1); - ds.reconcile = Some(ReconcileData::new([ - ReconcileIO::new( - close_id, - Message::ExtentClose { - repair_id: close_id, - extent_id: ExtentId(1), - }, - ), - ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - ), - ])); + ds.reconcile = Some(ReconcileData::new( + [ + ReconcileIO::new( + close_id, + Message::ExtentClose { + repair_id: close_id, + extent_id: ExtentId(1), + }, + ), + ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + ), + ], + ClientData::new(true), // all clients are participating + )); // Send the close job. Reconciliation isn't done at this point! 
assert!(!ds.send_next_reconciliation_req()); @@ -5986,19 +6053,22 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Queue up a repair message, which will be skiped for client 0 - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentRepair { - repair_id: rep_id, - extent_id: ExtentId(1), - source_client_id: ClientId::new(0), - source_repair_address: SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 803, - ), - dest_clients: vec![ClientId::new(1), ClientId::new(2)], - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentRepair { + repair_id: rep_id, + extent_id: ExtentId(1), + source_client_id: ClientId::new(0), + source_repair_address: SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 803, + ), + dest_clients: vec![ClientId::new(1), ClientId::new(2)], + }, + )], + ClientData::new(true), // all clients are participating + )); // Send the job. Reconciliation isn't done at this point! assert!(!ds.send_next_reconciliation_req()); @@ -6029,19 +6099,22 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Queue up a repair message, which will be skiped for client 0 - ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( - rep_id, - Message::ExtentRepair { - repair_id: rep_id, - extent_id: ExtentId(1), - source_client_id: ClientId::new(0), - source_repair_address: SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 803, - ), - dest_clients: vec![ClientId::new(1), ClientId::new(2)], - }, - )])); + ds.reconcile = Some(ReconcileData::new( + [ReconcileIO::new( + rep_id, + Message::ExtentRepair { + repair_id: rep_id, + extent_id: ExtentId(1), + source_client_id: ClientId::new(0), + source_repair_address: SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 803, + ), + dest_clients: vec![ClientId::new(1), ClientId::new(2)], + }, + )], + ClientData::new(true), // all clients are participating + )); // Send the job. 
Reconciliation isn't done at this point! assert!(!ds.send_next_reconciliation_req()); @@ -6064,22 +6137,25 @@ pub(crate) mod test { let rep_id = ReconciliationId(1); // Queue up a repair message, which will be skiped for client 0 - ds.reconcile = Some(ReconcileData::new([ - ReconcileIO::new( - close_id, - Message::ExtentClose { - repair_id: close_id, - extent_id: ExtentId(1), - }, - ), - ReconcileIO::new( - rep_id, - Message::ExtentClose { - repair_id: rep_id, - extent_id: ExtentId(1), - }, - ), - ])); + ds.reconcile = Some(ReconcileData::new( + [ + ReconcileIO::new( + close_id, + Message::ExtentClose { + repair_id: close_id, + extent_id: ExtentId(1), + }, + ), + ReconcileIO::new( + rep_id, + Message::ExtentClose { + repair_id: rep_id, + extent_id: ExtentId(1), + }, + ), + ], + ClientData::new(true), // all clients are participating + )); // Send the first req; reconciliation is not yet done assert!(!ds.send_next_reconciliation_req()); diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index df6363312..2c10da1ce 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -1065,6 +1065,11 @@ impl ReconcileIO { state: ClientData::new(ReconcileIOState::New), } } + + /// Marks the job as skipped for the given client + fn skip(&mut self, i: ClientId) { + self.state[i] = ReconcileIOState::Skipped; + } } /* diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 33a956712..8fc5dc00d 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -41,6 +41,9 @@ use uuid::Uuid; /// How often to log stats for DTrace const STAT_INTERVAL: Duration = Duration::from_secs(1); +/// How long to delay before negotiating with 2/3 Downstairs +const NEGOTIATION_DELAY: Duration = Duration::from_millis(500); + /// Minimum IO size (in bytes) before encryption / decryption is done off-thread const MIN_DEFER_SIZE_BYTES: u64 = 8192; @@ -65,7 +68,12 @@ pub(crate) enum UpstairsState { /// The guest has requested that the upstairs go active /// /// We should reply 
on the provided channel - GoActive(BlockRes), + GoActive { + res: BlockRes, + + /// Time at which to perform min-quorum negotiation + min_quorum_deadline: Option, + }, /// The upstairs is fully online and accepting guest IO Active, @@ -270,6 +278,9 @@ pub(crate) enum UpstairsAction { StatUpdate, Control(ControlRequest), + /// The timer has fired for reconciliation, with 2/3 downstairs + MinQuorumReconciliation, + /// The guest connection has been dropped GuestDropped, @@ -512,6 +523,16 @@ impl Upstairs { } } } + _ = if let UpstairsState::GoActive { + min_quorum_deadline: Some(time), .. + } = &self.state { + futures::future::Either::Left(sleep_until(*time)) + } else { + futures::future::Either::Right(std::future::pending()) + } + => { + UpstairsAction::MinQuorumReconciliation + } m = self.deferred_msgs.next(), if !self.deferred_msgs.is_empty() => { // The outer Option is None if the queue is empty. If this is @@ -556,6 +577,9 @@ impl Upstairs { UpstairsAction::GuestDropped => { self.guest_dropped = true; } + UpstairsAction::MinQuorumReconciliation => { + self.on_min_quorum(); + } UpstairsAction::DeferredBlockOp(req) => { self.counters.action_deferred_block += 1; cdt::up__action_deferred_block!(|| self @@ -859,7 +883,7 @@ impl Upstairs { let state = match &self.state { UpstairsState::Initializing | UpstairsState::Disabled(..) - | UpstairsState::GoActive(..) => { + | UpstairsState::GoActive { .. } => { crate::UpState::Initializing } UpstairsState::Active => crate::UpState::Active, @@ -1021,7 +1045,7 @@ impl Upstairs { // We allow this if we are not active yet, or we are active // with the requested generation number. match &self.state { - UpstairsState::Active | UpstairsState::GoActive(..) => { + UpstairsState::Active | UpstairsState::GoActive { .. 
} => { if self.cfg.generation() == generation { // Okay, we want to activate with what we already // have, that's valid; let the set_active_request @@ -1249,7 +1273,10 @@ impl Upstairs { fn set_active_request(&mut self, res: BlockRes) { match &self.state { UpstairsState::Initializing | UpstairsState::Disabled(..) => { - self.state = UpstairsState::GoActive(res); + self.state = UpstairsState::GoActive { + res, + min_quorum_deadline: None, + }; info!(self.log, "{} active request set", self.cfg.upstairs_id); // Notify all clients that they should go active when they hit @@ -1258,7 +1285,7 @@ impl Upstairs { c.set_active_request(); } } - UpstairsState::GoActive(..) => { + UpstairsState::GoActive { .. } => { // We have already been sent a request to go active, but we // are not active yet and will respond (on the original // BlockRes) when we do become active. @@ -1300,7 +1327,7 @@ impl Upstairs { match &self.state { UpstairsState::Initializing | UpstairsState::Disabled(..) - | UpstairsState::GoActive(..) => { + | UpstairsState::GoActive { .. } => { res.send_err(CrucibleError::UpstairsInactive); return; } @@ -1775,7 +1802,7 @@ impl Upstairs { self.connect_ro_region_set(); } else { // See if we have a quorum - self.connect_region_set(); + self.on_wait_quorum(); } } Ok(NegotiationResult::Replay) => { @@ -1878,7 +1905,7 @@ impl Upstairs { self.on_reconciliation_skipped() } - /// Checks whether we can connect all three regions + /// Response to a client entering `NegotiationState::WaitQuorum` /// /// Returns `false` if we aren't ready, or if things failed. If there's a /// failure, then we also update the client state. @@ -1888,100 +1915,117 @@ impl Upstairs { /// **can't** activate, then we should notify the requestor of failure. /// /// If we have a problem here, we can't activate the upstairs. - fn connect_region_set(&mut self) -> bool { - /* - * If reconciliation is required, it happens in three phases. 
- * Typically an interruption of reconciliation will result in things - * starting over, but if actual repair work to an extent is - * completed, that extent won't need to be repaired again. - * - * The three phases are: - * - * Collect: - * When a Downstairs connects, the Upstairs collects the gen/flush/dirty - * (GFD) info from all extents. This GFD information is stored and the - * Upstairs waits for all three Downstairs to attach. - * - * Compare: - * In the compare phase, the upstairs will walk the list of all extents - * and compare the G/F/D from each of the downstairs. When there is a - * mismatch between downstairs (The dirty bit counts as a mismatch and - * will force a repair even if generation and flush numbers agree). For - * each mismatch, the upstairs determines which downstairs has the - * extent that should be the source, and which of the other downstairs - * extents needs repair. This list of mismatches (source, - * destination(s)) is collected. Once an upstairs has compiled its - * repair list, it will then generates a sequence of Upstairs -> - * Downstairs repair commands to repair each extent that needs to be - * fixed. For a given piece of repair work, the commands are: - * - Send a flush to source extent. - * - Close extent on all downstairs. - * - Send repair command to destination extents (with source extent - * IP/Port). - * (See DS-DS Repair) - * - Reopen all extents. - * - * Repair: - * During repair Each command issued from the upstairs must be completed - * before the next will be sent. The Upstairs is responsible for walking - * the repair commands and sending them to the required downstairs, and - * waiting for them to finish. The actual repair work for an extent - * takes place on the downstairs being repaired. - * - * Repair (ds to ds) - * Each downstairs runs a repair server (Dropshot) that listens for - * repair requests from other downstairs. 
A downstairs with an extent - * that needs repair will contact the source downstairs and request the - * list of files for an extent, then request each file. Once all files - * are local to the downstairs needing repair, it will replace the - * existing extent files with the new ones. - */ - let collate_status = { - /* - * Reconciliation only happens during initialization. - * Look at all three downstairs region information collected. - * Determine the highest flush number and make sure our generation - * is high enough. - */ - if !matches!(&self.state, UpstairsState::GoActive(..)) { - info!( - self.log, - "could not connect region set due to bad state: {:?}", - self.state - ); - return false; + /// + /// # Notes on reconciliation + /// If reconciliation is required, it happens in three phases. Failure + /// during reconciliation will result in the upstairs being disabled (and + /// replying to the activation request with an error); if actual repair work + /// to an extent has been completed before the failure, that extent won't + /// need to be repaired again. + /// + /// The three phases are as follows: + /// + /// ## Region metadata collection: + /// When a Downstairs connects, the Upstairs collects the gen/flush/dirty + /// (GFD) info from all extents. This GFD information is stored in + /// `NegotiationState::WaitQuorum`, and the Upstairs waits for more + /// Downstairs to attach. + /// + /// ## Comparing metadata + /// In the compare phase, the upstairs will walk the list of all extents and + /// compare the G/F/D from each of the downstairs. Mismatches are detected + /// if the gen or flush values differ, or if the dirty bit is set (which + /// forces a repair even if generation and flush numbers agree). + /// + /// For each mismatch, the upstairs determines which downstairs has the + /// extent that should be the source, and which of the other downstairs + /// extents needs repair. This list of mismatches (source, destination(s)) + /// is collected. 
+ /// + /// Once an upstairs has compiled its repair list, it will then generate a + /// sequence of Upstairs -> Downstairs repair commands to repair each extent + /// that needs to be fixed. For a given piece of repair work, the commands + /// are: + /// + /// - Send a flush to source extent. + /// - Close extent on all downstairs. + /// - Send repair command to destination extents (with source extent + /// IP / port). + /// - Reopen all extents. + /// + /// ## Repair (upstairs) + /// During repair, each command issued from the upstairs must be completed + /// before the next will be sent. The Upstairs is responsible for walking + /// the repair commands and sending them to the required downstairs, and + /// waiting for them to finish. The actual repair work for an extent takes + /// place on the downstairs being repaired. + /// + /// ## Repair (downstairs-to-downstairs) + /// Each downstairs runs a repair server (Dropshot) that listens for + /// repair requests from other downstairs. A downstairs with an extent + /// that needs repair will contact the source downstairs and request the + /// list of files for an extent, then request each file. Once all files + /// are local to the downstairs needing repair, it will replace the + /// existing extent files with the new ones. + fn on_wait_quorum(&mut self) { + // Reconciliation only happens during initialization. + let UpstairsState::GoActive { + min_quorum_deadline, + .. + } = &mut self.state + else { + // XXX should this panic instead? + info!( + self.log, + "could not connect region set due to bad state: {:?}", + self.state + ); + return; + }; + + let is_ready = self.downstairs.clients.map_ref(|c| { + matches!( + c.state(), + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. 
+ } + ) + }); + let ready_count = is_ready.iter().filter(|c| **c).count(); + + match ready_count { + 0 => panic!("called on_wait_quorum with no WaitQuorum downstairs"), + 1 => return, // nothing to do yet + 2 => { + // Print a warning if `min_quorum` is already present. This + // would be a little weird, because we're just now entering the + // min-quorum state, but could be possible if a Downstairs has + // disconnected (i.e. we've gone from 2 -> 1 -> 2 downstairs in + // WaitQuorum). + if min_quorum_deadline.is_some() { + warn!( + self.log, + "entered min-quorum state with \ + `min_quorum` already present" + ) + } + *min_quorum_deadline = Some(Instant::now() + NEGOTIATION_DELAY); + return; // nothing to do yet } - /* - * Make sure all downstairs are in the correct state before we - * proceed. - */ - let ready = self.downstairs.clients.iter().all(|c| { - matches!( - c.state(), - DsState::Connecting { - state: NegotiationState::WaitQuorum, - .. - } - ) - }); - if !ready { - info!(self.log, "Waiting for more clients to be ready"); - return false; + 3 => { + if min_quorum_deadline.is_some() { + info!(self.log, "cancelling min-quorum reconciliation"); + *min_quorum_deadline = None; + } } - - /* - * We figure out if there is any reconciliation to do, and if so, we - * build the list of operations that will repair the extents that - * are not in sync. - * - * If we fail to collate, then we need to kick out all the - * downstairs out, forget any activation requests, and the - * upstairs goes back to waiting for another activation request. - */ - self.downstairs.collate() + _ => unreachable!(), }; - match collate_status { + // At this point, we know that all 3x Downstairs are ready: + assert_eq!(ready_count, 3); + + match self.downstairs.collate() { Err(e) => { error!(self.log, "Failed downstairs collate with: {}", e); // We failed to collate the three downstairs, so we need @@ -1990,20 +2034,90 @@ impl Upstairs { // clients. 
self.set_disabled(e.into()); self.downstairs.abort_reconciliation(&self.state); - false } Ok(true) => { // We have populated all of the reconciliation requests in // `Downstairs::reconcile_task_list`. Start reconciliation by // sending the first request. self.downstairs.send_next_reconciliation_req(); - true } Ok(false) => { info!(self.log, "No downstairs reconciliation required"); self.on_reconciliation_done(false); info!(self.log, "Set Active after no reconciliation"); - true + } + } + } + + /// Start min-quorum reconciliation + fn on_min_quorum(&mut self) { + // Reconciliation only happens during initialization. + let UpstairsState::GoActive { + min_quorum_deadline, + .. + } = &mut self.state + else { + warn!( + self.log, + "min-quorum negotiation found upstairs state {:?}; cancelling", + self.state + ); + return; + }; + assert!(min_quorum_deadline.is_some()); + *min_quorum_deadline = None; + + let is_ready = self.downstairs.clients.map_ref(|c| { + matches!( + c.state(), + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } + ) + }); + let ready_count = is_ready.iter().filter(|c| **c).count(); + if ready_count != 2 { + warn!( + self.log, + "min-quorum negotiation found {ready_count} \ + ready downstairs; cancelling" + ); + return; + } + info!(self.log, "Starting min-quorum negotiation"); + + match self.downstairs.collate() { + Err(e) => { + error!(self.log, "Failed downstairs collate with: {e}"); + // We failed to collate the two downstairs, so we need to reset + // that activation request. Call `abort_reconciliation` to + // abort reconciliation for all clients. + self.set_disabled(e.into()); + self.downstairs.abort_reconciliation(&self.state); + } + Ok(true) => { + // We have populated all of the reconciliation requests in + // `Downstairs::reconcile_task_list`. Start reconciliation by + // sending the first request. 
+ self.downstairs.send_next_reconciliation_req(); + + // Move the unready Downstairs into the "connect through + // live-repair path" for negotiation. + let unready_ds = + ClientId::iter().find(|i| !is_ready[*i]).unwrap(); + + info!( + self.log, + "Requiring {unready_ds} to connect through live-repair" + ); + self.downstairs.clients[unready_ds] + .set_connection_mode_faulted(); + } + Ok(false) => { + info!(self.log, "No downstairs reconciliation required"); + self.on_reconciliation_done(false); + info!(self.log, "Set Active after no reconciliation"); } } } @@ -2021,14 +2135,14 @@ impl Upstairs { info!(self.log, "Reconciliation skipped"); match &self.state { - UpstairsState::GoActive(..) => { + UpstairsState::GoActive { .. } => { // If we are not active yet (this is the first downstairs) then // go ahead and set ourselves active. info!(self.log, "Set Downstairs and Upstairs active"); self.downstairs.on_reconciliation_skipped(true); // Swap out the state for UpstairsState::Active - let UpstairsState::GoActive(res) = + let UpstairsState::GoActive { res, .. } = std::mem::replace(&mut self.state, UpstairsState::Active) else { unreachable!(); // We just matched! @@ -2071,7 +2185,7 @@ impl Upstairs { "Set Downstairs and Upstairs active after reconciliation" ); - if !matches!(self.state, UpstairsState::GoActive(..)) { + if !matches!(self.state, UpstairsState::GoActive { .. }) { error!( self.log, "reconciliation done, but upstairs is no longer GoActive: {:?}", @@ -2081,7 +2195,7 @@ impl Upstairs { } // Swap out the state for UpstairsState::Active - let UpstairsState::GoActive(res) = + let UpstairsState::GoActive { res, .. } = std::mem::replace(&mut self.state, UpstairsState::Active) else { unreachable!(); // checked above @@ -2175,7 +2289,7 @@ impl Upstairs { Ok(()) } UpstairsState::Active => Ok(()), - UpstairsState::GoActive(..) => { + UpstairsState::GoActive { .. } => { Err(CrucibleError::UpstairsActivateInProgress) } UpstairsState::Deactivating(..) 
=> { @@ -2195,7 +2309,7 @@ impl Upstairs { &mut self.state, UpstairsState::Disabled(err.clone()), ); - if let UpstairsState::GoActive(res) = prev { + if let UpstairsState::GoActive { res, .. } = prev { res.send_err(err); } for c in ClientId::iter() { @@ -2346,7 +2460,7 @@ pub(crate) mod test { #[test] fn reconcile_not_ready() { - // Verify reconcile returns false when a downstairs is not ready + // Verify reconcile doesn't start when a downstairs is not ready let mut up = Upstairs::test_default(None, false); for cid in [ClientId::new(0), ClientId::new(1)] { for state in [ @@ -2366,10 +2480,32 @@ pub(crate) mod test { } } let (_rx, done) = BlockOpWaiter::pair(); - up.state = UpstairsState::GoActive(done); + up.state = UpstairsState::GoActive { + res: done, + min_quorum_deadline: None, + }; - let res = up.connect_region_set(); - assert!(!res); + up.on_wait_quorum(); + + // DS0 and DS1 are still in WaitQuorum + for cid in [ClientId::new(0), ClientId::new(1)] { + assert!(matches!( + up.ds_state(cid), + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } + )); + } + // DS2 is still in WaitConnect + assert!(matches!( + up.ds_state(ClientId::new(2)), + DsState::Connecting { + state: NegotiationState::WaitConnect, + .. + } + )); + // The activation hasn't happened assert!(!matches!(&up.state, &UpstairsState::Active)) } @@ -2395,7 +2531,10 @@ pub(crate) mod test { ); } let (_rx, done) = BlockOpWaiter::pair(); - up.state = UpstairsState::GoActive(done); + up.state = UpstairsState::GoActive { + res: done, + min_quorum_deadline: None, + }; up.connect_ro_region_set(); assert!(matches!(&up.state, &UpstairsState::Active)); @@ -2425,7 +2564,10 @@ pub(crate) mod test { } } let (_rx, done) = BlockOpWaiter::pair(); - up.state = UpstairsState::GoActive(done); + up.state = UpstairsState::GoActive { + res: done, + min_quorum_deadline: None, + }; up.connect_ro_region_set(); assert!(matches!(&up.state, &UpstairsState::Active));