Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion src/bin/dolos/common.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use dolos_core::config::{ChainConfig, GenesisConfig, LoggingConfig, RootConfig, TelemetryConfig};
use dolos_core::BootstrapExt;
use futures_util::{stream::FuturesUnordered, StreamExt};
use miette::{Context as _, IntoDiagnostic};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig as _;
use std::sync::Arc;
use std::{fs, path::PathBuf, time::Duration};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use tracing::{debug, error, info, warn};
use tracing_subscriber::{filter::Targets, prelude::*};

use dolos::adapters::DomainAdapter;
Expand Down Expand Up @@ -297,6 +299,27 @@ pub async fn run_pipeline(pipeline: gasket::daemon::Daemon, exit: CancellationTo
pipeline.teardown();
}

pub async fn monitor_drivers(
mut drivers: FuturesUnordered<JoinHandle<Result<(), ServeError>>>,
exit: CancellationToken,
) {
while let Some(result) = drivers.next().await {
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => {
error!("driver error: {}", e);
warn!("cancelling remaining drivers");
exit.cancel();
}
Err(e) => {
error!("driver task failed: {}", e);
warn!("cancelling remaining drivers");
exit.cancel();
}
}
}
}

pub fn cleanup_data(config: &RootConfig) -> Result<(), std::io::Error> {
let root = &config.storage.path;

Expand All @@ -314,3 +337,39 @@ pub fn cleanup_data(config: &RootConfig) -> Result<(), std::io::Error> {
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use futures_util::stream::FuturesUnordered;
use std::io;
use tokio::task::JoinHandle;

#[tokio::test]
async fn monitor_drivers_observes_later_failures_before_waiting_for_earlier_drivers() {
let exit = CancellationToken::new();
let drivers: FuturesUnordered<JoinHandle<Result<(), ServeError>>> = FuturesUnordered::new();

let exit_for_slow_driver = exit.clone();
drivers.push(tokio::spawn(async move {
exit_for_slow_driver.cancelled().await;
Ok(())
}));

drivers.push(tokio::spawn(async {
Err(ServeError::BindError(io::Error::new(
io::ErrorKind::AddrInUse,
"socket already exists",
)))
}));

tokio::time::timeout(
Duration::from_secs(1),
monitor_drivers(drivers, exit.clone()),
)
.await
.expect("driver monitor should observe the bind failure promptly");

assert!(exit.is_cancelled());
}
}
11 changes: 2 additions & 9 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use dolos_core::config::RootConfig;
use futures_util::stream::FuturesUnordered;
use miette::{Context, IntoDiagnostic};
use tracing::{error, warn};
use tracing::warn;

#[derive(Debug, clap::Args)]
pub struct Args {}
Expand Down Expand Up @@ -47,14 +47,7 @@ pub async fn run(config: RootConfig, _args: &Args) -> miette::Result<()> {
exit.clone(),
);

for result in drivers {
if let Err(e) = result.await.unwrap() {
error!("driver error: {}", e);

warn!("cancelling remaining drivers");
exit.cancel();
}
}
crate::common::monitor_drivers(drivers, exit.clone()).await;

sync.await.unwrap();

Expand Down
10 changes: 1 addition & 9 deletions src/bin/dolos/serve.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use dolos_core::config::RootConfig;
use futures_util::stream::FuturesUnordered;
use log::warn;
use tracing::error;

#[derive(Debug, clap::Args)]
pub struct Args {}
Expand All @@ -25,14 +24,7 @@ pub async fn run(config: RootConfig, _args: &Args) -> miette::Result<()> {
exit.clone(),
);

for result in drivers {
if let Err(e) = result.await.unwrap() {
error!("driver error: {}", e);

warn!("cancelling remaining drivers");
exit.cancel();
}
}
crate::common::monitor_drivers(drivers, exit.clone()).await;

warn!("shutdown complete");

Expand Down
Loading