From 89a107f6de0b8dc9f200f6a43be80020b228b0aa Mon Sep 17 00:00:00 2001
From: Edward Houston
Date: Wed, 22 Apr 2026 10:05:59 +0200
Subject: [PATCH 1/2] test(discovery): add failing tests for panic-hardening

Three tests are intentionally failing against the current code:

- remove_unhealthy_service_missing_service_does_not_panic (assert! panic)
- poisoned_healthy_lock_does_not_panic_on_get_servers (unwrap on poisoned lock)
- poisoned_queue_lock_does_not_panic_on_run_health_check (unwrap on poisoned lock)

Also marks the pre-existing live-network test with #[ignore] so it does
not run in CI.
---
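Reviewer note (below the ---, so git-am drops it): the poisoned-lock tests
rely on std's lock-poisoning semantics. A minimal standalone sketch of the
mechanism, assuming nothing from this codebase:

    // A panic while a write guard is alive poisons the RwLock; afterwards a
    // plain unwrap() panics, while PoisonError::into_inner still yields the
    // guard and the (possibly half-updated) data.
    use std::sync::{Arc, RwLock};

    fn main() {
        let lock = Arc::new(RwLock::new(vec![1, 2, 3]));

        // Poison the lock exactly the way the tests below do.
        let l = Arc::clone(&lock);
        let _ = std::thread::spawn(move || {
            let _guard = l.write().unwrap();
            panic!("intentional lock poison");
        })
        .join();

        assert!(lock.is_poisoned());

        // lock.read().unwrap() would panic here; into_inner() recovers.
        let data = lock.read().unwrap_or_else(|e| e.into_inner());
        assert_eq!(*data, vec![1, 2, 3]);
    }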
 src/electrum/discovery.rs | 120 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/src/electrum/discovery.rs b/src/electrum/discovery.rs
index 28471c52b..d5b739981 100644
--- a/src/electrum/discovery.rs
+++ b/src/electrum/discovery.rs
@@ -528,7 +528,127 @@ mod tests {
 
     const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);
 
+    fn test_features() -> ServerFeatures {
+        ServerFeatures {
+            hosts: serde_json::from_str("{}").unwrap(),
+            server_version: "test 1.0".into(),
+            genesis_hash: genesis_hash(Network::Testnet),
+            protocol_min: PROTOCOL_VERSION,
+            protocol_max: PROTOCOL_VERSION,
+            hash_function: "sha256".into(),
+            pruning: None,
+        }
+    }
+
+    // Construct a DiscoveryManager with no queue entries and no DNS lookups.
+    fn test_manager() -> DiscoveryManager {
+        DiscoveryManager {
+            our_addrs: HashSet::new(),
+            our_version: PROTOCOL_VERSION,
+            our_features: test_features(),
+            announce: false,
+            tor_proxy: None,
+            healthy: Default::default(),
+            queue: Default::default(),
+        }
+    }
+
+    fn make_job(ip: &str, service: Service) -> HealthCheck {
+        HealthCheck {
+            addr: ServerAddr::Clearnet(ip.parse().unwrap()),
+            hostname: "peer.example".into(),
+            service,
+            is_default: false,
+            added_by: None,
+            last_check: None,
+            last_healthy: None,
+            consecutive_failures: 0,
+        }
+    }
+
+    fn insert_healthy(manager: &DiscoveryManager, ip: &str, services: Vec<Service>) {
+        let addr = ServerAddr::Clearnet(ip.parse().unwrap());
+        let mut healthy = manager.healthy.write().unwrap();
+        let server = healthy
+            .entry(addr)
+            .or_insert_with(|| Server::new("peer.example".into(), test_features()));
+        for svc in services {
+            server.services.insert(svc);
+        }
+    }
+
+    // The old assert!(server.services.remove(&job.service)) panicked when the service
+    // was not present in the healthy set. Verify it no longer panics.
+    #[test]
+    fn remove_unhealthy_service_missing_service_does_not_panic() {
+        let manager = test_manager();
+        insert_healthy(&manager, "1.2.3.4", vec![Service::Tcp(50001)]);
+        let job = make_job("1.2.3.4", Service::Ssl(50002));
+        manager.remove_unhealthy_service(&job);
+        let healthy = manager.healthy.read().unwrap();
+        let addr = ServerAddr::Clearnet("1.2.3.4".parse().unwrap());
+        assert!(healthy[&addr].services.contains(&Service::Tcp(50001)));
+    }
+
+    // The else branch was a FIXME "unreachable but it was reached" in production.
+    // Verify it does not panic.
+    #[test]
+    fn remove_unhealthy_service_missing_addr_does_not_panic() {
+        let manager = test_manager();
+        let job = make_job("1.2.3.4", Service::Tcp(50001));
+        manager.remove_unhealthy_service(&job);
+    }
+
+    #[test]
+    fn remove_unhealthy_service_removes_server_when_last_service_gone() {
+        let manager = test_manager();
+        insert_healthy(&manager, "1.2.3.4", vec![Service::Tcp(50001)]);
+        let job = make_job("1.2.3.4", Service::Tcp(50001));
+        manager.remove_unhealthy_service(&job);
+        let healthy = manager.healthy.read().unwrap();
+        assert!(!healthy.contains_key(&ServerAddr::Clearnet("1.2.3.4".parse().unwrap())));
+    }
+
+    #[test]
+    fn remove_unhealthy_service_keeps_server_when_other_services_remain() {
+        let manager = test_manager();
+        insert_healthy(&manager, "1.2.3.4", vec![Service::Tcp(50001), Service::Ssl(50002)]);
+        let job = make_job("1.2.3.4", Service::Tcp(50001));
+        manager.remove_unhealthy_service(&job);
+        let healthy = manager.healthy.read().unwrap();
+        let addr = ServerAddr::Clearnet("1.2.3.4".parse().unwrap());
+        assert!(healthy.contains_key(&addr));
+        assert!(healthy[&addr].services.contains(&Service::Ssl(50002)));
+        assert!(!healthy[&addr].services.contains(&Service::Tcp(50001)));
+    }
+
+    #[test]
+    fn poisoned_healthy_lock_does_not_panic_on_get_servers() {
+        let manager = Arc::new(test_manager());
+        let m = Arc::clone(&manager);
+        let _ = std::thread::spawn(move || {
+            let _guard = m.healthy.write().unwrap();
+            panic!("intentional lock poison");
+        })
+        .join();
+        let servers = manager.get_servers();
+        assert!(servers.is_empty());
+    }
+
+    #[test]
+    fn poisoned_queue_lock_does_not_panic_on_run_health_check() {
+        let manager = Arc::new(test_manager());
+        let m = Arc::clone(&manager);
+        let _ = std::thread::spawn(move || {
+            let _guard = m.queue.write().unwrap();
+            panic!("intentional lock poison");
+        })
+        .join();
+        assert!(manager.run_health_check().is_ok());
+    }
+
     #[test]
+    #[ignore = "makes live network connections to testnet Electrum servers"]
     fn test() -> Result<()> {
         stderrlog::new().verbosity(4).init().unwrap();
 

From 685527a080dc3179eeb8d2d33bedb608834e8f02 Mon Sep 17 00:00:00 2001
From: Edward Houston
Date: Thu, 23 Apr 2026 07:52:43 +0200
Subject: [PATCH 2/2] fix(discovery): harden discovery-jobs thread against panics

Replace assert! with warn! in remove_unhealthy_service, recover poisoned
RwLocks via into_inner() throughout, and wrap the jobs loop body in
catch_unwind so a panic cannot kill the discovery thread. Log the panic
payload on catch so errors are actionable in production.

Add comments documenting the two-layer defense, the lock-poison recovery
tradeoff, and the single-consumer TOCTOU assumption.
---
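Reviewer note (below the ---, so git-am drops it): the commit message's
"two-layer defense" can be seen in isolation in this standalone sketch;
worker_loop and its names are illustrative, not code from this patch:

    // Layer 1: catch_unwind keeps the loop alive across a panicking iteration.
    // Layer 2: into_inner() lets the next iteration use the lock that the
    // panic poisoned while the write guard was held.
    use std::panic::{catch_unwind, AssertUnwindSafe};
    use std::sync::RwLock;

    fn worker_loop(state: &RwLock<u32>, iterations: u32) {
        for i in 0..iterations {
            let result = catch_unwind(AssertUnwindSafe(|| {
                let mut n = state.write().unwrap_or_else(|e| e.into_inner());
                *n += 1;
                if i == 0 {
                    panic!("simulated bug while holding the write guard");
                }
            }));
            if result.is_err() {
                eprintln!("iteration {} panicked; continuing", i);
            }
        }
    }

    fn main() {
        let state = RwLock::new(0);
        worker_loop(&state, 3);
        // The first iteration panicked after incrementing, yet all three
        // increments landed and no unwrap() died on the poisoned lock.
        assert_eq!(*state.read().unwrap_or_else(|e| e.into_inner()), 3);
    }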
 src/electrum/discovery.rs | 69 +++++++++++++++++++++++++++++++--------
 1 file changed, 55 insertions(+), 14 deletions(-)

diff --git a/src/electrum/discovery.rs b/src/electrum/discovery.rs
index d5b739981..224d227d4 100644
--- a/src/electrum/discovery.rs
+++ b/src/electrum/discovery.rs
@@ -124,7 +124,7 @@ impl DiscoveryManager {
     pub fn add_server_request(&self, added_by: IpAddr, features: ServerFeatures) -> Result<()> {
         self.verify_compatibility(&features)?;
 
-        let mut queue = self.queue.write().unwrap();
+        let mut queue = self.queue.write().unwrap_or_else(|e| e.into_inner());
         ensure!(queue.len() < MAX_QUEUE_SIZE, "queue size exceeded");
 
         // TODO optimize
@@ -205,7 +205,7 @@ impl DiscoveryManager {
     /// before being removed due to unavailability.
     pub fn add_default_server(&self, hostname: Hostname, services: Vec<Service>) -> Result<()> {
         let addr = ServerAddr::resolve(&hostname)?;
-        let mut queue = self.queue.write().unwrap();
+        let mut queue = self.queue.write().unwrap_or_else(|e| e.into_inner());
         queue.extend(
             services
                 .into_iter()
@@ -219,7 +219,7 @@
         // XXX return a random sample instead of everything?
         self.healthy
             .read()
-            .unwrap()
+            .unwrap_or_else(|e| e.into_inner())
             .iter()
             .map(|(addr, server)| {
                 ServerEntry(addr.clone(), server.hostname.clone(), server.feature_strs())
@@ -234,14 +234,19 @@
     /// Run the next health check in the queue (a single one)
     fn run_health_check(&self) -> Result<()> {
         // abort if there are no entries in the queue, or its still too early for the next one up
-        if self.queue.read().unwrap().peek().map_or(true, |next| {
+        if self.queue.read().unwrap_or_else(|e| e.into_inner()).peek().map_or(true, |next| {
             next.last_check
                 .map_or(false, |t| t.elapsed() < HEALTH_CHECK_FREQ)
         }) {
             return Ok(());
         }
 
-        let mut job = self.queue.write().unwrap().pop().unwrap();
+        // Only spawn_jobs_thread calls pop(), so this is the sole consumer; the
+        // let-else guards against the (currently unreachable) race where the queue
+        // drains between the peek above and this pop.
+        let Some(mut job) = self.queue.write().unwrap_or_else(|e| e.into_inner()).pop() else {
+            return Ok(());
+        };
         debug!("processing {:?}", job);
 
         let was_healthy = job.is_healthy();
@@ -259,7 +264,7 @@
             job.last_healthy = job.last_check;
             job.consecutive_failures = 0;
             // schedule the next health check
-            self.queue.write().unwrap().push(job);
+            self.queue.write().unwrap_or_else(|e| e.into_inner()).push(job);
 
             Ok(())
         }
@@ -275,7 +280,7 @@
         job.consecutive_failures += 1;
 
         if job.should_retry() {
-            self.queue.write().unwrap().push(job);
+            self.queue.write().unwrap_or_else(|e| e.into_inner()).push(job);
         } else {
             debug!("giving up on {:?}", job);
         }
@@ -288,7 +293,11 @@
     /// Upsert the server/service into the healthy set
     fn save_healthy_service(&self, job: &HealthCheck, features: ServerFeatures) {
         let addr = job.addr.clone();
-        let mut healthy = self.healthy.write().unwrap();
+        debug!(
+            "saving healthy service hostname='{}' addr='{}' service={:?}",
+            job.hostname, addr, job.service
+        );
+        let mut healthy = self.healthy.write().unwrap_or_else(|e| e.into_inner());
         healthy
             .entry(addr)
             .or_insert_with(|| Server::new(job.hostname.clone(), features))
@@ -299,16 +308,29 @@
     /// Remove the service, and remove the server entirely if it has no other reamining healthy services
     fn remove_unhealthy_service(&self, job: &HealthCheck) {
         let addr = job.addr.clone();
-        let mut healthy = self.healthy.write().unwrap();
+        debug!(
+            "removing unhealthy service hostname='{}' addr='{}' service={:?}",
+            job.hostname, addr, job.service
+        );
+        let mut healthy = self.healthy.write().unwrap_or_else(|e| e.into_inner());
        if let Entry::Occupied(mut entry) = healthy.entry(addr) {
             let server = entry.get_mut();
-            assert!(server.services.remove(&job.service));
+            if !server.services.remove(&job.service) {
+                warn!(
+                    "service={:?} hostname='{}' missing from healthy set, corrupted state",
+                    job.service,
+                    job.hostname
+                );
+            }
             if server.services.is_empty() {
                 entry.remove_entry();
             }
         } else {
-            // FIXME This was an unreachable but it was reached.
-            log::warn!("missing expected server, corrupted state");
+            warn!(
+                "hostname='{}' addr='{}' missing from healthy map, corrupted state",
+                job.hostname,
+                job.addr
+            );
         }
     }
 
@@ -376,9 +398,28 @@
 }
 
 pub fn spawn_jobs_thread(manager: Arc<DiscoveryManager>) {
+    // Two-layer panic hardening:
+    // 1. catch_unwind prevents a panic in any single iteration from killing this thread.
+    // 2. All RwLock accesses use unwrap_or_else(|e| e.into_inner()) so that if a panic
+    //    fires while a lock is held (poisoning it), the next iteration recovers the inner
+    //    value and continues rather than dying on the poisoned lock. The recovered state
+    //    may be partially-modified, but for best-effort peer discovery that is acceptable.
+    // These two defenses are co-dependent: catch_unwind prevents thread death;
+    // unwrap_or_else prevents the *next* iteration from dying due to the poisoned lock
+    // left behind by the previous panic.
     spawn_thread("discovery-jobs", move || loop {
-        if let Err(e) = manager.run_health_check() {
-            debug!("health check failed: {:?}", e);
+        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+            if let Err(e) = manager.run_health_check() {
+                debug!("health check failed: {:?}", e);
+            }
+        }));
+        if let Err(panic) = result {
+            let msg = panic
+                .downcast_ref::<&str>()
+                .copied()
+                .or_else(|| panic.downcast_ref::<String>().map(String::as_str))
+                .unwrap_or("unknown payload");
+            error!("discovery-jobs panicked reason='{}', continuing after delay", msg);
         }
         // XXX use a dynamic JOB_INTERVAL, adjusted according to the queue size and HEALTH_CHECK_FREQ?
         thread::sleep(JOB_INTERVAL);
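
Postscript (after the final hunk, so git-am ignores it): the double downcast
in the error path exists because panic payloads differ by macro form. A
standalone sketch, with payload_msg as an illustrative helper, not code from
this patch:

    // panic!("literal") boxes a &'static str, while panic!("fmt {}", x)
    // boxes a String; recovering the message has to try both.
    use std::any::Any;
    use std::panic::catch_unwind;

    fn payload_msg(payload: Box<dyn Any + Send>) -> String {
        payload
            .downcast_ref::<&str>()
            .copied()
            .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
            .unwrap_or("unknown payload")
            .to_owned()
    }

    fn main() {
        let p = catch_unwind(|| panic!("plain literal")).unwrap_err();
        assert_eq!(payload_msg(p), "plain literal");

        let p = catch_unwind(|| panic!("formatted: {}", 42)).unwrap_err();
        assert_eq!(payload_msg(p), "formatted: 42");
    }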