diff --git a/CHANGELOG.md b/CHANGELOG.md index e05b32089e4..13aefa3fdb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ ### Breaking +- [#7252](https://github.com/ChainSafe/forest/pull/7252): `forest-cli snapshot export` now generates checksum file at {filepath}.sha256sum. The default file extension is changed from `.forest.car.sha256sum` to `.forest.car.zst.sha256sum`. + ### Added - [#7269](https://github.com/ChainSafe/forest/pull/7269): Added `--wait-confidence` and `--wait-timeout` to `forest-wallet send` command. diff --git a/scripts/tests/calibnet_export_check.sh b/scripts/tests/calibnet_export_check.sh index 5b66f0ac71c..cb8cbe1c725 100755 --- a/scripts/tests/calibnet_export_check.sh +++ b/scripts/tests/calibnet_export_check.sh @@ -26,7 +26,6 @@ fi echo "Exporting zstd compressed snapshot in $format format" $FOREST_CLI_PATH snapshot export --format "$format" > snapshot_export.log 2>&1 & - echo "Testing that export is in progress" for ((i=1; i<=retries; i++)); do output=$($FOREST_CLI_PATH snapshot export-status --format json) @@ -62,7 +61,33 @@ echo "Exporting zstd compressed snapshot at genesis" $FOREST_CLI_PATH snapshot export --tipset 0 --format "$format" echo "Exporting zstd compressed snapshot in $format format" -$FOREST_CLI_PATH snapshot export --format "$format" +$FOREST_CLI_PATH snapshot export --format "$format" & +EXPORT_CMD_PID=$! +sleep 5 +# another export job should be disallowed +export_error=$($FOREST_CLI_PATH snapshot export 2>&1 || true) +if echo "$export_error" | grep -q "active chain export job has started"; then + echo "verified another export job is disallowed" +else + echo "another export job should be disallowed" + echo "output was: $export_error" + exit 1 +fi +# another export-diff job should be disallowed +export_diff_error=$($FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 || true) +if echo "$export_diff_error" | grep -q "active chain export job has started"; then + echo "verified another export-diff job is disallowed" +else + echo "another export-diff job should be disallowed" + echo "output was: $export_diff_error" + exit 1 +fi +# Killing the CLI should not cancel the export +echo "killing cli command" +kill -KILL $EXPORT_CMD_PID +# Wait on the same export job +echo "waiting on export-status" +$FOREST_CLI_PATH snapshot export-status --wait $FOREST_CLI_PATH shutdown --force diff --git a/scripts/tests/calibnet_export_diff_check.sh b/scripts/tests/calibnet_export_diff_check.sh index add8dfc8fc1..b1f1b41f934 100755 --- a/scripts/tests/calibnet_export_diff_check.sh +++ b/scripts/tests/calibnet_export_diff_check.sh @@ -9,12 +9,73 @@ source "$(dirname "$0")/harness.sh" forest_init "$@" +retries=30 +sleep_interval=0.5 + db_path=$($FOREST_TOOL_PATH db stats --chain calibnet | grep "Database path:" | cut -d':' -f2- | xargs) snapshot=$(find "$db_path/car_db"/*.car.zst | tail -n 1) snapshot_epoch=$(forest_query_epoch "$snapshot") echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" -$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 +$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & + +echo "Testing that export is in progress" +for ((i=1; i<=retries; i++)); do + output=$($FOREST_CLI_PATH snapshot export-status --format json) + is_exporting=$(echo "$output" | jq -r '.exporting') + if [ "$is_exporting" == "true" ]; then + break + fi + if [ $i -eq $retries ]; then + echo "export should be in progress" + exit 1 + fi + sleep $sleep_interval +done + +$FOREST_CLI_PATH snapshot export-cancel + +echo "Testing that export has been cancelled" +for ((i=1; i<=retries; i++)); do + output=$($FOREST_CLI_PATH snapshot export-status --format json) + is_exporting=$(echo "$output" | jq -r '.exporting') + is_cancelled=$(echo "$output" | jq -r '.cancelled') + if [ "$is_exporting" == "false" ] && [ "$is_cancelled" == "true" ]; then + break + fi + if [ $i -eq $retries ]; then + echo "export should be cancelled" + exit 1 + fi + sleep $sleep_interval +done + +echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" +$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & +EXPORT_CMD_PID=$! +sleep 5 +# another export job should be disallowed +export_error=$($FOREST_CLI_PATH snapshot export 2>&1 || true) +if echo "$export_error" | grep -q "active chain export job has started"; then + echo "verified another export job is disallowed" +else + echo "another export job should be disallowed" + echo "output was: $export_error" + exit 1 +fi +# another export-diff job should be disallowed +export_diff_error=$($FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 || true) +if echo "$export_diff_error" | grep -q "active chain export job has started"; then + echo "verified another export-diff job is disallowed" +else + echo "another export-diff job should be disallowed" + echo "output was: $export_diff_error" + exit 1 +fi +# Killing the CLI should not cancel the export +kill -KILL $EXPORT_CMD_PID +# Wait on the same export job +$FOREST_CLI_PATH snapshot export-status --wait $FOREST_CLI_PATH shutdown --force diff --git a/src/cli/subcommands/snapshot_cmd.rs b/src/cli/subcommands/snapshot_cmd.rs index 10880d73cc6..79550012082 100644 --- a/src/cli/subcommands/snapshot_cmd.rs +++ b/src/cli/subcommands/snapshot_cmd.rs @@ -4,21 +4,18 @@ use crate::chain::FilecoinSnapshotVersion; use crate::chain_sync::chain_muxer::DEFAULT_RECENT_STATE_ROOTS; use crate::cli_shared::snapshot::{self, TrustedVendor}; -use crate::db::car::forest::new_forest_car_temp_path_in; +use crate::db::car::forest::tmp_exporting_forest_car_path; use crate::networks::calibnet; +use crate::prelude::*; use crate::rpc::chain::ForestChainExportDiffParams; use crate::rpc::types::ApiExportResult; use crate::rpc::{self, chain::ForestChainExportParams, prelude::*}; use crate::shim::policy::policy_constants::CHAIN_FINALITY; -use anyhow::Context as _; use chrono::DateTime; use clap::Subcommand; use indicatif::{ProgressBar, ProgressStyle}; -use std::{ - path::{Path, PathBuf}, - time::Duration, -}; -use tokio::io::AsyncWriteExt; +use std::{path::PathBuf, time::Duration}; +use tokio_util::sync::CancellationToken; #[derive(Debug, Clone, clap::ValueEnum)] pub enum Format { @@ -120,7 +117,7 @@ impl SnapshotCommands { ChainHead::call(&client, ()).await? }; - let output_path = match output_path.is_dir() { + let output_path = std::path::absolute(match output_path.is_dir() { true => output_path.join(snapshot::filename( TrustedVendor::Forest, chain_name, @@ -132,16 +129,14 @@ impl SnapshotCommands { true, )), false => output_path.clone(), - }; - - let output_dir = output_path.parent().context("invalid output path")?; - let temp_path = new_forest_car_temp_path_in(output_dir)?; + }) + .context("failed to make output path absolute")?; let params = ForestChainExportParams { version: format, epoch: tipset.epoch(), recent_roots: depth, - output_path: temp_path.to_path_buf(), + output_path: output_path.clone(), tipset_keys: tipset.key().clone().into(), include_receipts: false, include_events: false, @@ -158,7 +153,7 @@ impl SnapshotCommands { ).with_message(format!("Exporting v{} snapshot to {} ...", format as u64, output_path.display())); pb.enable_steady_tick(std::time::Duration::from_millis(80)); let handle = tokio::spawn({ - let path: PathBuf = (&temp_path).into(); + let path = tmp_exporting_forest_car_path(&output_path); let pb = pb.clone(); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); async move { @@ -181,21 +176,8 @@ impl SnapshotCommands { pb.finish(); _ = handle.await; - if !dry_run { - match export_result.clone() { - ApiExportResult::Done(hash_opt) => { - // Move the file first; prevents orphaned checksum on persist error. - temp_path.persist(&output_path)?; - if let Some(hash) = hash_opt { - save_checksum(&output_path, hash).await?; - } - } - ApiExportResult::Cancelled => { /* no file to persist on cancel */ } - } - } - match export_result { - ApiExportResult::Done(_) => { + ApiExportResult::Done => { println!("Export completed."); } ApiExportResult::Cancelled => { @@ -245,13 +227,12 @@ impl SnapshotCommands { if result.cancelled { pb.set_message("Export cancelled"); pb.abandon(); - return Ok(()); } let position = (result.progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64; pb.set_position(position); - if position >= 10000 { + if !result.exporting { break; } tokio::time::sleep(Duration::from_millis(500)).await; @@ -303,18 +284,16 @@ impl SnapshotCommands { let depth = depth.unwrap_or_else(|| from - to); anyhow::ensure!(depth > 0, "depth must be positive"); - let output_path = match output_path.is_dir() { + let output_path = std::path::absolute(match output_path.is_dir() { true => output_path.join(format!( "forest_snapshot_diff_{chain_name}_{from}_{to}+{depth}.car.zst" )), false => output_path.clone(), - }; - - let output_dir = output_path.parent().context("invalid output path")?; - let temp_path = new_forest_car_temp_path_in(output_dir)?; + }) + .context("failed to make output path absolute")?; let params = ForestChainExportDiffParams { - output_path: temp_path.to_path_buf(), + output_path: output_path.clone(), from, to, depth, @@ -327,12 +306,16 @@ impl SnapshotCommands { .expect("indicatif template must be valid"), ).with_message(format!("Exporting {} ...", output_path.display())); pb.enable_steady_tick(std::time::Duration::from_millis(80)); + let cancellation_token = CancellationToken::new(); + // Make sure token is cancelled on error path + let _cancellation_token_drop_guard = cancellation_token.drop_guard_ref(); let handle = tokio::spawn({ - let path: PathBuf = (&temp_path).into(); + let cancellation_token = cancellation_token.clone(); + let path = tmp_exporting_forest_car_path(&output_path); let pb = pb.clone(); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); async move { - loop { + while !cancellation_token.is_cancelled() { interval.tick().await; if let Ok(meta) = std::fs::metadata(&path) { pb.set_position(meta.len()); @@ -340,42 +323,26 @@ impl SnapshotCommands { } } }); - // Manually construct RpcRequest because snapshot export could // take a few hours on mainnet - client + let export_result = client .call(ForestChainExportDiff::request((params,))?.with_timeout(Duration::MAX)) .await?; - - handle.abort(); + // cancel before `handle.await` to avoid deadlock + cancellation_token.cancel(); pb.finish(); _ = handle.await; - temp_path.persist(output_path)?; - println!("Export completed."); + match export_result { + ApiExportResult::Done => { + println!("Export completed."); + } + ApiExportResult::Cancelled => { + println!("Export cancelled."); + } + } Ok(()) } } } } - -/// Prints hex-encoded representation of SHA-256 checksum and saves it to a file -/// with the same name but with a `.sha256sum` extension. -async fn save_checksum(source: &Path, encoded_hash: String) -> anyhow::Result<()> { - let checksum_file_content = format!( - "{encoded_hash} {}\n", - source - .file_name() - .and_then(std::ffi::OsStr::to_str) - .context("Failed to retrieve file name while saving checksum")? - ); - - let checksum_path = PathBuf::from(source).with_extension("sha256sum"); - - let mut checksum_file = tokio::fs::File::create(&checksum_path).await?; - checksum_file - .write_all(checksum_file_content.as_bytes()) - .await?; - checksum_file.flush().await?; - Ok(()) -} diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 651044f2596..1e14db9c147 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -63,7 +63,7 @@ use integer_encoding::VarIntReader; use nunny::Vec as NonEmpty; use positioned_io::{Cursor, ReadAt, Size as _, SizeCursor}; use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::OnceLock; use std::task::Poll; use std::time::Duration; @@ -508,11 +508,24 @@ pub fn new_forest_car_temp_path_in( .into_temp_path()) } +pub fn tmp_exporting_forest_car_path(output_path: &Path) -> PathBuf { + let mut p = output_path.to_owned(); + p.add_extension("tmp"); + p +} + +pub fn forest_car_sha256sum_path(output_path: &Path) -> PathBuf { + let mut p = output_path.to_owned(); + p.add_extension("sha256sum"); + p +} + #[cfg(test)] mod tests { use super::*; use nunny::vec as nonempty; use quickcheck_macros::quickcheck; + use rstest::rstest; use tokio_test::block_on; fn mk_encoded_car( @@ -630,4 +643,33 @@ mod tests { assert_eq!(forest_car.get(&cid_a).unwrap().unwrap(), blocks[0].data); assert_eq!(forest_car.get(&cid_b).unwrap().unwrap(), blocks[1].data); } + + #[rstest] + #[case( + Path::new("/tmp/a.forst.car.zst"), + Path::new("/tmp/a.forst.car.zst.tmp") + )] + #[case(Path::new("tmp/a.forst.car.zst"), Path::new("tmp/a.forst.car.zst.tmp"))] + #[case(Path::new("a.forst.car.zst"), Path::new("a.forst.car.zst.tmp"))] + #[case(Path::new(""), Path::new(""))] + #[case(Path::new("."), Path::new("."))] + fn test_tmp_exporting_forest_car_path(#[case] input: &Path, #[case] output: &Path) { + assert_eq!(tmp_exporting_forest_car_path(input), output); + } + + #[rstest] + #[case( + Path::new("/tmp/a.forst.car.zst"), + Path::new("/tmp/a.forst.car.zst.sha256sum") + )] + #[case( + Path::new("tmp/a.forst.car.zst"), + Path::new("tmp/a.forst.car.zst.sha256sum") + )] + #[case(Path::new("a.forst.car.zst"), Path::new("a.forst.car.zst.sha256sum"))] + #[case(Path::new(""), Path::new(""))] + #[case(Path::new("."), Path::new("."))] + fn test_forest_car_sha256sum_path(#[case] input: &Path, #[case] output: &Path) { + assert_eq!(forest_car_sha256sum_path(input), output); + } } diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 736944ddf96..68b4592525a 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -86,6 +86,8 @@ impl ChainExportGuard { impl Drop for ChainExportGuard { fn drop(&mut self) { + // In case some tasks are waiting on this token + self.cancellation_token.cancel(); end_export() } } diff --git a/src/lib.rs b/src/lib.rs index 5bfb5eb06a3..ee0929f09c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod prelude { pub use ahash::{HashMapExt as _, HashSetExt as _}; pub use anyhow::Context as _; pub use cid::Cid; + pub use futures::FutureExt as _; pub use itertools::Itertools as _; pub use std::{ops::Deref as _, sync::Arc}; } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 7b849238bd8..07a3b8ad7ff 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -11,6 +11,7 @@ use crate::chain::index::{ChainIndex, ResolveNullTipset}; use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange}; use crate::chain_sync::{get_full_tipset, load_full_tipset}; use crate::cid_collections::{CidHashSet, FileBackedCidHashSet}; +use crate::db::car::forest::{forest_car_sha256sum_path, tmp_exporting_forest_car_path}; use crate::ipld::DfsIter; use crate::ipld::{CHAIN_EXPORT_STATUS, ChainExportGuard}; use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; @@ -42,7 +43,11 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::convert::Infallible; use std::fs::File; -use std::{collections::VecDeque, path::PathBuf, sync::LazyLock}; +use std::{ + collections::VecDeque, + path::{Path, PathBuf}, + sync::LazyLock, +}; use tokio::sync::broadcast::{self, Receiver as Subscriber}; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -280,113 +285,127 @@ impl RpcMethod<1> for ForestChainExport { (params,): Self::Params, _: &http::Extensions, ) -> Result { - let ForestChainExportParams { - version, - epoch, - recent_roots, - output_path, - tipset_keys: ApiTipsetKey(tsk), - include_receipts, - include_events, - include_tipset_keys, - skip_checksum, - dry_run, - } = params; + fn save_checksum( + checksum: digest::Output, + snapshot_output_path: &Path, + ) -> anyhow::Result<()> { + let path = forest_car_sha256sum_path(snapshot_output_path); + std::fs::write( + path, + format!( + "{} {}\n", + checksum.encode_hex::(), + snapshot_output_path + .file_name() + .and_then(std::ffi::OsStr::to_str) + .context("Failed to retrieve file name while saving checksum")? + ), + )?; + Ok(()) + } - let chain_export_guard = ChainExportGuard::try_start_export()?; + // Spawn a task so it's not cancelled when CLI client is disconnected. + // So do not wrap this with `AbortOnDropHandle` + let handle = tokio::spawn(async move { + let ForestChainExportParams { + version, + epoch, + recent_roots, + output_path, + tipset_keys: ApiTipsetKey(tsk), + include_receipts, + include_events, + include_tipset_keys, + skip_checksum, + dry_run, + } = params; - let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; - let start_ts = ctx.chain_index().load_required_tipset_by_height( - epoch, - head, - ResolveNullTipset::TakeOlder, - )?; + let chain_export_guard = ChainExportGuard::try_start_export()?; - let options = ExportOptions { - skip_checksum, - include_receipts, - include_events, - include_tipset_keys, - seen: FileBackedCidHashSet::new(ctx.temp_dir.as_path())?, - }; - let writer = if dry_run { - tokio_util::either::Either::Left(VoidAsyncWriter) - } else { - tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?) - }; - let result = match version { - FilecoinSnapshotVersion::V1 => { - let db = ctx.db_owned(); + let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; + let start_ts = ctx.chain_index().load_required_tipset_by_height( + epoch, + head, + ResolveNullTipset::TakeOlder, + )?; - let chain_export = crate::chain::export::( - &db, + let options = ExportOptions { + skip_checksum, + include_receipts, + include_events, + include_tipset_keys, + seen: FileBackedCidHashSet::new(ctx.temp_dir.as_path())?, + }; + let tmp_path = + tempfile::TempPath::try_from_path(tmp_exporting_forest_car_path(&output_path))?; + let writer = if dry_run { + tokio_util::either::Either::Left(VoidAsyncWriter) + } else { + tokio_util::either::Either::Right(tokio::fs::File::create(&tmp_path).await?) + }; + let chain_export = match version { + FilecoinSnapshotVersion::V1 => crate::chain::export::( + ctx.db(), &start_ts, recent_roots, writer, options, - ); - - tokio::select! { - result = chain_export => { - result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) - }, - _ = chain_export_guard.cancellation_token().cancelled() => { - chain_export_guard.cancel_export(); - tracing::warn!("Snapshot export was cancelled"); - Ok(ApiExportResult::Cancelled) - }, + ) + .boxed(), + FilecoinSnapshotVersion::V2 => { + let f3_snap_tmp_path = { + let mut f3_snap_dir = output_path.clone(); + let mut builder = tempfile::Builder::new(); + let with_suffix = builder.suffix(".f3snap.bin"); + if f3_snap_dir.pop() { + with_suffix.tempfile_in(&f3_snap_dir) + } else { + with_suffix.tempfile_in(".") + }? + .into_temp_path() + }; + let f3_snap = { + match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()) + .await + { + Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)), + Err(e) => { + tracing::error!("Failed to export F3 snapshot: {e:#}"); + None + } + } + }; + crate::chain::export_v2::( + ctx.db(), + f3_snap, + &start_ts, + recent_roots, + writer, + options, + ) + .boxed() } - } - FilecoinSnapshotVersion::V2 => { - let db = ctx.db_owned(); - - let f3_snap_tmp_path = { - let mut f3_snap_dir = output_path.clone(); - let mut builder = tempfile::Builder::new(); - let with_suffix = builder.suffix(".f3snap.bin"); - if f3_snap_dir.pop() { - with_suffix.tempfile_in(&f3_snap_dir) - } else { - with_suffix.tempfile_in(".") - }? - .into_temp_path() - }; - let f3_snap = { - match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await + }; + tokio::select! { + result = chain_export => { + let checksum_opt = result?; + if !dry_run { - Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)), - Err(e) => { - tracing::error!("Failed to export F3 snapshot: {e:#}"); - None + tmp_path.persist(&output_path)?; + if let Some(checksum) = checksum_opt { + save_checksum(checksum, &output_path)?; } } - }; - - let chain_export = crate::chain::export_v2::( - &db, - f3_snap, - &start_ts, - recent_roots, - writer, - options, - ); - - tokio::select! { - result = chain_export => { - result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) - }, - _ = chain_export_guard.cancellation_token().cancelled() => { - chain_export_guard.cancel_export(); - tracing::warn!("Snapshot export was cancelled"); - Ok(ApiExportResult::Cancelled) - }, - } + anyhow::Ok(ApiExportResult::Done) + }, + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); + tracing::warn!("Snapshot export was cancelled"); + anyhow::Ok(ApiExportResult::Cancelled) + }, } - }; - match result { - Ok(export_result) => Ok(export_result), - Err(e) => Err(anyhow::anyhow!(e).into()), - } + }); + Ok(handle.await??) } } @@ -474,49 +493,64 @@ impl RpcMethod<1> for ForestChainExportDiff { "Exports a differential snapshot covering the given epoch range to a CAR file."; type Params = (ForestChainExportDiffParams,); - type Ok = (); + type Ok = ApiExportResult; async fn handle( ctx: Ctx, (params,): Self::Params, _: &http::Extensions, ) -> Result { - let _guard = ChainExportGuard::try_start_export()?; - - let ForestChainExportDiffParams { - from, - to, - depth, - output_path, - } = params; + // Spawn a task so it's not cancelled when CLI client is disconnected + // So do not wrap this with `AbortOnDropHandle` + let handle = tokio::spawn(async move { + let chain_export_guard = ChainExportGuard::try_start_export()?; + + let ForestChainExportDiffParams { + from, + to, + depth, + output_path, + } = params; - let chain_finality = ctx.chain_config().policy.chain_finality; - if depth < chain_finality { - return Err( - anyhow::anyhow!(format!("depth must be greater than {chain_finality}")).into(), + let chain_finality = ctx.chain_config().policy.chain_finality; + anyhow::ensure!( + depth >= chain_finality, + "depth {depth} must be greater than or equal to chain_finality {chain_finality}" ); - } - let head = ctx.chain_store().heaviest_tipset(); - let start_ts = ctx.chain_index().load_required_tipset_by_height( - from, - head, - ResolveNullTipset::TakeOlder, - )?; - - crate::tool::subcommands::archive_cmd::do_export( - ctx.chain_index().db(), - start_ts, - output_path, - None, - depth, - Some(to), - Some(chain_finality), - true, - ) - .await?; + let head = ctx.chain_store().heaviest_tipset(); + let start_ts = ctx.chain_index().load_required_tipset_by_height( + from, + head, + ResolveNullTipset::TakeOlder, + )?; + let tmp_path = + tempfile::TempPath::try_from_path(tmp_exporting_forest_car_path(&output_path))?; + let chain_export = crate::tool::subcommands::archive_cmd::do_export( + ctx.chain_index().db(), + start_ts, + tmp_path.to_path_buf(), + None, + depth, + Some(to), + Some(chain_finality), + true, + ); - Ok(()) + tokio::select! { + result = chain_export => { + result?; + tmp_path.persist(&output_path)?; + anyhow::Ok(ApiExportResult::Done) + }, + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); + tracing::warn!("Diff snapshot export was cancelled"); + anyhow::Ok(ApiExportResult::Cancelled) + }, + } + }); + Ok(handle.await??) } } diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap index 62602db8e7c..3f01e616739 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap @@ -415,7 +415,7 @@ methods: name: Forest.ChainExportDiff.Result required: true schema: - type: "null" + $ref: "#/components/schemas/ApiExportResult" paramStructure: by-position - name: Forest.ChainExportStatus description: Returns the progress and status of the in-progress chain export. @@ -4841,19 +4841,10 @@ components: - r - s ApiExportResult: - oneOf: - - type: string - enum: - - Cancelled - - type: object - properties: - Done: - type: - - string - - "null" - additionalProperties: false - required: - - Done + type: string + enum: + - Done + - Cancelled ApiExportStatus: type: object properties: diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap index 569cf957063..b7357571ef1 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap @@ -410,7 +410,7 @@ methods: name: Forest.ChainExportDiff.Result required: true schema: - type: "null" + $ref: "#/components/schemas/ApiExportResult" paramStructure: by-position - name: Forest.ChainExportStatus description: Returns the progress and status of the in-progress chain export. @@ -4910,19 +4910,10 @@ components: - r - s ApiExportResult: - oneOf: - - type: string - enum: - - Cancelled - - type: object - properties: - Done: - type: - - string - - "null" - additionalProperties: false - required: - - Done + type: string + enum: + - Done + - Cancelled ApiExportStatus: type: object properties: diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap index c50b6d763fc..a5d4a0d79f7 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap @@ -53,7 +53,7 @@ methods: name: Forest.ChainExportDiff.Result required: true schema: - type: "null" + $ref: "#/components/schemas/ApiExportResult" paramStructure: by-position - name: Forest.ChainExportStatus description: Returns the progress and status of the in-progress chain export. @@ -1725,19 +1725,10 @@ components: - r - s ApiExportResult: - oneOf: - - type: string - enum: - - Cancelled - - type: object - properties: - Done: - type: - - string - - "null" - additionalProperties: false - required: - - Done + type: string + enum: + - Done + - Cancelled ApiExportStatus: type: object properties: diff --git a/src/rpc/types/mod.rs b/src/rpc/types/mod.rs index a2e44ea2d1b..c0f3768b437 100644 --- a/src/rpc/types/mod.rs +++ b/src/rpc/types/mod.rs @@ -579,7 +579,7 @@ lotus_json_with_self!(ApiExportStatus); #[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, PartialEq, Eq, Hash)] pub enum ApiExportResult { - Done(Option), + Done, Cancelled, } diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index 4b5c78e2b77..5b05847aa09 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -591,7 +591,7 @@ where .context("diff epoch must be smaller than target epoch")?; let diff_ts: &Tipset = &diff_ts; let diff_limit = diff_depth.map(|depth| diff_ts.epoch() - depth).unwrap_or(0); - let store = Arc::new(store.shallow_clone()); + let store = store.shallow_clone(); let mut stream = stream_chain( store.shallow_clone(), diff_ts.clone().chain_owned(store.shallow_clone()),