Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
SyncFinished,
},
store::Query,
AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

Expand Down Expand Up @@ -582,6 +583,9 @@ impl LiveActor {
}
}
}

// Queue downloads for entries with incomplete blobs.
self.check_incomplete_blobs(namespace, peer).await;
}
};

Expand Down Expand Up @@ -741,6 +745,52 @@ impl LiveActor {
Ok(())
}

/// Scan a namespace for entries whose blobs are not yet complete and queue downloads.
///
/// This is called after a successful sync so that blobs that were missed on earlier
/// syncs (e.g. because the first peer lacked the content, or due to a restart losing
/// the in-memory `missing_hashes` set) get another download attempt with `peer` as a
/// candidate provider.
async fn check_incomplete_blobs(&mut self, namespace: NamespaceId, peer: PublicKey) {
let policy = match self.sync.get_download_policy(namespace).await {
Ok(policy) => policy,
Err(e) => {
warn!(%e, "failed to get download policy for incomplete blob check");
return;
}
};

let (tx, mut rx) = irpc::channel::mpsc::channel::<crate::api::RpcResult<SignedEntry>>(64);
if let Err(e) = self
.sync
.get_many(namespace, Query::all().build(), tx)
.await
{
warn!(%e, "failed to get entries for incomplete blob check");
return;
}

let mut queued = 0u64;
while let Ok(Some(entry_result)) = rx.recv().await {
let entry = match entry_result {
Ok(entry) => entry,
Err(_) => continue,
};
if !policy.matches(entry.entry()) {
continue;
}
if entry.content_len() == 0 {
continue;
}
let hash = entry.content_hash();
self.start_download(namespace, hash, peer, false).await;
queued += 1;
}
if queued > 0 {
debug!(namespace=%namespace.fmt_short(), %queued, "queued incomplete blob downloads after sync");
}
}

async fn start_download(
&mut self,
namespace: NamespaceId,
Expand Down
Loading
Loading