diff --git a/src/bin/dolos/common.rs b/src/bin/dolos/common.rs index 40f17b0f..d74e0262 100644 --- a/src/bin/dolos/common.rs +++ b/src/bin/dolos/common.rs @@ -1,12 +1,14 @@ use dolos_core::config::{ChainConfig, GenesisConfig, LoggingConfig, RootConfig, TelemetryConfig}; use dolos_core::BootstrapExt; +use futures_util::{stream::FuturesUnordered, StreamExt}; use miette::{Context as _, IntoDiagnostic}; use opentelemetry::trace::TracerProvider as _; use opentelemetry_otlp::WithExportConfig as _; use std::sync::Arc; use std::{fs, path::PathBuf, time::Duration}; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use tracing::{debug, info}; +use tracing::{debug, error, info, warn}; use tracing_subscriber::{filter::Targets, prelude::*}; use dolos::adapters::DomainAdapter; @@ -297,6 +299,27 @@ pub async fn run_pipeline(pipeline: gasket::daemon::Daemon, exit: CancellationTo pipeline.teardown(); } +pub async fn monitor_drivers( + mut drivers: FuturesUnordered>>, + exit: CancellationToken, +) { + while let Some(result) = drivers.next().await { + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => { + error!("driver error: {}", e); + warn!("cancelling remaining drivers"); + exit.cancel(); + } + Err(e) => { + error!("driver task failed: {}", e); + warn!("cancelling remaining drivers"); + exit.cancel(); + } + } + } +} + pub fn cleanup_data(config: &RootConfig) -> Result<(), std::io::Error> { let root = &config.storage.path; @@ -314,3 +337,39 @@ pub fn cleanup_data(config: &RootConfig) -> Result<(), std::io::Error> { } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::stream::FuturesUnordered; + use std::io; + use tokio::task::JoinHandle; + + #[tokio::test] + async fn monitor_drivers_observes_later_failures_before_waiting_for_earlier_drivers() { + let exit = CancellationToken::new(); + let drivers: FuturesUnordered>> = FuturesUnordered::new(); + + let exit_for_slow_driver = exit.clone(); + drivers.push(tokio::spawn(async move { + exit_for_slow_driver.cancelled().await; + Ok(()) + })); + + drivers.push(tokio::spawn(async { + Err(ServeError::BindError(io::Error::new( + io::ErrorKind::AddrInUse, + "socket already exists", + ))) + })); + + tokio::time::timeout( + Duration::from_secs(1), + monitor_drivers(drivers, exit.clone()), + ) + .await + .expect("driver monitor should observe the bind failure promptly"); + + assert!(exit.is_cancelled()); + } +} diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index 6680d8fa..aaf13d59 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -1,7 +1,7 @@ use dolos_core::config::RootConfig; use futures_util::stream::FuturesUnordered; use miette::{Context, IntoDiagnostic}; -use tracing::{error, warn}; +use tracing::warn; #[derive(Debug, clap::Args)] pub struct Args {} @@ -47,14 +47,7 @@ pub async fn run(config: RootConfig, _args: &Args) -> miette::Result<()> { exit.clone(), ); - for result in drivers { - if let Err(e) = result.await.unwrap() { - error!("driver error: {}", e); - - warn!("cancelling remaining drivers"); - exit.cancel(); - } - } + crate::common::monitor_drivers(drivers, exit.clone()).await; sync.await.unwrap(); diff --git a/src/bin/dolos/serve.rs b/src/bin/dolos/serve.rs index b21cb8a9..2bb86c55 100644 --- a/src/bin/dolos/serve.rs +++ b/src/bin/dolos/serve.rs @@ -1,7 +1,6 @@ use dolos_core::config::RootConfig; use futures_util::stream::FuturesUnordered; use log::warn; -use tracing::error; #[derive(Debug, clap::Args)] pub struct Args {} @@ -25,14 +24,7 @@ pub async fn run(config: RootConfig, _args: &Args) -> miette::Result<()> { exit.clone(), ); - for result in drivers { - if let Err(e) = result.await.unwrap() { - error!("driver error: {}", e); - - warn!("cancelling remaining drivers"); - exit.cancel(); - } - } + crate::common::monitor_drivers(drivers, exit.clone()).await; warn!("shutdown complete");