diff --git a/src/engine/live.rs b/src/engine/live.rs index 7fe587fc..913c97d6 100644 --- a/src/engine/live.rs +++ b/src/engine/live.rs @@ -31,6 +31,7 @@ use crate::{ connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError, SyncFinished, }, + store::Query, AuthorHeads, ContentStatus, NamespaceId, SignedEntry, }; @@ -582,6 +583,9 @@ impl LiveActor { } } } + + // Queue downloads for entries with incomplete blobs. + self.check_incomplete_blobs(namespace, peer).await; } }; @@ -741,6 +745,52 @@ impl LiveActor { Ok(()) } + /// Scan a namespace for entries whose blobs are not yet complete and queue downloads. + /// + /// This is called after a successful sync so that blobs that were missed on earlier + /// syncs (e.g. because the first peer lacked the content, or due to a restart losing + /// the in-memory `missing_hashes` set) get another download attempt with `peer` as a + /// candidate provider. + async fn check_incomplete_blobs(&mut self, namespace: NamespaceId, peer: PublicKey) { + let policy = match self.sync.get_download_policy(namespace).await { + Ok(policy) => policy, + Err(e) => { + warn!(%e, "failed to get download policy for incomplete blob check"); + return; + } + }; + + let (tx, mut rx) = irpc::channel::mpsc::channel::>(64); + if let Err(e) = self + .sync + .get_many(namespace, Query::all().build(), tx) + .await + { + warn!(%e, "failed to get entries for incomplete blob check"); + return; + } + + let mut queued = 0u64; + while let Ok(Some(entry_result)) = rx.recv().await { + let entry = match entry_result { + Ok(entry) => entry, + Err(_) => continue, + }; + if !policy.matches(entry.entry()) { + continue; + } + if entry.content_len() == 0 { + continue; + } + let hash = entry.content_hash(); + self.start_download(namespace, hash, peer, false).await; + queued += 1; + } + if queued > 0 { + debug!(namespace=%namespace.fmt_short(), %queued, "queued incomplete blob downloads after sync"); + } + } + async fn start_download( &mut self, namespace: NamespaceId, diff --git a/tests/sync.rs b/tests/sync.rs index 6a096c78..22213f38 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -1225,6 +1225,432 @@ async fn sync_drop_doc() -> Result<()> { Ok(()) } +/// Peer B first syncs with peer C (who has metadata but no blob), then syncs with peer A +/// (who has the blob). Without the incomplete-blob retry after sync, B would never download. +#[tokio::test] +#[traced_test] +async fn sync_retries_blob_from_second_peer() -> Result<()> { + let mut rng = test_rng(b"sync_retries_blob_from_second_peer"); + let nodes = spawn_nodes(3, &mut rng).await?; + let clients = nodes.iter().map(|node| node.client()).collect::>(); + + // A creates a doc and writes a key + let author_a = clients[0].docs().author_create().await?; + let doc_a = clients[0].docs().create().await?; + let hash = doc_a + .set_bytes(author_a, b"key1".to_vec(), b"value1".to_vec()) + .await?; + + let mut ticket = doc_a + .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) + .await?; + let addr_a = ticket.nodes[0].clone(); + // unset peers to not yet start sync + ticket.nodes = vec![]; + + // C joins with NothingExcept policy, syncs with A (metadata only, no blob) + info!("C: import and sync with A"); + let doc_c = clients[2].docs().import(ticket.clone()).await?; + doc_c + .set_download_policy(DownloadPolicy::NothingExcept(vec![])) + .await?; + let mut events_c = doc_c.subscribe().await?; + doc_c.start_sync(vec![addr_a.clone()]).await?; + assert_next_unordered_with_optionals( + &mut events_c, + TIMEOUT, + vec![ + match_event!(LiveEvent::InsertRemote { .. }), + match_event!(LiveEvent::SyncFinished(_)), + ], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + // B joins and syncs with C only — gets entry metadata but C has no blob to offer + info!("B: import and sync with C"); + let ticket_c = doc_c + .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) + .await?; + let addr_c = ticket_c.nodes[0].clone(); + + let doc_b = clients[1].docs().import(ticket.clone()).await?; + let blobs_b = clients[1].blobs(); + let mut events_b = doc_b.subscribe().await?; + + doc_b.start_sync(vec![addr_c]).await?; + assert_next_unordered_with_optionals( + &mut events_b, + TIMEOUT, + vec![ + match_event!(LiveEvent::InsertRemote { .. }), + match_event!(LiveEvent::SyncFinished(_)), + ], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + // B has the entry but not the blob + assert!(blobs_b.get_bytes(hash).await.is_err()); + + // B syncs with A — the post-sync incomplete blob check should discover and download it + info!("B: sync with A"); + doc_b.start_sync(vec![addr_a]).await?; + + assert_next_unordered_with_optionals( + &mut events_b, + TIMEOUT, + vec![Box::new( + move |e| matches!(e, LiveEvent::ContentReady { hash: h } if *h == hash), + )], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + assert_latest(blobs_b, &doc_b, b"key1", b"value1").await; + + for node in nodes { + node.shutdown().await?; + } + Ok(()) +} + +/// After changing the download policy to include previously excluded keys, a re-sync should +/// trigger downloads for those entries. +#[tokio::test] +#[traced_test] +async fn sync_retries_blob_after_policy_change() -> Result<()> { + let mut rng = test_rng(b"sync_retries_blob_after_policy_change"); + let nodes = spawn_nodes(2, &mut rng).await?; + let clients = nodes.iter().map(|node| node.client()).collect::>(); + + // A writes two keys + let author_a = clients[0].docs().author_create().await?; + let doc_a = clients[0].docs().create().await?; + let hash_inc = doc_a + .set_bytes(author_a, b"included".to_vec(), b"yes".to_vec()) + .await?; + let hash_exc = doc_a + .set_bytes(author_a, b"excluded".to_vec(), b"no".to_vec()) + .await?; + + let mut ticket = doc_a + .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) + .await?; + let addr_a = ticket.nodes[0].clone(); + // unset peers to not yet start sync + ticket.nodes = vec![]; + + // B joins with NothingExcept("included"), syncs with A + info!("B: import and sync with policy NothingExcept(included)"); + let doc_b = clients[1].docs().import(ticket).await?; + let blobs_b = clients[1].blobs(); + doc_b + .set_download_policy(DownloadPolicy::NothingExcept(vec![FilterKind::Exact( + "included".into(), + )])) + .await?; + let mut events_b = doc_b.subscribe().await?; + + doc_b.start_sync(vec![addr_a.clone()]).await?; + + // B gets both entries but only downloads "included" + assert_next_unordered_with_optionals( + &mut events_b, + TIMEOUT, + vec![ + match_event!(LiveEvent::InsertRemote { .. }), + match_event!(LiveEvent::InsertRemote { .. }), + Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash_inc)), + ], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + assert_latest(blobs_b, &doc_b, b"included", b"yes").await; + assert!(blobs_b.get_bytes(hash_exc).await.is_err()); + + // B changes policy to download everything and re-syncs + info!("B: change policy to EverythingExcept and re-sync"); + doc_b + .set_download_policy(DownloadPolicy::EverythingExcept(vec![])) + .await?; + doc_b.start_sync(vec![addr_a]).await?; + + // the post-sync incomplete blob check should now download the previously excluded blob + assert_next_unordered_with_optionals( + &mut events_b, + TIMEOUT, + vec![Box::new( + move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash_exc), + )], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + assert_latest(blobs_b, &doc_b, b"excluded", b"no").await; + + for node in nodes { + node.shutdown().await?; + } + Ok(()) +} + +/// When B downloads a previously missing blob after syncing with A, C (also missing the blob) +/// can fetch it from B by syncing with B. +#[tokio::test] +#[traced_test] +async fn sync_retries_missing_blob_propagates_content_ready() -> Result<()> { + let mut rng = test_rng(b"sync_retries_propagation"); + let nodes = spawn_nodes(3, &mut rng).await?; + let clients = nodes.iter().map(|node| node.client()).collect::>(); + + // A creates a doc and writes a key + let author_a = clients[0].docs().author_create().await?; + let doc_a = clients[0].docs().create().await?; + let hash = doc_a + .set_bytes(author_a, b"data".to_vec(), b"hello".to_vec()) + .await?; + + let mut ticket = doc_a + .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) + .await?; + let addr_a = ticket.nodes[0].clone(); + // unset peers to not yet start sync + ticket.nodes = vec![]; + + // C joins with NothingExcept policy, syncs with A (metadata only, no blob) + info!("C: import and sync with A"); + let doc_c = clients[2].docs().import(ticket.clone()).await?; + doc_c + .set_download_policy(DownloadPolicy::NothingExcept(vec![])) + .await?; + let mut events_c = doc_c.subscribe().await?; + doc_c.start_sync(vec![addr_a.clone()]).await?; + + assert_next_unordered_with_optionals( + &mut events_c, + TIMEOUT, + vec![ + match_event!(LiveEvent::InsertRemote { .. }), + match_event!(LiveEvent::SyncFinished(_)), + ], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + // B joins and syncs with A (who has the blob) + info!("B: import and sync with A"); + let doc_b = clients[1].docs().import(ticket.clone()).await?; + let blobs_b = clients[1].blobs(); + let mut events_b = doc_b.subscribe().await?; + doc_b.start_sync(vec![addr_a]).await?; + + assert_next_unordered_with_optionals( + &mut events_b, + TIMEOUT, + vec![ + match_event!(LiveEvent::InsertRemote { .. }), + Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash: h } if *h == hash)), + match_event!(LiveEvent::SyncFinished(_)), + ], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + assert_latest(blobs_b, &doc_b, b"data", b"hello").await; + + // C changes policy to download everything and syncs with B (who now has the blob) + info!("C: change policy and sync with B"); + doc_c + .set_download_policy(DownloadPolicy::EverythingExcept(vec![])) + .await?; + let blobs_c = clients[2].blobs(); + doc_c + .start_sync(vec![iroh::EndpointAddr::new(nodes[1].id())]) + .await?; + + assert_next_unordered_with_optionals( + &mut events_c, + TIMEOUT, + vec![Box::new( + move |e| matches!(e, LiveEvent::ContentReady { hash: h } if *h == hash), + )], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + assert_latest(blobs_c, &doc_c, b"data", b"hello").await; + + for node in nodes { + node.shutdown().await?; + } + Ok(()) +} + +/// After a restart, blobs whose metadata was persisted but whose content was never +/// downloaded should be fetched when the node syncs again. +#[tokio::test] +#[traced_test] +#[cfg(feature = "fs-store")] +async fn sync_retries_blob_after_restart() -> Result<()> { + use crate::util::endpoint; + + let _rng = test_rng(b"sync_retries_blob_after_restart"); + let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?; + let lookup_server = iroh::test_utils::DnsPkarrServer::run().await?; + + // create node A with a doc and a blob + let secret_key_a = SecretKey::generate(); + let ep_a = endpoint(secret_key_a, relay_map.clone(), Some(&lookup_server)).await?; + let node_a = Node::memory(ep_a).spawn().await?; + let author_a = node_a.docs().author_create().await?; + let doc_a = node_a.docs().create().await?; + let hash = doc_a + .set_bytes(author_a, b"restart_key".to_vec(), b"restart_value".to_vec()) + .await?; + let ticket = doc_a + .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) + .await?; + + // create node B with persistent storage, join with NothingExcept policy + let node_b_dir = tempfile::TempDir::with_prefix("test-sync_retries_blob_after_restart-node_b")?; + let secret_key_b = SecretKey::generate(); + let ep_b = endpoint( + secret_key_b.clone(), + relay_map.clone(), + Some(&lookup_server), + ) + .await?; + let node_b = Node::persistent(&node_b_dir, ep_b).spawn().await?; + let id_b = node_b.id(); + + let doc_b = node_b.docs().import(ticket.clone()).await?; + doc_b + .set_download_policy(DownloadPolicy::NothingExcept(vec![])) + .await?; + + let mut events_b = doc_b.subscribe().await?; + + // wait for B's initial sync with A — metadata only, no blob + assert_next_unordered_with_optionals( + &mut events_b, + TIMEOUT, + vec![ + match_event!(LiveEvent::InsertRemote { .. }), + match_event!(LiveEvent::SyncFinished(_)), + ], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + assert!(node_b.blobs().get_bytes(hash).await.is_err()); + + // change policy to download everything, then restart B + doc_b + .set_download_policy(DownloadPolicy::EverythingExcept(vec![])) + .await?; + let ns_b = doc_b.id(); + + info!(me = %id_b.fmt_short(), "node_b shutdown"); + node_b.shutdown().await?; + + info!(me = %id_b.fmt_short(), "node_b respawn"); + let ep_b = endpoint( + secret_key_b.clone(), + relay_map.clone(), + Some(&lookup_server), + ) + .await?; + let node_b = Node::persistent(&node_b_dir, ep_b).spawn().await?; + assert_eq!(id_b, node_b.id()); + + let doc_b = node_b.docs().open(ns_b).await?.expect("doc to exist"); + let blobs_b = node_b.blobs(); + let mut events_b = doc_b.subscribe().await?; + + // trigger sync — the post-sync incomplete blob check should download the blob + info!(me = %id_b.fmt_short(), "node_b start_sync"); + doc_b.start_sync(vec![]).await?; + + assert_next_unordered_with_optionals( + &mut events_b, + TIMEOUT, + vec![Box::new( + move |e| matches!(e, LiveEvent::ContentReady { hash: h } if *h == hash), + )], + vec![ + match_event!(LiveEvent::NeighborUp(_)), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::SyncFinished(_)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + assert_latest(blobs_b, &doc_b, b"restart_key", b"restart_value").await; + + node_a.shutdown().await?; + node_b.shutdown().await?; + Ok(()) +} + async fn assert_latest(blobs: &iroh_blobs::api::Store, doc: &Doc, key: &[u8], value: &[u8]) { let content = get_latest(blobs, doc, key).await.unwrap(); assert_eq!(content, value.to_vec());