From 52594de7c0db829e7e1e12135a3995d104e19096 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 29 Jun 2026 17:04:07 +0800 Subject: [PATCH 01/18] fix: refactor snapshot export locking / cancelling logic --- src/db/gc/snapshot.rs | 5 ++-- src/ipld/util.rs | 63 ++++++++++++++++++++++++++++++++++------ src/rpc/methods/chain.rs | 54 ++++++++++------------------------ 3 files changed, 71 insertions(+), 51 deletions(-) diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index 9c88dc17211..911ea668cfe 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -49,12 +49,11 @@ use crate::db::{ parity_db::GarbageCollectableDb, }; use crate::interpreter::VMTrace; +use crate::ipld::ChainExportGuard; use crate::prelude::*; use crate::shim::clock::EPOCHS_IN_DAY; use crate::utils::io::EitherMmapOrRandomAccessFile; use ahash::HashMap; -use cid::Cid; -use fvm_ipld_blockstore::Blockstore; use human_repr::HumanCount as _; use parking_lot::RwLock; use sha2::Sha256; @@ -200,7 +199,6 @@ impl SnapshotGarbageCollector { tracing::warn!("snap gc has already been running"); return; } - match self.export_snapshot().await { Ok(_) => { if let Err(e) = self.cleanup_after_snapshot_export().await { @@ -219,6 +217,7 @@ impl SnapshotGarbageCollector { } async fn export_snapshot(&self) -> anyhow::Result<()> { + let _guard = ChainExportGuard::try_start_export()?; let db = self.db(); tracing::info!( "exporting lite snapshot with {} recent state roots", diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 24d0755d040..b6395e2adaf 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -4,17 +4,16 @@ use crate::blocks::Tipset; use crate::cid_collections::{CidHashSet, CidHashSetLike}; use crate::ipld::Ipld; +use crate::prelude::*; use crate::shim::clock::ChainEpoch; use crate::shim::executor::Receipt; use crate::utils::db::car_stream::CarBlock; use crate::utils::encoding::extract_cids; use crate::utils::multihash::prelude::*; +use arc_swap::ArcSwapOption; use bytes::Bytes; use chrono::{DateTime, Utc}; -use cid::Cid; use futures::Stream; -use fvm_ipld_blockstore::Blockstore; -use parking_lot::RwLock; use pin_project_lite::pin_project; use std::borrow::Borrow; use std::collections::VecDeque; @@ -22,6 +21,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicI64}; use std::sync::{LazyLock, atomic}; use std::task::{Context, Poll}; +use tokio_util::sync::CancellationToken; #[derive(Default)] pub struct ExportStatus { @@ -29,7 +29,8 @@ pub struct ExportStatus { pub initial_epoch: AtomicI64, pub exporting: AtomicBool, pub cancelled: AtomicBool, - pub start_time: RwLock>>, + pub start_time: ArcSwapOption>, + pub cancellation_token: ArcSwapOption, } impl ExportStatus { @@ -50,12 +51,52 @@ impl ExportStatus { } pub fn start_time(&self) -> Option> { - *self.start_time.read() + self.start_time.load().clone().map(Arc::unwrap_or_clone) + } + + pub fn cancellation_token(&self) -> Option { + self.cancellation_token + .load() + .clone() + .map(Arc::unwrap_or_clone) } } pub static CHAIN_EXPORT_STATUS: LazyLock = LazyLock::new(ExportStatus::default); +pub struct ChainExportGuard { + cancellation_token: CancellationToken, +} + +impl ChainExportGuard { + pub fn try_start_export() -> anyhow::Result { + anyhow::ensure!( + !CHAIN_EXPORT_STATUS.exporting(), + "An active chain export job has started at {}, start epoch: {}, current epoch: {}", + CHAIN_EXPORT_STATUS.start_time().unwrap_or_default(), + CHAIN_EXPORT_STATUS.initial_epoch(), + CHAIN_EXPORT_STATUS.epoch(), + ); + let cancellation_token = CancellationToken::new(); + start_export(cancellation_token.clone()); + Ok(Self { cancellation_token }) + } + + pub fn cancel_export(&self) { + cancel_export() + } + + pub fn cancellation_token(&self) -> &CancellationToken { + &self.cancellation_token + } +} + +impl Drop for ChainExportGuard { + fn drop(&mut self) { + end_export() + } +} + fn update_epoch(new_value: i64) { let status = &*CHAIN_EXPORT_STATUS; status.epoch.store(new_value, atomic::Ordering::Relaxed); @@ -67,22 +108,26 @@ fn update_epoch(new_value: i64) { ); } -pub fn start_export() { +fn start_export(cancellation_token: CancellationToken) { let status = &*CHAIN_EXPORT_STATUS; status.epoch.store(0, atomic::Ordering::Relaxed); status.initial_epoch.store(0, atomic::Ordering::Relaxed); status.exporting.store(true, atomic::Ordering::Relaxed); status.cancelled.store(false, atomic::Ordering::Relaxed); - *status.start_time.write() = Some(Utc::now()); + status.start_time.store(Some(Utc::now().into())); + status + .cancellation_token + .store(Some(cancellation_token.into())); } -pub fn end_export() { +fn end_export() { CHAIN_EXPORT_STATUS .exporting .store(false, atomic::Ordering::Relaxed); + CHAIN_EXPORT_STATUS.cancellation_token.store(None); } -pub fn cancel_export() { +fn cancel_export() { let status = &*CHAIN_EXPORT_STATUS; status.exporting.store(false, atomic::Ordering::Relaxed); status.cancelled.store(true, atomic::Ordering::Relaxed); diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index f4ee7c3ac4d..4950195883e 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -12,7 +12,7 @@ use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChang use crate::chain_sync::{get_full_tipset, load_full_tipset}; use crate::cid_collections::{CidHashSet, FileBackedCidHashSet}; use crate::ipld::DfsIter; -use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export}; +use crate::ipld::{CHAIN_EXPORT_STATUS, ChainExportGuard}; use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; @@ -43,11 +43,7 @@ use sha2::Sha256; use std::convert::Infallible; use std::fs::File; use std::{collections::VecDeque, path::PathBuf, sync::LazyLock}; -use tokio::sync::{ - Mutex, - broadcast::{self, Receiver as Subscriber}, -}; -use tokio_util::sync::CancellationToken; +use tokio::sync::broadcast::{self, Receiver as Subscriber}; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -66,9 +62,6 @@ const HEAD_CHANNEL_CAPACITY: usize = 10; /// https://github.com/filecoin-project/go-f3/issues/944 pub const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200; -static CHAIN_EXPORT_LOCK: LazyLock>> = - LazyLock::new(|| Mutex::new(None)); - pub enum ChainGetFinalizedTipset {} impl RpcMethod<0> for ChainGetFinalizedTipset { const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet"; @@ -305,17 +298,7 @@ impl RpcMethod<1> for ForestChainExport { dry_run, } = params; - let token = CancellationToken::new(); - { - let mut guard = CHAIN_EXPORT_LOCK.lock().await; - if guard.is_some() { - return Err( - anyhow::anyhow!("A chain export is still in progress. Cancel it with the export-cancel subcommand if needed.").into(), - ); - } - *guard = Some(token.clone()); - } - start_export(); + let chain_export_guard = ChainExportGuard::try_start_export()?; let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; let start_ts = ctx.chain_index().load_required_tipset_by_height( @@ -352,8 +335,8 @@ impl RpcMethod<1> for ForestChainExport { result = chain_export => { result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) }, - _ = token.cancelled() => { - cancel_export(); + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); tracing::warn!("Snapshot export was cancelled"); Ok(ApiExportResult::Cancelled) }, @@ -397,18 +380,14 @@ impl RpcMethod<1> for ForestChainExport { result = chain_export => { result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) }, - _ = token.cancelled() => { - cancel_export(); + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); tracing::warn!("Snapshot export was cancelled"); Ok(ApiExportResult::Cancelled) }, } } }; - end_export(); - // Clean up token - let mut guard = CHAIN_EXPORT_LOCK.lock().await; - *guard = None; match result { Ok(export_result) => Ok(export_result), Err(e) => Err(anyhow::anyhow!(e).into()), @@ -479,12 +458,14 @@ impl RpcMethod<0> for ForestChainExportCancel { (): Self::Params, _: &http::Extensions, ) -> Result { - if let Some(token) = CHAIN_EXPORT_LOCK.lock().await.as_ref() { + if CHAIN_EXPORT_STATUS.exporting() + && let Some(token) = CHAIN_EXPORT_STATUS.cancellation_token() + { token.cancel(); - return Ok(true); + Ok(true) + } else { + Ok(false) } - - Ok(false) } } @@ -505,6 +486,8 @@ impl RpcMethod<1> for ForestChainExportDiff { (params,): Self::Params, _: &http::Extensions, ) -> Result { + let _guard = ChainExportGuard::try_start_export()?; + let ForestChainExportDiffParams { from, to, @@ -512,13 +495,6 @@ impl RpcMethod<1> for ForestChainExportDiff { output_path, } = params; - let _locked = CHAIN_EXPORT_LOCK.try_lock(); - if _locked.is_err() { - return Err( - anyhow::anyhow!("Another chain export diff job is still in progress").into(), - ); - } - let chain_finality = ctx.chain_config().policy.chain_finality; if depth < chain_finality { return Err( From 63adf1c451d8e879df902cc72c818d18c1a6c346 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 29 Jun 2026 17:23:19 +0800 Subject: [PATCH 02/18] fix --- src/ipld/util.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/ipld/util.rs b/src/ipld/util.rs index b6395e2adaf..2c1908bd762 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -70,15 +70,8 @@ pub struct ChainExportGuard { impl ChainExportGuard { pub fn try_start_export() -> anyhow::Result { - anyhow::ensure!( - !CHAIN_EXPORT_STATUS.exporting(), - "An active chain export job has started at {}, start epoch: {}, current epoch: {}", - CHAIN_EXPORT_STATUS.start_time().unwrap_or_default(), - CHAIN_EXPORT_STATUS.initial_epoch(), - CHAIN_EXPORT_STATUS.epoch(), - ); let cancellation_token = CancellationToken::new(); - start_export(cancellation_token.clone()); + start_export(cancellation_token.clone())?; Ok(Self { cancellation_token }) } @@ -108,16 +101,24 @@ fn update_epoch(new_value: i64) { ); } -fn start_export(cancellation_token: CancellationToken) { +fn start_export(cancellation_token: CancellationToken) -> anyhow::Result<()> { let status = &*CHAIN_EXPORT_STATUS; + let export_in_progress = status.exporting.swap(true, atomic::Ordering::Relaxed); + anyhow::ensure!( + !export_in_progress, + "An active chain export job has started at {}, start epoch: {}, current epoch: {}", + status.start_time().unwrap_or_default(), + status.initial_epoch(), + status.epoch(), + ); status.epoch.store(0, atomic::Ordering::Relaxed); status.initial_epoch.store(0, atomic::Ordering::Relaxed); - status.exporting.store(true, atomic::Ordering::Relaxed); status.cancelled.store(false, atomic::Ordering::Relaxed); status.start_time.store(Some(Utc::now().into())); status .cancellation_token .store(Some(cancellation_token.into())); + Ok(()) } fn end_export() { From 29bbd0cdeb6ac2207d0c6131083a9af1be2f72b3 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 29 Jun 2026 22:48:42 +0800 Subject: [PATCH 03/18] fix: yield to runtime on every 100 ChainStream polls --- src/ipld/util.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 24d0755d040..8ee2e02ae00 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -183,6 +183,7 @@ pin_project! { events: bool, tipset_keys:bool, track_progress: bool, + n_polled: usize, } } @@ -254,6 +255,7 @@ pub fn stream_chain< events: false, tipset_keys: false, track_progress: false, + n_polled: 0, } } @@ -278,7 +280,7 @@ impl, ITER: Iterator + Unpin, S: Cid { type Item = anyhow::Result; - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use Task::*; let export_tipset_keys = self.tipset_keys; @@ -286,6 +288,15 @@ impl, ITER: Iterator + Unpin, S: Cid let stateroot_limit_exclusive = self.stateroot_limit_exclusive; let this = self.project(); + // Yield to the runtime every 128 polls to allow cancellation + { + *this.n_polled += 1; + if this.n_polled.is_multiple_of(128) { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + } + loop { while let Some(task) = this.dfs.front_mut() { match task { From 2e1bffd0b45aab78fff28d8ca852fba43a029d7b Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 29 Jun 2026 19:49:02 +0800 Subject: [PATCH 04/18] fix: refactor `forest-cli snapshot` command --- scripts/tests/calibnet_export_check.sh | 22 +- scripts/tests/calibnet_export_diff_check.sh | 21 +- src/cli/subcommands/snapshot_cmd.rs | 92 ++---- src/db/car/forest.rs | 14 +- src/rpc/methods/chain.rs | 293 +++++++++++--------- src/rpc/types/mod.rs | 2 +- src/tool/subcommands/archive_cmd.rs | 2 +- 7 files changed, 244 insertions(+), 202 deletions(-) diff --git a/scripts/tests/calibnet_export_check.sh b/scripts/tests/calibnet_export_check.sh index 5b66f0ac71c..56ddec0cdbc 100755 --- a/scripts/tests/calibnet_export_check.sh +++ b/scripts/tests/calibnet_export_check.sh @@ -26,7 +26,6 @@ fi echo "Exporting zstd compressed snapshot in $format format" $FOREST_CLI_PATH snapshot export --format "$format" > snapshot_export.log 2>&1 & - echo "Testing that export is in progress" for ((i=1; i<=retries; i++)); do output=$($FOREST_CLI_PATH snapshot export-status --format json) @@ -62,7 +61,26 @@ echo "Exporting zstd compressed snapshot at genesis" $FOREST_CLI_PATH snapshot export --tipset 0 --format "$format" echo "Exporting zstd compressed snapshot in $format format" -$FOREST_CLI_PATH snapshot export --format "$format" +$FOREST_CLI_PATH snapshot export --format "$format" & +EXPORT_CMD_PID=$! +# another export job should be disallowed +if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then + : +else + echo "another export job should be disallowed" + exit 1 +fi +# another export-diff job should be disallowed +if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then + : +else + echo "another export-diff job should be disallowed" + exit 1 +fi +# Killing the CLI should not cancel the export +kill -KILL $EXPORT_CMD_PID +# Wait on the same export job +$FOREST_CLI_PATH snapshot export-status --wait $FOREST_CLI_PATH shutdown --force diff --git a/scripts/tests/calibnet_export_diff_check.sh b/scripts/tests/calibnet_export_diff_check.sh index add8dfc8fc1..3d3658c21ab 100755 --- a/scripts/tests/calibnet_export_diff_check.sh +++ b/scripts/tests/calibnet_export_diff_check.sh @@ -14,7 +14,26 @@ snapshot=$(find "$db_path/car_db"/*.car.zst | tail -n 1) snapshot_epoch=$(forest_query_epoch "$snapshot") echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" -$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 +$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & +EXPORT_CMD_PID=$! +# another export job should be disallowed +if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then + : +else + echo "another export job should be disallowed" + exit 1 +fi +# another export-diff job should be disallowed +if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then + : +else + echo "another export-diff job should be disallowed" + exit 1 +fi +# Killing the CLI should not cancel the export +kill -KILL $EXPORT_CMD_PID +# Wait on the same export job +$FOREST_CLI_PATH snapshot export-status --wait $FOREST_CLI_PATH shutdown --force diff --git a/src/cli/subcommands/snapshot_cmd.rs b/src/cli/subcommands/snapshot_cmd.rs index 10880d73cc6..468a14673a9 100644 --- a/src/cli/subcommands/snapshot_cmd.rs +++ b/src/cli/subcommands/snapshot_cmd.rs @@ -4,21 +4,18 @@ use crate::chain::FilecoinSnapshotVersion; use crate::chain_sync::chain_muxer::DEFAULT_RECENT_STATE_ROOTS; use crate::cli_shared::snapshot::{self, TrustedVendor}; -use crate::db::car::forest::new_forest_car_temp_path_in; +use crate::db::car::forest::tmp_exporting_forest_car_path; use crate::networks::calibnet; +use crate::prelude::*; use crate::rpc::chain::ForestChainExportDiffParams; use crate::rpc::types::ApiExportResult; use crate::rpc::{self, chain::ForestChainExportParams, prelude::*}; use crate::shim::policy::policy_constants::CHAIN_FINALITY; -use anyhow::Context as _; use chrono::DateTime; use clap::Subcommand; use indicatif::{ProgressBar, ProgressStyle}; -use std::{ - path::{Path, PathBuf}, - time::Duration, -}; -use tokio::io::AsyncWriteExt; +use std::{path::PathBuf, time::Duration}; +use tokio_util::sync::CancellationToken; #[derive(Debug, Clone, clap::ValueEnum)] pub enum Format { @@ -120,7 +117,7 @@ impl SnapshotCommands { ChainHead::call(&client, ()).await? }; - let output_path = match output_path.is_dir() { + let output_path = std::path::absolute(match output_path.is_dir() { true => output_path.join(snapshot::filename( TrustedVendor::Forest, chain_name, @@ -132,16 +129,14 @@ impl SnapshotCommands { true, )), false => output_path.clone(), - }; - - let output_dir = output_path.parent().context("invalid output path")?; - let temp_path = new_forest_car_temp_path_in(output_dir)?; + }) + .context("failed to make output path absolute")?; let params = ForestChainExportParams { version: format, epoch: tipset.epoch(), recent_roots: depth, - output_path: temp_path.to_path_buf(), + output_path: output_path.clone(), tipset_keys: tipset.key().clone().into(), include_receipts: false, include_events: false, @@ -158,7 +153,7 @@ impl SnapshotCommands { ).with_message(format!("Exporting v{} snapshot to {} ...", format as u64, output_path.display())); pb.enable_steady_tick(std::time::Duration::from_millis(80)); let handle = tokio::spawn({ - let path: PathBuf = (&temp_path).into(); + let path = tmp_exporting_forest_car_path(&output_path); let pb = pb.clone(); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); async move { @@ -181,21 +176,8 @@ impl SnapshotCommands { pb.finish(); _ = handle.await; - if !dry_run { - match export_result.clone() { - ApiExportResult::Done(hash_opt) => { - // Move the file first; prevents orphaned checksum on persist error. - temp_path.persist(&output_path)?; - if let Some(hash) = hash_opt { - save_checksum(&output_path, hash).await?; - } - } - ApiExportResult::Cancelled => { /* no file to persist on cancel */ } - } - } - match export_result { - ApiExportResult::Done(_) => { + ApiExportResult::Done => { println!("Export completed."); } ApiExportResult::Cancelled => { @@ -245,13 +227,12 @@ impl SnapshotCommands { if result.cancelled { pb.set_message("Export cancelled"); pb.abandon(); - return Ok(()); } let position = (result.progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64; pb.set_position(position); - if position >= 10000 { + if !result.exporting { break; } tokio::time::sleep(Duration::from_millis(500)).await; @@ -303,18 +284,16 @@ impl SnapshotCommands { let depth = depth.unwrap_or_else(|| from - to); anyhow::ensure!(depth > 0, "depth must be positive"); - let output_path = match output_path.is_dir() { + let output_path = std::path::absolute(match output_path.is_dir() { true => output_path.join(format!( "forest_snapshot_diff_{chain_name}_{from}_{to}+{depth}.car.zst" )), false => output_path.clone(), - }; - - let output_dir = output_path.parent().context("invalid output path")?; - let temp_path = new_forest_car_temp_path_in(output_dir)?; + }) + .context("failed to make output path absolute")?; let params = ForestChainExportDiffParams { - output_path: temp_path.to_path_buf(), + output_path: output_path.clone(), from, to, depth, @@ -327,12 +306,14 @@ impl SnapshotCommands { .expect("indicatif template must be valid"), ).with_message(format!("Exporting {} ...", output_path.display())); pb.enable_steady_tick(std::time::Duration::from_millis(80)); + let cancellation_token = CancellationToken::new(); let handle = tokio::spawn({ - let path: PathBuf = (&temp_path).into(); + let cancellation_token = cancellation_token.clone(); + let path = tmp_exporting_forest_car_path(&output_path); let pb = pb.clone(); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); async move { - loop { + while !cancellation_token.is_cancelled() { interval.tick().await; if let Ok(meta) = std::fs::metadata(&path) { pb.set_position(meta.len()); @@ -343,39 +324,24 @@ impl SnapshotCommands { // Manually construct RpcRequest because snapshot export could // take a few hours on mainnet - client + let export_result = client .call(ForestChainExportDiff::request((params,))?.with_timeout(Duration::MAX)) .await?; - handle.abort(); + cancellation_token.cancel(); pb.finish(); _ = handle.await; - temp_path.persist(output_path)?; - println!("Export completed."); + match export_result { + ApiExportResult::Done => { + println!("Export completed."); + } + ApiExportResult::Cancelled => { + println!("Export cancelled."); + } + } Ok(()) } } } } - -/// Prints hex-encoded representation of SHA-256 checksum and saves it to a file -/// with the same name but with a `.sha256sum` extension. -async fn save_checksum(source: &Path, encoded_hash: String) -> anyhow::Result<()> { - let checksum_file_content = format!( - "{encoded_hash} {}\n", - source - .file_name() - .and_then(std::ffi::OsStr::to_str) - .context("Failed to retrieve file name while saving checksum")? - ); - - let checksum_path = PathBuf::from(source).with_extension("sha256sum"); - - let mut checksum_file = tokio::fs::File::create(&checksum_path).await?; - checksum_file - .write_all(checksum_file_content.as_bytes()) - .await?; - checksum_file.flush().await?; - Ok(()) -} diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 651044f2596..7d095db6e2b 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -63,7 +63,7 @@ use integer_encoding::VarIntReader; use nunny::Vec as NonEmpty; use positioned_io::{Cursor, ReadAt, Size as _, SizeCursor}; use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::OnceLock; use std::task::Poll; use std::time::Duration; @@ -508,6 +508,18 @@ pub fn new_forest_car_temp_path_in( .into_temp_path()) } +pub fn tmp_exporting_forest_car_path(output_path: &Path) -> PathBuf { + let mut p = output_path.to_owned(); + p.add_extension("tmp"); + p +} + +pub fn forest_car_sha256sum_path(output_path: &Path) -> PathBuf { + let mut p = output_path.to_owned(); + p.add_extension("sha256sum"); + p +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 4950195883e..acfa4087b54 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -11,6 +11,7 @@ use crate::chain::index::{ChainIndex, ResolveNullTipset}; use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange}; use crate::chain_sync::{get_full_tipset, load_full_tipset}; use crate::cid_collections::{CidHashSet, FileBackedCidHashSet}; +use crate::db::car::forest::{forest_car_sha256sum_path, tmp_exporting_forest_car_path}; use crate::ipld::DfsIter; use crate::ipld::{CHAIN_EXPORT_STATUS, ChainExportGuard}; use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; @@ -285,113 +286,125 @@ impl RpcMethod<1> for ForestChainExport { (params,): Self::Params, _: &http::Extensions, ) -> Result { - let ForestChainExportParams { - version, - epoch, - recent_roots, - output_path, - tipset_keys: ApiTipsetKey(tsk), - include_receipts, - include_events, - include_tipset_keys, - skip_checksum, - dry_run, - } = params; - - let chain_export_guard = ChainExportGuard::try_start_export()?; + // Spawn a task so it's not cancelled when CLI client is disconnected + let handle = tokio::spawn(async move { + let ForestChainExportParams { + version, + epoch, + recent_roots, + output_path, + tipset_keys: ApiTipsetKey(tsk), + include_receipts, + include_events, + include_tipset_keys, + skip_checksum, + dry_run, + } = params; - let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; - let start_ts = ctx.chain_index().load_required_tipset_by_height( - epoch, - head, - ResolveNullTipset::TakeOlder, - )?; + let chain_export_guard = ChainExportGuard::try_start_export()?; - let options = ExportOptions { - skip_checksum, - include_receipts, - include_events, - include_tipset_keys, - seen: FileBackedCidHashSet::new(ctx.temp_dir.as_path())?, - }; - let writer = if dry_run { - tokio_util::either::Either::Left(VoidAsyncWriter) - } else { - tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?) - }; - let result = match version { - FilecoinSnapshotVersion::V1 => { - let db = ctx.db_owned(); - - let chain_export = crate::chain::export::( - &db, - &start_ts, - recent_roots, - writer, - options, - ); + let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; + let start_ts = ctx.chain_index().load_required_tipset_by_height( + epoch, + head, + ResolveNullTipset::TakeOlder, + )?; - tokio::select! { - result = chain_export => { - result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) - }, - _ = chain_export_guard.cancellation_token().cancelled() => { - chain_export_guard.cancel_export(); - tracing::warn!("Snapshot export was cancelled"); - Ok(ApiExportResult::Cancelled) - }, + let options = ExportOptions { + skip_checksum, + include_receipts, + include_events, + include_tipset_keys, + seen: FileBackedCidHashSet::new(ctx.temp_dir.as_path())?, + }; + let tmp_path = + tempfile::TempPath::try_from_path(tmp_exporting_forest_car_path(&output_path))?; + let writer = if dry_run { + tokio_util::either::Either::Left(VoidAsyncWriter) + } else { + tokio_util::either::Either::Right(tokio::fs::File::create(&tmp_path).await?) + }; + let result = match version { + FilecoinSnapshotVersion::V1 => { + let db = ctx.db_owned(); + + let chain_export = crate::chain::export::( + &db, + &start_ts, + recent_roots, + writer, + options, + ); + + tokio::select! { + result = chain_export => { + result.map(|_| ApiExportResult::Done) + }, + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); + tracing::warn!("Snapshot export was cancelled"); + Ok(ApiExportResult::Cancelled) + }, + } } - } - FilecoinSnapshotVersion::V2 => { - let db = ctx.db_owned(); - - let f3_snap_tmp_path = { - let mut f3_snap_dir = output_path.clone(); - let mut builder = tempfile::Builder::new(); - let with_suffix = builder.suffix(".f3snap.bin"); - if f3_snap_dir.pop() { - with_suffix.tempfile_in(&f3_snap_dir) - } else { - with_suffix.tempfile_in(".") - }? - .into_temp_path() - }; - let f3_snap = { - match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await - { - Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)), - Err(e) => { - tracing::error!("Failed to export F3 snapshot: {e:#}"); - None + FilecoinSnapshotVersion::V2 => { + let db = ctx.db_owned(); + + let f3_snap_tmp_path = { + let mut f3_snap_dir = output_path.clone(); + let mut builder = tempfile::Builder::new(); + let with_suffix = builder.suffix(".f3snap.bin"); + if f3_snap_dir.pop() { + with_suffix.tempfile_in(&f3_snap_dir) + } else { + with_suffix.tempfile_in(".") + }? + .into_temp_path() + }; + let f3_snap = { + match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()) + .await + { + Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)), + Err(e) => { + tracing::error!("Failed to export F3 snapshot: {e:#}"); + None + } } + }; + + let chain_export = crate::chain::export_v2::( + &db, + f3_snap, + &start_ts, + recent_roots, + writer, + options, + ); + + tokio::select! { + result = chain_export => { + if let Some(checksum) = result? { + let path = forest_car_sha256sum_path(&output_path); + std::fs::write(path, format!("{} {}\n", + checksum.encode_hex::(), + output_path.file_name().and_then(std::ffi::OsStr::to_str) + .context("Failed to retrieve file name while saving checksum")?))?; + } + tmp_path.persist(&output_path)?; + Ok(ApiExportResult::Done) + }, + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); + tracing::warn!("Snapshot export was cancelled"); + Ok(ApiExportResult::Cancelled) + }, } - }; - - let chain_export = crate::chain::export_v2::( - &db, - f3_snap, - &start_ts, - recent_roots, - writer, - options, - ); - - tokio::select! { - result = chain_export => { - result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) - }, - _ = chain_export_guard.cancellation_token().cancelled() => { - chain_export_guard.cancel_export(); - tracing::warn!("Snapshot export was cancelled"); - Ok(ApiExportResult::Cancelled) - }, } - } - }; - match result { - Ok(export_result) => Ok(export_result), - Err(e) => Err(anyhow::anyhow!(e).into()), - } + }; + anyhow::Ok(result?) + }); + Ok(handle.await??) } } @@ -479,49 +492,63 @@ impl RpcMethod<1> for ForestChainExportDiff { Some("Exports a differential snapshot covering the given epoch range to a CAR file."); type Params = (ForestChainExportDiffParams,); - type Ok = (); + type Ok = ApiExportResult; async fn handle( ctx: Ctx, (params,): Self::Params, _: &http::Extensions, ) -> Result { - let _guard = ChainExportGuard::try_start_export()?; - - let ForestChainExportDiffParams { - from, - to, - depth, - output_path, - } = params; + // Spawn a task so it's not cancelled when CLI client is disconnected + let handle = tokio::spawn(async move { + let chain_export_guard = ChainExportGuard::try_start_export()?; + + let ForestChainExportDiffParams { + from, + to, + depth, + output_path, + } = params; - let chain_finality = ctx.chain_config().policy.chain_finality; - if depth < chain_finality { - return Err( - anyhow::anyhow!(format!("depth must be greater than {chain_finality}")).into(), + let chain_finality = ctx.chain_config().policy.chain_finality; + anyhow::ensure!( + depth >= chain_finality, + "depth {depth} must be greater than or equal to chain_finality {chain_finality}" ); - } - - let head = ctx.chain_store().heaviest_tipset(); - let start_ts = ctx.chain_index().load_required_tipset_by_height( - from, - head, - ResolveNullTipset::TakeOlder, - )?; - crate::tool::subcommands::archive_cmd::do_export( - ctx.chain_index().db(), - start_ts, - output_path, - None, - depth, - Some(to), - Some(chain_finality), - true, - ) - .await?; + let head = ctx.chain_store().heaviest_tipset(); + let start_ts = ctx.chain_index().load_required_tipset_by_height( + from, + head, + ResolveNullTipset::TakeOlder, + )?; + let tmp_path = + tempfile::TempPath::try_from_path(tmp_exporting_forest_car_path(&output_path))?; + let chain_export = crate::tool::subcommands::archive_cmd::do_export( + ctx.chain_index().db(), + start_ts, + tmp_path.to_path_buf(), + None, + depth, + Some(to), + Some(chain_finality), + true, + ); - Ok(()) + tokio::select! { + result = chain_export => { + result?; + tmp_path.persist(&output_path)?; + anyhow::Ok(ApiExportResult::Done) + }, + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); + tracing::warn!("Diff snapshot export was cancelled"); + anyhow::Ok(ApiExportResult::Cancelled) + }, + } + }); + Ok(handle.await??) } } diff --git a/src/rpc/types/mod.rs b/src/rpc/types/mod.rs index a2e44ea2d1b..c0f3768b437 100644 --- a/src/rpc/types/mod.rs +++ b/src/rpc/types/mod.rs @@ -579,7 +579,7 @@ lotus_json_with_self!(ApiExportStatus); #[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, PartialEq, Eq, Hash)] pub enum ApiExportResult { - Done(Option), + Done, Cancelled, } diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index 4b5c78e2b77..5b05847aa09 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -591,7 +591,7 @@ where .context("diff epoch must be smaller than target epoch")?; let diff_ts: &Tipset = &diff_ts; let diff_limit = diff_depth.map(|depth| diff_ts.epoch() - depth).unwrap_or(0); - let store = Arc::new(store.shallow_clone()); + let store = store.shallow_clone(); let mut stream = stream_chain( store.shallow_clone(), diff_ts.clone().chain_owned(store.shallow_clone()), From bc627d65e011a52c7799b46f57cde296ce071260 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 29 Jun 2026 23:52:13 +0800 Subject: [PATCH 05/18] more export-diff tests --- scripts/tests/calibnet_export_check.sh | 4 +- scripts/tests/calibnet_export_diff_check.sh | 41 ++++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/scripts/tests/calibnet_export_check.sh b/scripts/tests/calibnet_export_check.sh index 56ddec0cdbc..d81992d7572 100755 --- a/scripts/tests/calibnet_export_check.sh +++ b/scripts/tests/calibnet_export_check.sh @@ -63,14 +63,14 @@ $FOREST_CLI_PATH snapshot export --tipset 0 --format "$format" echo "Exporting zstd compressed snapshot in $format format" $FOREST_CLI_PATH snapshot export --format "$format" & EXPORT_CMD_PID=$! -# another export job should be disallowed +echo "Verifying another export job is disallowed" if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then : else echo "another export job should be disallowed" exit 1 fi -# another export-diff job should be disallowed +echo "Verifying another export-diff job is disallowed" if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then : else diff --git a/scripts/tests/calibnet_export_diff_check.sh b/scripts/tests/calibnet_export_diff_check.sh index 3d3658c21ab..cb45fa6ce3f 100755 --- a/scripts/tests/calibnet_export_diff_check.sh +++ b/scripts/tests/calibnet_export_diff_check.sh @@ -9,21 +9,58 @@ source "$(dirname "$0")/harness.sh" forest_init "$@" +retries=30 +sleep_interval=0.5 + db_path=$($FOREST_TOOL_PATH db stats --chain calibnet | grep "Database path:" | cut -d':' -f2- | xargs) snapshot=$(find "$db_path/car_db"/*.car.zst | tail -n 1) snapshot_epoch=$(forest_query_epoch "$snapshot") +echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" +$FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & + +echo "Testing that export is in progress" +for ((i=1; i<=retries; i++)); do + output=$($FOREST_CLI_PATH snapshot export-status --format json) + is_exporting=$(echo "$output" | jq -r '.exporting') + if [ "$is_exporting" == "true" ]; then + break + fi + if [ $i -eq $retries ]; then + echo "export should be in progress" + exit 1 + fi + sleep $sleep_interval +done + +$FOREST_CLI_PATH snapshot export-cancel + +echo "Testing that export has been cancelled" +for ((i=1; i<=retries; i++)); do + output=$($FOREST_CLI_PATH snapshot export-status --format json) + is_exporting=$(echo "$output" | jq -r '.exporting') + is_cancelled=$(echo "$output" | jq -r '.cancelled') + if [ "$is_exporting" == "false" ] && [ "$is_cancelled" == "true" ]; then + break + fi + if [ $i -eq $retries ]; then + echo "export should be cancelled" + exit 1 + fi + sleep $sleep_interval +done + echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" $FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & EXPORT_CMD_PID=$! -# another export job should be disallowed +echo "Verifying another export job is disallowed" if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then : else echo "another export job should be disallowed" exit 1 fi -# another export-diff job should be disallowed +echo "Verifying another export-diff job is disallowed" if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then : else From 6b9d6f1cc1642e2cdb6a936fce639c7c3a1f2fdc Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 00:42:32 +0800 Subject: [PATCH 06/18] sleep 5 --- scripts/tests/calibnet_export_check.sh | 1 + scripts/tests/calibnet_export_diff_check.sh | 1 + 2 files changed, 2 insertions(+) diff --git a/scripts/tests/calibnet_export_check.sh b/scripts/tests/calibnet_export_check.sh index d81992d7572..145a6696870 100755 --- a/scripts/tests/calibnet_export_check.sh +++ b/scripts/tests/calibnet_export_check.sh @@ -63,6 +63,7 @@ $FOREST_CLI_PATH snapshot export --tipset 0 --format "$format" echo "Exporting zstd compressed snapshot in $format format" $FOREST_CLI_PATH snapshot export --format "$format" & EXPORT_CMD_PID=$! +sleep 5 echo "Verifying another export job is disallowed" if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then : diff --git a/scripts/tests/calibnet_export_diff_check.sh b/scripts/tests/calibnet_export_diff_check.sh index cb45fa6ce3f..355e2fd3c15 100755 --- a/scripts/tests/calibnet_export_diff_check.sh +++ b/scripts/tests/calibnet_export_diff_check.sh @@ -53,6 +53,7 @@ done echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" $FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & EXPORT_CMD_PID=$! +sleep 5 echo "Verifying another export job is disallowed" if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then : From 941f6c349cfcc6ae09325963a02e39e26eec2973 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 10:16:29 +0800 Subject: [PATCH 07/18] mise insta --- .../forest__rpc__tests__rpc__v0.snap | 19 +++++-------------- .../forest__rpc__tests__rpc__v1.snap | 19 +++++-------------- .../forest__rpc__tests__rpc__v2.snap | 19 +++++-------------- 3 files changed, 15 insertions(+), 42 deletions(-) diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap index 62602db8e7c..3f01e616739 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap @@ -415,7 +415,7 @@ methods: name: Forest.ChainExportDiff.Result required: true schema: - type: "null" + $ref: "#/components/schemas/ApiExportResult" paramStructure: by-position - name: Forest.ChainExportStatus description: Returns the progress and status of the in-progress chain export. @@ -4841,19 +4841,10 @@ components: - r - s ApiExportResult: - oneOf: - - type: string - enum: - - Cancelled - - type: object - properties: - Done: - type: - - string - - "null" - additionalProperties: false - required: - - Done + type: string + enum: + - Done + - Cancelled ApiExportStatus: type: object properties: diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap index 569cf957063..b7357571ef1 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap @@ -410,7 +410,7 @@ methods: name: Forest.ChainExportDiff.Result required: true schema: - type: "null" + $ref: "#/components/schemas/ApiExportResult" paramStructure: by-position - name: Forest.ChainExportStatus description: Returns the progress and status of the in-progress chain export. @@ -4910,19 +4910,10 @@ components: - r - s ApiExportResult: - oneOf: - - type: string - enum: - - Cancelled - - type: object - properties: - Done: - type: - - string - - "null" - additionalProperties: false - required: - - Done + type: string + enum: + - Done + - Cancelled ApiExportStatus: type: object properties: diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap index c50b6d763fc..a5d4a0d79f7 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap @@ -53,7 +53,7 @@ methods: name: Forest.ChainExportDiff.Result required: true schema: - type: "null" + $ref: "#/components/schemas/ApiExportResult" paramStructure: by-position - name: Forest.ChainExportStatus description: Returns the progress and status of the in-progress chain export. @@ -1725,19 +1725,10 @@ components: - r - s ApiExportResult: - oneOf: - - type: string - enum: - - Cancelled - - type: object - properties: - Done: - type: - - string - - "null" - additionalProperties: false - required: - - Done + type: string + enum: + - Done + - Cancelled ApiExportStatus: type: object properties: From 14449bda8b50bd0adf793eb6b6dee3ab469e41aa Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 10:33:41 +0800 Subject: [PATCH 08/18] more logging in tests --- scripts/tests/calibnet_export_check.sh | 12 +++++++----- scripts/tests/calibnet_export_diff_check.sh | 10 +++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/scripts/tests/calibnet_export_check.sh b/scripts/tests/calibnet_export_check.sh index 145a6696870..323b9ac6832 100755 --- a/scripts/tests/calibnet_export_check.sh +++ b/scripts/tests/calibnet_export_check.sh @@ -63,24 +63,26 @@ $FOREST_CLI_PATH snapshot export --tipset 0 --format "$format" echo "Exporting zstd compressed snapshot in $format format" $FOREST_CLI_PATH snapshot export --format "$format" & EXPORT_CMD_PID=$! -sleep 5 -echo "Verifying another export job is disallowed" +sleep 10 +# another export job should be disallowed if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then - : + echo "verified another export job is disallowed" else echo "another export job should be disallowed" exit 1 fi -echo "Verifying another export-diff job is disallowed" +# another export-diff job should be disallowed if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then - : + echo "verified another export-diff job is disallowed" else echo "another export-diff job should be disallowed" exit 1 fi # Killing the CLI should not cancel the export +echo "killing cli command" kill -KILL $EXPORT_CMD_PID # Wait on the same export job +echo "waiting on export-status" $FOREST_CLI_PATH snapshot export-status --wait $FOREST_CLI_PATH shutdown --force diff --git a/scripts/tests/calibnet_export_diff_check.sh b/scripts/tests/calibnet_export_diff_check.sh index 355e2fd3c15..c0b1e1e3230 100755 --- a/scripts/tests/calibnet_export_diff_check.sh +++ b/scripts/tests/calibnet_export_diff_check.sh @@ -53,17 +53,17 @@ done echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" $FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & EXPORT_CMD_PID=$! -sleep 5 -echo "Verifying another export job is disallowed" +sleep 10 +# another export job should be disallowed if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then - : + echo "verified another export job is disallowed" else echo "another export job should be disallowed" exit 1 fi -echo "Verifying another export-diff job is disallowed" +# another export-diff job should be disallowed if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then - : + echo "verified another export-diff job is disallowed" else echo "another export-diff job should be disallowed" exit 1 From aae3f198855b7c3065ee3ecaeeded1b5953784ef Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 10:45:58 +0800 Subject: [PATCH 09/18] fix test --- scripts/tests/calibnet_export_check.sh | 10 +++++++--- scripts/tests/calibnet_export_diff_check.sh | 10 +++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/scripts/tests/calibnet_export_check.sh b/scripts/tests/calibnet_export_check.sh index 323b9ac6832..cb8cbe1c725 100755 --- a/scripts/tests/calibnet_export_check.sh +++ b/scripts/tests/calibnet_export_check.sh @@ -63,19 +63,23 @@ $FOREST_CLI_PATH snapshot export --tipset 0 --format "$format" echo "Exporting zstd compressed snapshot in $format format" $FOREST_CLI_PATH snapshot export --format "$format" & EXPORT_CMD_PID=$! -sleep 10 +sleep 5 # another export job should be disallowed -if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then +export_error=$($FOREST_CLI_PATH snapshot export 2>&1 || true) +if echo "$export_error" | grep -q "active chain export job has started"; then echo "verified another export job is disallowed" else echo "another export job should be disallowed" + echo "output was: $export_error" exit 1 fi # another export-diff job should be disallowed -if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then +export_diff_error=$($FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 || true) +if echo "$export_diff_error" | grep -q "active chain export job has started"; then echo "verified another export-diff job is disallowed" else echo "another export-diff job should be disallowed" + echo "output was: $export_diff_error" exit 1 fi # Killing the CLI should not cancel the export diff --git a/scripts/tests/calibnet_export_diff_check.sh b/scripts/tests/calibnet_export_diff_check.sh index c0b1e1e3230..b1f1b41f934 100755 --- a/scripts/tests/calibnet_export_diff_check.sh +++ b/scripts/tests/calibnet_export_diff_check.sh @@ -53,19 +53,23 @@ done echo "Exporting diff snapshot @ $snapshot_epoch with forest-cli snapshot export-diff" $FOREST_CLI_PATH snapshot export-diff --from "$snapshot_epoch" --to "$((snapshot_epoch - 900))" -d 900 -o diff1 & EXPORT_CMD_PID=$! -sleep 10 +sleep 5 # another export job should be disallowed -if $FOREST_CLI_PATH snapshot export 2>&1 | grep "active chain export job has started"; then +export_error=$($FOREST_CLI_PATH snapshot export 2>&1 || true) +if echo "$export_error" | grep -q "active chain export job has started"; then echo "verified another export job is disallowed" else echo "another export job should be disallowed" + echo "output was: $export_error" exit 1 fi # another export-diff job should be disallowed -if $FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 | grep "active chain export job has started"; then +export_diff_error=$($FOREST_CLI_PATH snapshot export-diff --from 11000 --to 10100 -d 900 2>&1 || true) +if echo "$export_diff_error" | grep -q "active chain export job has started"; then echo "verified another export-diff job is disallowed" else echo "another export-diff job should be disallowed" + echo "output was: $export_diff_error" exit 1 fi # Killing the CLI should not cancel the export From c61d596fb900869b15ab67c54178bea5abbf8450 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 11:18:48 +0800 Subject: [PATCH 10/18] fix --- src/rpc/methods/chain.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index acfa4087b54..1cbbcc0c6d7 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -338,7 +338,14 @@ impl RpcMethod<1> for ForestChainExport { tokio::select! { result = chain_export => { - result.map(|_| ApiExportResult::Done) + if let Some(checksum) = result? { + let path = forest_car_sha256sum_path(&output_path); + std::fs::write(path, format!("{} {}\n", + checksum.encode_hex::(), + output_path.file_name().and_then(std::ffi::OsStr::to_str) + .context("Failed to retrieve file name while saving checksum")?))?; + } + tmp_path.persist(&output_path)?; }, _ = chain_export_guard.cancellation_token().cancelled() => { chain_export_guard.cancel_export(); From bc7eb690999d758a4564edaafac4b590222eac86 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 11:34:58 +0800 Subject: [PATCH 11/18] refactor --- src/lib.rs | 1 + src/rpc/methods/chain.rs | 108 ++++++++++++++++++--------------------- 2 files changed, 52 insertions(+), 57 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5bfb5eb06a3..ee0929f09c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod prelude { pub use ahash::{HashMapExt as _, HashSetExt as _}; pub use anyhow::Context as _; pub use cid::Cid; + pub use futures::FutureExt as _; pub use itertools::Itertools as _; pub use std::{ops::Deref as _, sync::Arc}; } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 1cbbcc0c6d7..3c78a793d02 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -43,7 +43,11 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::convert::Infallible; use std::fs::File; -use std::{collections::VecDeque, path::PathBuf, sync::LazyLock}; +use std::{ + collections::VecDeque, + path::{Path, PathBuf}, + sync::LazyLock, +}; use tokio::sync::broadcast::{self, Receiver as Subscriber}; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -286,6 +290,25 @@ impl RpcMethod<1> for ForestChainExport { (params,): Self::Params, _: &http::Extensions, ) -> Result { + fn save_checksum( + checksum: digest::Output, + snapshot_output_path: &Path, + ) -> anyhow::Result<()> { + let path = forest_car_sha256sum_path(snapshot_output_path); + std::fs::write( + path, + format!( + "{} {}\n", + checksum.encode_hex::(), + snapshot_output_path + .file_name() + .and_then(std::ffi::OsStr::to_str) + .context("Failed to retrieve file name while saving checksum")? + ), + )?; + Ok(()) + } + // Spawn a task so it's not cancelled when CLI client is disconnected let handle = tokio::spawn(async move { let ForestChainExportParams { @@ -324,39 +347,16 @@ impl RpcMethod<1> for ForestChainExport { } else { tokio_util::either::Either::Right(tokio::fs::File::create(&tmp_path).await?) }; - let result = match version { - FilecoinSnapshotVersion::V1 => { - let db = ctx.db_owned(); - - let chain_export = crate::chain::export::( - &db, - &start_ts, - recent_roots, - writer, - options, - ); - - tokio::select! { - result = chain_export => { - if let Some(checksum) = result? { - let path = forest_car_sha256sum_path(&output_path); - std::fs::write(path, format!("{} {}\n", - checksum.encode_hex::(), - output_path.file_name().and_then(std::ffi::OsStr::to_str) - .context("Failed to retrieve file name while saving checksum")?))?; - } - tmp_path.persist(&output_path)?; - }, - _ = chain_export_guard.cancellation_token().cancelled() => { - chain_export_guard.cancel_export(); - tracing::warn!("Snapshot export was cancelled"); - Ok(ApiExportResult::Cancelled) - }, - } - } + let chain_export = match version { + FilecoinSnapshotVersion::V1 => crate::chain::export::( + ctx.db(), + &start_ts, + recent_roots, + writer, + options, + ) + .boxed(), FilecoinSnapshotVersion::V2 => { - let db = ctx.db_owned(); - let f3_snap_tmp_path = { let mut f3_snap_dir = output_path.clone(); let mut builder = tempfile::Builder::new(); @@ -379,37 +379,31 @@ impl RpcMethod<1> for ForestChainExport { } } }; - - let chain_export = crate::chain::export_v2::( - &db, + crate::chain::export_v2::( + ctx.db(), f3_snap, &start_ts, recent_roots, writer, options, - ); - - tokio::select! { - result = chain_export => { - if let Some(checksum) = result? { - let path = forest_car_sha256sum_path(&output_path); - std::fs::write(path, format!("{} {}\n", - checksum.encode_hex::(), - output_path.file_name().and_then(std::ffi::OsStr::to_str) - .context("Failed to retrieve file name while saving checksum")?))?; - } - tmp_path.persist(&output_path)?; - Ok(ApiExportResult::Done) - }, - _ = chain_export_guard.cancellation_token().cancelled() => { - chain_export_guard.cancel_export(); - tracing::warn!("Snapshot export was cancelled"); - Ok(ApiExportResult::Cancelled) - }, - } + ) + .boxed() } }; - anyhow::Ok(result?) + tokio::select! { + result = chain_export => { + if let Some(checksum) = result? { + save_checksum(checksum, &output_path)?; + } + tmp_path.persist(&output_path)?; + anyhow::Ok(ApiExportResult::Done) + }, + _ = chain_export_guard.cancellation_token().cancelled() => { + chain_export_guard.cancel_export(); + tracing::warn!("Snapshot export was cancelled"); + anyhow::Ok(ApiExportResult::Cancelled) + }, + } }); Ok(handle.await??) } From 75533516cdc51eb5ebc132f9c06d03725ed77ed3 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 13:35:06 +0800 Subject: [PATCH 12/18] fix --- src/cli/subcommands/snapshot_cmd.rs | 5 +++-- src/rpc/methods/chain.rs | 9 ++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/cli/subcommands/snapshot_cmd.rs b/src/cli/subcommands/snapshot_cmd.rs index 468a14673a9..79550012082 100644 --- a/src/cli/subcommands/snapshot_cmd.rs +++ b/src/cli/subcommands/snapshot_cmd.rs @@ -307,6 +307,8 @@ impl SnapshotCommands { ).with_message(format!("Exporting {} ...", output_path.display())); pb.enable_steady_tick(std::time::Duration::from_millis(80)); let cancellation_token = CancellationToken::new(); + // Make sure token is cancelled on error path + let _cancellation_token_drop_guard = cancellation_token.drop_guard_ref(); let handle = tokio::spawn({ let cancellation_token = cancellation_token.clone(); let path = tmp_exporting_forest_car_path(&output_path); @@ -321,13 +323,12 @@ impl SnapshotCommands { } } }); - // Manually construct RpcRequest because snapshot export could // take a few hours on mainnet let export_result = client .call(ForestChainExportDiff::request((params,))?.with_timeout(Duration::MAX)) .await?; - + // cancel before `handle.await` to avoid deadlock cancellation_token.cancel(); pb.finish(); _ = handle.await; diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 3c78a793d02..3fca1c525dd 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -392,10 +392,13 @@ impl RpcMethod<1> for ForestChainExport { }; tokio::select! { result = chain_export => { - if let Some(checksum) = result? { - save_checksum(checksum, &output_path)?; + if !dry_run + { + if let Some(checksum) = result? { + save_checksum(checksum, &output_path)?; + } + tmp_path.persist(&output_path)?; } - tmp_path.persist(&output_path)?; anyhow::Ok(ApiExportResult::Done) }, _ = chain_export_guard.cancellation_token().cancelled() => { From 4bb38c2a58e7d61b82691f6daebff6c64ba62f7e Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 13:43:45 +0800 Subject: [PATCH 13/18] fix --- src/ipld/util.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 736944ddf96..31c7238e6a4 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -86,6 +86,10 @@ impl ChainExportGuard { impl Drop for ChainExportGuard { fn drop(&mut self) { + // In case some tasks are waiting on this token + if !self.cancellation_token.is_cancelled() { + self.cancellation_token.cancel(); + } end_export() } } From 48429efcb12331468557f0706caef6be33b2fced Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 30 Jun 2026 13:52:07 +0800 Subject: [PATCH 14/18] fix --- src/rpc/methods/chain.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 3fca1c525dd..d76372faf0b 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -392,12 +392,13 @@ impl RpcMethod<1> for ForestChainExport { }; tokio::select! { result = chain_export => { + let checksum_opt = result?; if !dry_run { - if let Some(checksum) = result? { + tmp_path.persist(&output_path)?; + if let Some(checksum) = checksum_opt { save_checksum(checksum, &output_path)?; } - tmp_path.persist(&output_path)?; } anyhow::Ok(ApiExportResult::Done) }, From 9534664736bd829a94ee92fff27303d8c6d702d4 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 1 Jul 2026 17:07:12 +0800 Subject: [PATCH 15/18] resolve comments --- src/db/car/forest.rs | 30 ++++++++++++++++++++++++++++++ src/ipld/util.rs | 4 +--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 7d095db6e2b..1e14db9c147 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -525,6 +525,7 @@ mod tests { use super::*; use nunny::vec as nonempty; use quickcheck_macros::quickcheck; + use rstest::rstest; use tokio_test::block_on; fn mk_encoded_car( @@ -642,4 +643,33 @@ mod tests { assert_eq!(forest_car.get(&cid_a).unwrap().unwrap(), blocks[0].data); assert_eq!(forest_car.get(&cid_b).unwrap().unwrap(), blocks[1].data); } + + #[rstest] + #[case( + Path::new("/tmp/a.forst.car.zst"), + Path::new("/tmp/a.forst.car.zst.tmp") + )] + #[case(Path::new("tmp/a.forst.car.zst"), Path::new("tmp/a.forst.car.zst.tmp"))] + #[case(Path::new("a.forst.car.zst"), Path::new("a.forst.car.zst.tmp"))] + #[case(Path::new(""), Path::new(""))] + #[case(Path::new("."), Path::new("."))] + fn test_tmp_exporting_forest_car_path(#[case] input: &Path, #[case] output: &Path) { + assert_eq!(tmp_exporting_forest_car_path(input), output); + } + + #[rstest] + #[case( + Path::new("/tmp/a.forst.car.zst"), + Path::new("/tmp/a.forst.car.zst.sha256sum") + )] + #[case( + Path::new("tmp/a.forst.car.zst"), + Path::new("tmp/a.forst.car.zst.sha256sum") + )] + #[case(Path::new("a.forst.car.zst"), Path::new("a.forst.car.zst.sha256sum"))] + #[case(Path::new(""), Path::new(""))] + #[case(Path::new("."), Path::new("."))] + fn test_forest_car_sha256sum_path(#[case] input: &Path, #[case] output: &Path) { + assert_eq!(forest_car_sha256sum_path(input), output); + } } diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 31c7238e6a4..68b4592525a 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -87,9 +87,7 @@ impl ChainExportGuard { impl Drop for ChainExportGuard { fn drop(&mut self) { // In case some tasks are waiting on this token - if !self.cancellation_token.is_cancelled() { - self.cancellation_token.cancel(); - } + self.cancellation_token.cancel(); end_export() } } From 3adebb166ee767428ad692c4d75e0782b1465dcd Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 1 Jul 2026 19:47:13 +0800 Subject: [PATCH 16/18] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24f22592745..bddee4a91d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ ### Breaking +- [#7252](https://github.com/ChainSafe/forest/pull/7252): `forest-cli snapshot export` now generates checksum file at {filepath}.sha256sum. The default file extension is changed from `.forest.car.sha256sum` to `.forest.car.zst.sha256sum`. + ### Added ### Changed From da492d1fc887638b08899177eabb38e4bd4382eb Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 1 Jul 2026 21:51:17 +0800 Subject: [PATCH 17/18] AbortOnDropHandle --- Cargo.lock | 1 + Cargo.toml | 2 +- src/rpc/methods/chain.rs | 9 +++++---- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af7bf47bb13..e9ca518a150 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9921,6 +9921,7 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index cc26f448904..ddae566ea4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -225,7 +225,7 @@ thiserror = "2" tokio = { version = "1", features = ['full'] } tokio-stream = { version = "0.1", features = ["fs", "io-util"] } tokio-tungstenite = "0.29" -tokio-util = { version = "0.7", features = ["compat", "io-util"] } +tokio-util = { version = "0.7", features = ["compat", "io-util", "rt"] } toml = "1" tower = { version = "0.5", features = ["util"] } tower-http = { version = "0.7", features = ["compression-full", "sensitive-headers"] } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index d76372faf0b..e05a41e7269 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -49,6 +49,7 @@ use std::{ sync::LazyLock, }; use tokio::sync::broadcast::{self, Receiver as Subscriber}; +use tokio_util::task::AbortOnDropHandle; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -310,7 +311,7 @@ impl RpcMethod<1> for ForestChainExport { } // Spawn a task so it's not cancelled when CLI client is disconnected - let handle = tokio::spawn(async move { + let handle = AbortOnDropHandle::new(tokio::spawn(async move { let ForestChainExportParams { version, epoch, @@ -408,7 +409,7 @@ impl RpcMethod<1> for ForestChainExport { anyhow::Ok(ApiExportResult::Cancelled) }, } - }); + })); Ok(handle.await??) } } @@ -505,7 +506,7 @@ impl RpcMethod<1> for ForestChainExportDiff { _: &http::Extensions, ) -> Result { // Spawn a task so it's not cancelled when CLI client is disconnected - let handle = tokio::spawn(async move { + let handle = AbortOnDropHandle::new(tokio::spawn(async move { let chain_export_guard = ChainExportGuard::try_start_export()?; let ForestChainExportDiffParams { @@ -552,7 +553,7 @@ impl RpcMethod<1> for ForestChainExportDiff { anyhow::Ok(ApiExportResult::Cancelled) }, } - }); + })); Ok(handle.await??) } } From 855283d837775d2ac97f6a137b92ab3bd4ae92e8 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 1 Jul 2026 22:25:21 +0800 Subject: [PATCH 18/18] fix --- src/rpc/methods/chain.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index e05a41e7269..5514f2456e8 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -49,7 +49,6 @@ use std::{ sync::LazyLock, }; use tokio::sync::broadcast::{self, Receiver as Subscriber}; -use tokio_util::task::AbortOnDropHandle; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -310,8 +309,9 @@ impl RpcMethod<1> for ForestChainExport { Ok(()) } - // Spawn a task so it's not cancelled when CLI client is disconnected - let handle = AbortOnDropHandle::new(tokio::spawn(async move { + // Spawn a task so it's not cancelled when CLI client is disconnected. + // So do not wrap this with `AbortOnDropHandle` + let handle = tokio::spawn(async move { let ForestChainExportParams { version, epoch, @@ -409,7 +409,7 @@ impl RpcMethod<1> for ForestChainExport { anyhow::Ok(ApiExportResult::Cancelled) }, } - })); + }); Ok(handle.await??) } } @@ -506,7 +506,8 @@ impl RpcMethod<1> for ForestChainExportDiff { _: &http::Extensions, ) -> Result { // Spawn a task so it's not cancelled when CLI client is disconnected - let handle = AbortOnDropHandle::new(tokio::spawn(async move { + // So do not wrap this with `AbortOnDropHandle` + let handle = tokio::spawn(async move { let chain_export_guard = ChainExportGuard::try_start_export()?; let ForestChainExportDiffParams { @@ -553,7 +554,7 @@ impl RpcMethod<1> for ForestChainExportDiff { anyhow::Ok(ApiExportResult::Cancelled) }, } - })); + }); Ok(handle.await??) } }