diff --git a/Cargo.lock b/Cargo.lock index 89872aad..34d0cbbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2087,6 +2087,7 @@ version = "0.99.0" dependencies = [ "anyhow", "async-channel", + "async-trait", "blake3", "bytes", "cfg_aliases", diff --git a/Cargo.toml b/Cargo.toml index 802b7f92..a9a67b0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ rust-version = "1.91" [dependencies] anyhow = "1" async-channel = "2.3.1" +async-trait = "0.1" blake3 = "1.8" bytes = { version = "1.7", features = ["serde"] } derive_more = { version = "2.0.1", features = [ @@ -93,7 +94,7 @@ missing_debug_implementations = "warn" # require a feature enabled when using `--cfg docsrs` which we can not # do. To enable for a crate set `#![cfg_attr(iroh_docsrs, # feature(doc_cfg))]` in the crate. -unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)"] } +unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(wasm_browser)"] } [build-dependencies] cfg_aliases = "0.2.1" diff --git a/README.md b/README.md index c02ac46d..66b0de6b 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,14 @@ async fn main() -> anyhow::Result<()> { } ``` +The snippet above is **minimal wiring** only (ALPN + router). For an end-to-end sample of the high-level [`Doc`](https://docs.rs/iroh-docs/latest/iroh_docs/api/struct.Doc.html) API—including [`subscribe_resolved`](https://docs.rs/iroh-docs/latest/iroh_docs/api/struct.Doc.html#method.subscribe_resolved) with a key prefix, blob bytes, and `initial_snapshot`—run: + +```sh +cargo run --example subscribe_resolved +``` + +See [`examples/subscribe_resolved.rs`](examples/subscribe_resolved.rs) for the full walkthrough. + # License Copyright 2026 N0, INC. diff --git a/examples/subscribe_resolved.rs b/examples/subscribe_resolved.rs new file mode 100644 index 00000000..f6b48314 --- /dev/null +++ b/examples/subscribe_resolved.rs @@ -0,0 +1,99 @@ +//! Subscribe to the **latest entry per key** under a **key prefix**, including **blob bytes**, on **one stream**. +//! +//! [`iroh_docs::api::Doc::subscribe_resolved`] uses a [`Query::single_latest_per_key`] scope (here +//! [`QueryBuilder::key_prefix`](iroh_docs::store::QueryBuilder::key_prefix)). Each stream item is +//! the current winning entry for some key in range, emitted only once its blob is locally +//! [`BlobStatus::Complete`](iroh_blobs::api::blobs::BlobStatus::Complete) (or the record is empty). +//! +//! With `initial_snapshot: true`, the resolver first emits the latest entry for every matching key +//! that already exists; then it continues with live updates. +//! +//! The `main` body uses the usual pattern for long-lived subscriptions: `while let Some(result) = +//! stream.next().await`, handle each `Ok`/`Err`, and exit when the stream yields `None` (sender +//! dropped or subscription closed). This demo **breaks** after three items so the process can exit: +//! (1) snapshot for an existing key, (2) live insert for a **new** key under the prefix, (3) live +//! update when **`app/foo` is written again**—same key, new latest entry. Omit that `break` in a +//! real app, or drive the loop with `tokio::select!` alongside shutdown. + +use anyhow::Result; +use iroh::{endpoint::presets, protocol::Router, Endpoint}; +use iroh_blobs::{store::mem::MemStore, BlobsProtocol, ALPN as BLOBS_ALPN}; +use iroh_docs::{ + protocol::Docs, store::Query, subscribe_resolved::ResolvedSubscribeOpts, ALPN as DOCS_ALPN, +}; +use iroh_gossip::{net::Gossip, ALPN as GOSSIP_ALPN}; +use n0_future::StreamExt; + +#[tokio::main] +async fn main() -> Result<()> { + let endpoint = Endpoint::bind(presets::Minimal).await?; + + let blobs = MemStore::default(); + let blob_store = (*blobs).clone(); + + let gossip = Gossip::builder().spawn(endpoint.clone()); + let docs = Docs::memory() + .spawn(endpoint.clone(), blob_store.clone(), gossip.clone()) + .await?; + + let router = Router::builder(endpoint.clone()) + .accept(BLOBS_ALPN, BlobsProtocol::new(&blobs, None)) + .accept(GOSSIP_ALPN, gossip) + .accept(DOCS_ALPN, docs.clone()) + .spawn(); + + let author = docs.author_create().await?; + let doc = docs.create().await?; + + doc.set_bytes(author, b"app/foo".to_vec(), b"hello".to_vec()) + .await?; + + let scope = Query::single_latest_per_key().key_prefix(b"app/").build(); + + let mut stream = doc + .subscribe_resolved( + &blob_store, + scope, + ResolvedSubscribeOpts { + initial_snapshot: true, + include_content: true, + resolution_delay: None, + }, + ) + .await?; + + let mut updates = 0u32; + + while let Some(result) = stream.next().await { + let kv = result?; + updates += 1; + println!( + "update #{updates}: key={:?} content={:?}", + String::from_utf8_lossy(&kv.key), + kv.content.as_deref().map(String::from_utf8_lossy) + ); + + match updates { + // Snapshot was `app/foo`; add another key under the same prefix. + 1 => { + doc.set_bytes(author, b"app/bar".to_vec(), b"world".to_vec()) + .await?; + } + // Show that the stream also fires when an existing key gets a new latest entry. + 2 => { + doc.set_bytes(author, b"app/foo".to_vec(), b"hello again".to_vec()) + .await?; + } + _ => {} + } + + if updates >= 3 { + break; + } + } + + drop(stream); + + router.shutdown().await?; + Ok(()) +} diff --git a/src/api.rs b/src/api.rs index fa505959..155fa55c 100644 --- a/src/api.rs +++ b/src/api.rs @@ -38,6 +38,10 @@ use crate::{ actor::OpenState, engine::{Engine, LiveEvent}, store::{DownloadPolicy, Query}, + subscribe_resolved::{ + subscribe_resolved_with, FetchAllBox, FetchLatestBox, ResolvedFetcher, ResolvedKeyValue, + ResolvedSubscribeOpts, + }, Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes, }; @@ -477,6 +481,40 @@ impl Doc { }))) } + /// Like [`Engine::subscribe_resolved`](crate::engine::Engine::subscribe_resolved): the latest + /// entry per key for `scope`, emitted only when blobs are complete locally. Requires the same + /// [`iroh_blobs::api::Store`] used by this node for reads. + pub async fn subscribe_resolved( + &self, + blobs: &iroh_blobs::api::Store, + scope: impl Into, + opts: ResolvedSubscribeOpts, + ) -> Result>> { + self.ensure_open()?; + let scope = scope.into(); + let doc_latest = self.clone(); + let latest: std::sync::Arc FetchLatestBox + Send + Sync> = + std::sync::Arc::new(move |q: Query| { + let doc = doc_latest.clone(); + Box::pin(async move { + let mut s = n0_future::StreamExt::boxed(doc.get_many(q).await?); + n0_future::TryStreamExt::try_next(&mut s).await + }) as FetchLatestBox + }); + let doc_all = self.clone(); + let all: std::sync::Arc FetchAllBox + Send + Sync> = + std::sync::Arc::new(move |q: Query| { + let doc = doc_all.clone(); + Box::pin(async move { + let mut s = n0_future::StreamExt::boxed(doc.get_many(q).await?); + n0_future::TryStreamExt::try_collect(&mut s).await + }) as FetchAllBox + }); + let fetcher = ResolvedFetcher::new(latest, all); + let live = self.subscribe().await?; + subscribe_resolved_with(live, fetcher, blobs.clone(), scope, opts) + } + /// Returns status info for this document pub async fn status(&self) -> Result { self.ensure_open()?; diff --git a/src/engine.rs b/src/engine.rs index 88013f16..edb31a0b 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -236,6 +236,36 @@ impl Engine { Ok(a.or(b)) } + /// Subscribe to updates for the latest entry for each key matching `scope` (per + /// [`crate::store::Query::single_latest_per_key`]), emitting only when that entry’s blob is + /// complete locally (or the record is empty). + /// + /// `scope` must be built with [`crate::store::Query::single_latest_per_key`]. See + /// [`crate::subscribe_resolved`] for details. + pub async fn subscribe_resolved( + &self, + namespace: NamespaceId, + scope: crate::store::Query, + opts: crate::subscribe_resolved::ResolvedSubscribeOpts, + ) -> anyhow::Result< + tokio_stream::wrappers::ReceiverStream< + anyhow::Result, + >, + > { + let live = self.subscribe(namespace).await?; + let fetcher = crate::subscribe_resolved::ResolvedFetcher::for_sync_engine( + self.sync.clone(), + namespace, + ); + crate::subscribe_resolved::subscribe_resolved_with( + live, + fetcher, + self.blob_store().clone(), + scope, + opts, + ) + } + /// Handle an incoming iroh-docs connection. pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> { self.to_live_actor diff --git a/src/lib.rs b/src/lib.rs index 215c0a7b..81799c11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,7 @@ pub mod engine; pub mod actor; pub mod api; pub mod store; +pub mod subscribe_resolved; pub mod sync; mod heads; diff --git a/src/store.rs b/src/store.rs index e5946a08..8d273829 100644 --- a/src/store.rs +++ b/src/store.rs @@ -290,6 +290,30 @@ pub struct Query { } impl Query { + /// Returns `true` if this query aggregates with [`SingleLatestPerKeyQuery`]. + pub fn is_single_latest_per_key(&self) -> bool { + matches!(self.kind, QueryKind::SingleLatestPerKey(_)) + } + + /// Returns a copy of this query with an exact key filter and limit `1`, preserving author + /// filter, `include_empty`, and sort direction. + /// + /// Returns `None` if this query is not [`SingleLatestPerKeyQuery`]. + pub fn single_latest_for_exact_key(&self, key: impl AsRef<[u8]>) -> Option { + match &self.kind { + QueryKind::SingleLatestPerKey(k) => Some(Query { + kind: QueryKind::SingleLatestPerKey(k.clone()), + filter_author: self.filter_author.clone(), + filter_key: KeyFilter::Exact(Bytes::copy_from_slice(key.as_ref())), + limit: Some(1), + offset: 0, + include_empty: self.include_empty, + sort_direction: self.sort_direction, + }), + _ => None, + } + } + /// Query all records. pub fn all() -> QueryBuilder { Default::default() @@ -324,6 +348,21 @@ impl Query { pub fn offset(&self) -> u64 { self.offset } + + /// Key filter applied by this query. + pub fn filter_key(&self) -> &KeyFilter { + &self.filter_key + } + + /// Author filter applied by this query. + pub fn filter_author(&self) -> &AuthorFilter { + &self.filter_author + } + + /// Whether empty (tombstone) records are included. + pub fn include_empty(&self) -> bool { + self.include_empty + } } /// Sort direction diff --git a/src/subscribe_resolved.rs b/src/subscribe_resolved.rs new file mode 100644 index 00000000..5ec3e861 --- /dev/null +++ b/src/subscribe_resolved.rs @@ -0,0 +1,771 @@ +//! Subscription to the latest entry per key ([`Query::single_latest_per_key`]) with blob availability. +//! +//! Entry points: [`subscribe_resolved_with`], [`ResolvedFetcher`], +//! [`Engine::subscribe_resolved`](crate::engine::Engine::subscribe_resolved), and +//! [`Doc::subscribe_resolved`](crate::api::Doc::subscribe_resolved). +//! +//! Live [`crate::engine::LiveEvent`] values are forwarded on a dedicated task so slow resolution +//! work does not fill the bounded replica subscribe channel (see [`Engine::subscribe`](crate::engine::Engine::subscribe)). +//! +//! ## WebAssembly +//! +//! This module is intended to work on `wasm32-unknown-unknown` under the same **Tokio-based** +//! runtime as the rest of iroh (for example via `wasm-bindgen-futures` driving the executor). +//! Internal work is scheduled with [`n0_future::task::spawn`]; timers use [`n0_future::time`], not +//! Tokio-only clocks. The merged [`Engine::subscribe`](crate::engine::Engine::subscribe) stream may be `!Send` in the browser; the +//! [`subscribe_resolved_with`] bounds reflect that via the internal `SendUnlessWasmBrowser` trait. Custom +//! [`ResolvedFetcher`] closures still use `Send` futures on all targets (see [`FetchLatestBox`]). + +use std::{ + collections::{HashMap, HashSet}, + future::Future, + pin::Pin, + sync::Arc, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use iroh_blobs::{ + api::{blobs::BlobStatus, Store}, + Hash, +}; +use irpc::channel::mpsc as irpc_mpsc; +use n0_future::Stream; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use crate::{ + actor::SyncHandle, + api::RpcResult, + engine::LiveEvent, + store::{KeyFilter, Query}, + AuthorId, Entry, NamespaceId, SignedEntry, +}; + +// On native targets we require `Send` on the live stream; on `wasm32-unknown-unknown` (see +// `wasm_browser` in `build.rs`), `n0_future::boxed::BoxFuture` is `!Send` (see that crate's +// `boxed` module), matching `Engine::subscribe`'s merged stream. +mod live_stream_bound { + #[cfg(not(wasm_browser))] + pub trait SendUnlessWasmBrowser: Send {} + #[cfg(not(wasm_browser))] + impl SendUnlessWasmBrowser for T {} + + #[cfg(wasm_browser)] + pub trait SendUnlessWasmBrowser {} + #[cfg(wasm_browser)] + impl SendUnlessWasmBrowser for T {} +} + +/// Blob reads used by [`subscribe_resolved_with`] so tests can inject faults without a full store. +#[async_trait] +pub(crate) trait ResolveBlobs: Send + Sync { + async fn blob_status(&self, hash: Hash) -> anyhow::Result; + async fn blob_get_bytes(&self, hash: Hash) -> anyhow::Result; +} + +#[async_trait] +impl ResolveBlobs for iroh_blobs::api::blobs::Blobs { + async fn blob_status(&self, hash: Hash) -> anyhow::Result { + self.status(hash).await.map_err(|e| anyhow::anyhow!(e)) + } + + async fn blob_get_bytes(&self, hash: Hash) -> anyhow::Result { + self.get_bytes(hash).await.map_err(|e| anyhow::anyhow!(e)) + } +} + +/// Boxed future returned by the “latest entry” callback in [`ResolvedFetcher`]. +pub type FetchLatestBox = + Pin>> + Send + 'static>>; +/// Boxed future returned by the “all matching entries” callback in [`ResolvedFetcher`]. +pub type FetchAllBox = Pin>> + Send + 'static>>; + +/// Fetches the latest entry per key (see [`Query::single_latest_per_key`]) for [`subscribe_resolved_with`]. +#[derive(Clone, derive_more::Debug)] +#[debug("ResolvedFetcher")] +pub struct ResolvedFetcher { + latest: Arc FetchLatestBox + Send + Sync>, + all: Arc FetchAllBox + Send + Sync>, +} + +impl ResolvedFetcher { + /// Builds a fetcher backed by the docs actor ([`SyncHandle`]). + pub fn for_sync_engine(sync: SyncHandle, namespace: NamespaceId) -> Self { + let sync_latest = sync.clone(); + let sync_all = sync.clone(); + let latest = Arc::new(move |q: Query| { + let sync = sync_latest.clone(); + Box::pin(async move { fetch_latest_sync(&sync, namespace, q).await }) as FetchLatestBox + }); + let all = Arc::new(move |q: Query| { + let sync = sync_all.clone(); + Box::pin(async move { fetch_all_sync(&sync, namespace, q).await }) as FetchAllBox + }); + Self { latest, all } + } + + /// Custom fetcher (e.g. RPC-backed [`crate::api::Doc`]). + pub fn new( + latest: Arc FetchLatestBox + Send + Sync>, + all: Arc FetchAllBox + Send + Sync>, + ) -> Self { + Self { latest, all } + } + + async fn fetch_latest(&self, q: Query) -> anyhow::Result> { + (self.latest)(q).await + } + + async fn fetch_all(&self, q: Query) -> anyhow::Result> { + (self.all)(q).await + } +} + +/// Latest entry for a key (per [`Query::single_latest_per_key`]) once the blob is locally complete (or the record is empty). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ResolvedKeyValue { + /// Document key. + pub key: Bytes, + /// Latest entry for this key under the subscription scope ([`Query::single_latest_per_key`]). + pub entry: Entry, + /// Raw blob bytes when [`ResolvedSubscribeOpts::include_content`] is true; for empty + /// records this is `None`. + pub content: Option, +} + +/// Options for [`subscribe_resolved_with`]. +#[derive(Debug, Clone, Default)] +pub struct ResolvedSubscribeOpts { + /// Emit the current latest entry for each key in scope before processing live events. + pub initial_snapshot: bool, + /// Read blob bytes into [`ResolvedKeyValue::content`] (requires a complete blob). + pub include_content: bool, + /// Injected delay after each flush (for tests that exercise subscribe backpressure). + pub resolution_delay: Option, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +struct EntryFp { + author: AuthorId, + ts: u64, + hash: Hash, +} + +fn fingerprint(entry: &Entry) -> EntryFp { + EntryFp { + author: entry.author(), + ts: entry.timestamp(), + hash: entry.content_hash(), + } +} + +/// Capacity between the bounded [`crate::engine::Engine::subscribe`] merge stream and the resolver. +const FORWARD_CAP: usize = 2048; + +/// Subscribe to updates for the latest entry for each key matching `scope`, emitting only when that +/// entry’s blob is [`BlobStatus::Complete`] (or the record is empty). +/// +/// `scope` must be built with [`Query::single_latest_per_key`]. Live events are forwarded on a +/// separate task so slow resolution does not fill the bounded replica subscribe channel. +/// +/// The live stream must be [`Send`] on native targets; when building for the browser Wasm target +/// (`wasm_browser`), it may be `!Send` (local boxed futures from blob status checks in the subscribe pipeline). +pub fn subscribe_resolved_with( + live_stream: LS, + fetcher: ResolvedFetcher, + blobs: Store, + scope: Query, + opts: ResolvedSubscribeOpts, +) -> anyhow::Result>> +where + LS: Stream> + + live_stream_bound::SendUnlessWasmBrowser + + 'static, +{ + subscribe_resolved_spawn( + live_stream, + fetcher, + Arc::new(blobs.blobs().clone()), + scope, + opts, + ) +} + +fn subscribe_resolved_spawn( + live_stream: LS, + fetcher: ResolvedFetcher, + blobs: Arc, + scope: Query, + opts: ResolvedSubscribeOpts, +) -> anyhow::Result>> +where + LS: Stream> + + live_stream_bound::SendUnlessWasmBrowser + + 'static, +{ + if !scope.is_single_latest_per_key() { + anyhow::bail!("subscribe_resolved requires Query::single_latest_per_key()"); + } + let (out_tx, out_rx) = mpsc::channel(64); + let (fwd_tx, fwd_rx) = mpsc::channel::>(FORWARD_CAP); + + n0_future::task::spawn(async move { + use n0_future::StreamExt; + let mut live_stream = std::pin::pin!(live_stream); + while let Some(item) = live_stream.next().await { + if fwd_tx.send(item).await.is_err() { + break; + } + } + }); + + n0_future::task::spawn(async move { + if let Err(e) = run_resolver(fwd_rx, out_tx, fetcher, blobs, scope, opts).await { + tracing::warn!("subscribe_resolved resolver ended: {e:#}"); + } + }); + + Ok(ReceiverStream::new(out_rx)) +} + +#[allow(clippy::too_many_arguments)] +async fn handle_live_event( + fetcher: &ResolvedFetcher, + blobs: &dyn ResolveBlobs, + scope: &Query, + opts: &ResolvedSubscribeOpts, + out_tx: &mpsc::Sender>, + pending_blob: &mut HashMap, + last_emitted: &mut HashMap, + dirty_keys: &mut HashSet, + dirty_all: &mut bool, + ev: &LiveEvent, +) -> anyhow::Result<()> { + match ev { + LiveEvent::InsertLocal { entry } | LiveEvent::InsertRemote { entry, .. } => { + let key = entry.key(); + if scope.filter_key().matches(key) { + if matches!(scope.filter_key(), KeyFilter::Any) { + *dirty_all = true; + } else { + dirty_keys.insert(Bytes::copy_from_slice(key)); + } + } + } + LiveEvent::ContentReady { hash } => { + let keys: Vec = pending_blob + .iter() + .filter(|(_, h)| **h == *hash) + .map(|(k, _)| k.clone()) + .collect(); + for k in keys { + try_resolve_key( + fetcher, + blobs, + scope, + opts, + out_tx, + pending_blob, + last_emitted, + k, + ) + .await?; + } + } + LiveEvent::PendingContentReady => { + let keys: Vec = pending_blob.keys().cloned().collect(); + for k in keys { + try_resolve_key( + fetcher, + blobs, + scope, + opts, + out_tx, + pending_blob, + last_emitted, + k, + ) + .await?; + } + } + _ => {} + } + Ok(()) +} + +async fn run_resolver( + mut fwd_rx: mpsc::Receiver>, + out_tx: mpsc::Sender>, + fetcher: ResolvedFetcher, + blobs: Arc, + scope: Query, + opts: ResolvedSubscribeOpts, +) -> anyhow::Result<()> { + let mut dirty_keys: HashSet = HashSet::new(); + let mut dirty_all = false; + let mut pending_blob: HashMap = HashMap::new(); + let mut last_emitted: HashMap = HashMap::new(); + + if opts.initial_snapshot { + let entries = fetcher.fetch_all(scope.clone()).await?; + for e in entries { + try_emit( + blobs.as_ref(), + &scope, + &opts, + &out_tx, + &mut pending_blob, + &mut last_emitted, + e, + ) + .await?; + } + if let Some(d) = opts.resolution_delay { + n0_future::time::sleep(d).await; + } + } + + loop { + let msg = match fwd_rx.recv().await { + Some(m) => m, + None => break, + }; + + match msg { + Err(e) => { + if out_tx.send(Err(e)).await.is_err() { + break; + } + } + Ok(ev) => { + handle_live_event( + &fetcher, + blobs.as_ref(), + &scope, + &opts, + &out_tx, + &mut pending_blob, + &mut last_emitted, + &mut dirty_keys, + &mut dirty_all, + &ev, + ) + .await?; + + while let Ok(more) = fwd_rx.try_recv() { + match more { + Err(e) => { + if out_tx.send(Err(e)).await.is_err() { + return Ok(()); + } + } + Ok(ev2) => { + handle_live_event( + &fetcher, + blobs.as_ref(), + &scope, + &opts, + &out_tx, + &mut pending_blob, + &mut last_emitted, + &mut dirty_keys, + &mut dirty_all, + &ev2, + ) + .await?; + } + } + } + + flush_dirty( + &fetcher, + blobs.as_ref(), + &scope, + &opts, + &out_tx, + &mut dirty_keys, + &mut dirty_all, + &mut pending_blob, + &mut last_emitted, + ) + .await?; + } + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn flush_dirty( + fetcher: &ResolvedFetcher, + blobs: &dyn ResolveBlobs, + scope: &Query, + opts: &ResolvedSubscribeOpts, + out_tx: &mpsc::Sender>, + dirty_keys: &mut HashSet, + dirty_all: &mut bool, + pending_blob: &mut HashMap, + last_emitted: &mut HashMap, +) -> anyhow::Result<()> { + if *dirty_all { + *dirty_all = false; + dirty_keys.clear(); + let entries = fetcher.fetch_all(scope.clone()).await?; + for e in entries { + try_emit(blobs, scope, opts, out_tx, pending_blob, last_emitted, e).await?; + } + } else if !dirty_keys.is_empty() { + let keys: Vec = dirty_keys.drain().collect(); + for key in keys { + try_resolve_key( + fetcher, + blobs, + scope, + opts, + out_tx, + pending_blob, + last_emitted, + key, + ) + .await?; + } + } + + if let Some(d) = opts.resolution_delay { + n0_future::time::sleep(d).await; + } + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn try_resolve_key( + fetcher: &ResolvedFetcher, + blobs: &dyn ResolveBlobs, + scope: &Query, + opts: &ResolvedSubscribeOpts, + out_tx: &mpsc::Sender>, + pending_blob: &mut HashMap, + last_emitted: &mut HashMap, + key: Bytes, +) -> anyhow::Result<()> { + let Some(q) = scope.single_latest_for_exact_key(&key) else { + return Ok(()); + }; + let Some(entry) = fetcher.fetch_latest(q).await? else { + pending_blob.remove(&key); + last_emitted.remove(&key); + return Ok(()); + }; + try_emit( + blobs, + scope, + opts, + out_tx, + pending_blob, + last_emitted, + entry, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn try_emit( + blobs: &dyn ResolveBlobs, + scope: &Query, + opts: &ResolvedSubscribeOpts, + out_tx: &mpsc::Sender>, + pending_blob: &mut HashMap, + last_emitted: &mut HashMap, + entry: Entry, +) -> anyhow::Result<()> { + let key = Bytes::copy_from_slice(entry.key()); + if !scope.filter_key().matches(entry.key()) { + return Ok(()); + } + + let fp = fingerprint(&entry); + if entry.is_empty() { + pending_blob.remove(&key); + if last_emitted.get(&key) == Some(&fp) { + return Ok(()); + } + last_emitted.insert(key.clone(), fp); + let content = if opts.include_content { + Some(Bytes::new()) + } else { + None + }; + if out_tx + .send(Ok(ResolvedKeyValue { + key, + entry, + content, + })) + .await + .is_err() + { + return Ok(()); + } + return Ok(()); + } + + let hash = entry.content_hash(); + match blobs.blob_status(hash).await { + Ok(BlobStatus::Complete { .. }) => {} + Ok(_status) => { + pending_blob.insert(key.clone(), hash); + return Ok(()); + } + Err(e) => { + tracing::debug!( + hash = %hash, + key = ?key, + error = %e, + "subscribe_resolved: blobs_api.status error; treating as not BlobStatus::Complete, pending key" + ); + pending_blob.insert(key.clone(), hash); + return Ok(()); + } + } + + pending_blob.remove(&key); + if last_emitted.get(&key) == Some(&fp) { + return Ok(()); + } + last_emitted.insert(key.clone(), fp); + + let content = if opts.include_content { + match blobs.blob_get_bytes(hash).await { + Ok(b) => Some(b), + Err(e) => { + let _ = out_tx.send(Err(e)).await; + return Ok(()); + } + } + } else { + None + }; + + if out_tx + .send(Ok(ResolvedKeyValue { + key, + entry, + content, + })) + .await + .is_err() + { + return Ok(()); + } + Ok(()) +} + +pub(crate) async fn fetch_latest_sync( + sync: &SyncHandle, + namespace: NamespaceId, + query: Query, +) -> anyhow::Result> { + let (tx, mut rx) = irpc_mpsc::channel::>(64); + sync.get_many(namespace, query, tx).await?; + match rx.recv().await { + Ok(Some(Ok(se))) => Ok(Some(se.into())), + Ok(Some(Err(e))) => Err(anyhow::anyhow!(e)), + Ok(None) => Ok(None), + Err(e) => Err(anyhow::anyhow!(e)), + } +} + +pub(crate) async fn fetch_all_sync( + sync: &SyncHandle, + namespace: NamespaceId, + query: Query, +) -> anyhow::Result> { + let (tx, mut rx) = irpc_mpsc::channel::>(64); + sync.get_many(namespace, query, tx).await?; + let mut v = Vec::new(); + loop { + match rx.recv().await { + Ok(Some(Ok(se))) => v.push(se.into()), + Ok(Some(Err(e))) => return Err(anyhow::anyhow!(e)), + Ok(None) => break, + Err(e) => return Err(anyhow::anyhow!(e)), + } + } + Ok(v) +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use n0_future::{stream, StreamExt}; + + use super::*; + use crate::{ + sync::{Record, RecordIdentifier}, + AuthorId, NamespaceId, + }; + + fn test_entry() -> Entry { + let ns = NamespaceId::from(&[11u8; 32]); + let author = AuthorId::from(&[22u8; 32]); + let id = RecordIdentifier::new(ns, author, b"k"); + let record = Record::new(Hash::new(b"blob-bytes"), 10, 99); + Entry::new(id, record) + } + + fn test_fetcher(entry: Entry) -> ResolvedFetcher { + let e1 = entry.clone(); + let latest: Arc FetchLatestBox + Send + Sync> = Arc::new(move |_q| { + let e = e1.clone(); + Box::pin(async move { Ok(Some(e)) }) as FetchLatestBox + }); + let e2 = entry.clone(); + let all: Arc FetchAllBox + Send + Sync> = Arc::new(move |_q| { + let e = e2.clone(); + Box::pin(async move { Ok(vec![e]) }) as FetchAllBox + }); + ResolvedFetcher::new(latest, all) + } + + /// First `blob_status` fails; later calls succeed (exercises pending + retry path). + struct StatusFailOnceThenComplete { + calls: AtomicUsize, + } + + #[async_trait] + impl ResolveBlobs for StatusFailOnceThenComplete { + async fn blob_status(&self, _hash: Hash) -> anyhow::Result { + let n = self.calls.fetch_add(1, Ordering::SeqCst); + if n == 0 { + Err(anyhow::anyhow!("mock status error")) + } else { + Ok(BlobStatus::Complete { size: 10 }) + } + } + + async fn blob_get_bytes(&self, _hash: Hash) -> anyhow::Result { + Ok(Bytes::from_static(b"blob-bytes")) + } + } + + #[tokio::test] + async fn blob_status_err_then_content_ready_emits_ok() { + let entry = test_entry(); + let hash = entry.content_hash(); + let fetcher = test_fetcher(entry.clone()); + let scope = Query::single_latest_per_key().key_exact(b"k").build(); + let opts = ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: false, + resolution_delay: None, + }; + // Yield Insert first so flush runs (status fails → pending) before ContentReady is read; + // otherwise try_recv batches both events and pending_blob is still empty for ContentReady. + let entry_a = entry.clone(); + let live = stream::unfold(0u8, move |step| { + let entry = entry_a.clone(); + async move { + match step { + 0 => Some((Ok(LiveEvent::InsertLocal { entry }), 1u8)), + 1 => { + n0_future::time::sleep(std::time::Duration::from_millis(50)).await; + Some((Ok(LiveEvent::ContentReady { hash }), 2u8)) + } + _ => None, + } + } + }); + let mut sub = subscribe_resolved_spawn( + live, + fetcher, + Arc::new(StatusFailOnceThenComplete { + calls: AtomicUsize::new(0), + }), + scope, + opts, + ) + .expect("spawn"); + + let item = tokio::time::timeout(std::time::Duration::from_secs(5), sub.next()) + .await + .expect("wall timeout") + .expect("stream ended"); + let v = item.expect("outer err"); + assert_eq!(v.key.as_ref(), b"k"); + assert_eq!(v.content.as_deref(), Some(&b"blob-bytes"[..])); + } + + struct CompleteThenGetBytesFail; + + #[async_trait] + impl ResolveBlobs for CompleteThenGetBytesFail { + async fn blob_status(&self, _hash: Hash) -> anyhow::Result { + Ok(BlobStatus::Complete { size: 10 }) + } + + async fn blob_get_bytes(&self, _hash: Hash) -> anyhow::Result { + Err(anyhow::anyhow!("mock get_bytes error")) + } + } + + #[tokio::test] + async fn blob_get_bytes_err_surfaces_on_stream() { + let entry = test_entry(); + let fetcher = test_fetcher(entry.clone()); + let scope = Query::single_latest_per_key().key_exact(b"k").build(); + let opts = ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: false, + resolution_delay: None, + }; + let live = stream::iter([Ok(LiveEvent::InsertLocal { entry })]); + let mut sub = subscribe_resolved_spawn( + live, + fetcher, + Arc::new(CompleteThenGetBytesFail), + scope, + opts, + ) + .expect("spawn"); + + let item = tokio::time::timeout(std::time::Duration::from_secs(5), sub.next()) + .await + .expect("wall timeout") + .expect("stream ended"); + let err = item.expect_err("expected Err from get_bytes"); + assert!(err.to_string().contains("mock get_bytes error"), "{err}"); + } + + #[tokio::test] + async fn blob_status_err_only_no_ok_before_stream_end() { + let entry = test_entry(); + let fetcher = test_fetcher(entry.clone()); + let scope = Query::single_latest_per_key().key_exact(b"k").build(); + let opts = ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: false, + resolution_delay: None, + }; + let live = stream::iter([Ok(LiveEvent::InsertLocal { entry })]); + let mut sub = subscribe_resolved_spawn( + live, + fetcher, + Arc::new(StatusFailOnceThenComplete { + calls: AtomicUsize::new(0), + }), + scope, + opts, + ) + .expect("spawn"); + + match tokio::time::timeout(std::time::Duration::from_millis(400), sub.next()).await { + Err(_) => {} + Ok(None) => {} + Ok(Some(Ok(_))) => panic!("unexpected Ok(ResolvedKeyValue) while blob status errors"), + Ok(Some(Err(e))) => panic!("unexpected stream err: {e:#}"), + } + } +} diff --git a/tests/subscribe_resolved.rs b/tests/subscribe_resolved.rs new file mode 100644 index 00000000..ea9ddd00 --- /dev/null +++ b/tests/subscribe_resolved.rs @@ -0,0 +1,283 @@ +//! Integration tests for [`iroh_docs::subscribe_resolved`]. + +use std::collections::HashSet; + +use anyhow::Result; +use iroh_docs::{store::Query, subscribe_resolved::ResolvedSubscribeOpts}; +use n0_future::{time::Duration, StreamExt}; +use tracing_test::traced_test; + +mod util; +use util::Node; + +use crate::util::empty_endpoint; + +fn scope_key_exact(key: &[u8]) -> Query { + Query::single_latest_per_key().key_exact(key).build() +} + +fn scope_prefix(prefix: &[u8]) -> Query { + Query::single_latest_per_key().key_prefix(prefix).build() +} + +fn scope_key_exact_include_empty(key: &[u8]) -> Query { + Query::single_latest_per_key() + .key_exact(key) + .include_empty() + .build() +} + +fn scope_whole_replica() -> Query { + Query::single_latest_per_key().build() +} + +#[tokio::test] +#[traced_test] +async fn subscribe_resolved_single_latest_two_authors_local() -> Result<()> { + let ep = empty_endpoint().await?; + let node = Node::memory(ep).spawn().await?; + let client = node.client(); + let author_a = client.docs().author_create().await?; + let author_b = client.docs().author_create().await?; + let doc = client.docs().create().await?; + let blobs = client.blobs(); + + let mut sub = doc + .subscribe_resolved( + blobs, + scope_key_exact(b"k"), + ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: false, + resolution_delay: None, + }, + ) + .await?; + + doc.set_bytes(author_a, b"k".to_vec(), b"a".to_vec()) + .await?; + let first = tokio::time::timeout(Duration::from_secs(10), sub.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert_eq!(first.content.as_deref(), Some(&b"a"[..])); + + doc.set_bytes(author_b, b"k".to_vec(), b"b".to_vec()) + .await?; + let second = tokio::time::timeout(Duration::from_secs(10), sub.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert_eq!(second.content.as_deref(), Some(&b"b"[..])); + assert_eq!(second.entry.author(), author_b); + + node.shutdown().await?; + Ok(()) +} + +#[tokio::test] +#[traced_test] +async fn subscribe_resolved_include_empty_tombstone_after_del() -> Result<()> { + let ep = empty_endpoint().await?; + let node = Node::memory(ep).spawn().await?; + let client = node.client(); + let author = client.docs().author_create().await?; + let doc = client.docs().create().await?; + let blobs = client.blobs(); + + let mut sub = doc + .subscribe_resolved( + blobs, + scope_key_exact_include_empty(b"k"), + ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: false, + resolution_delay: None, + }, + ) + .await?; + + doc.set_bytes(author, b"k".to_vec(), b"payload".to_vec()) + .await?; + let first = tokio::time::timeout(Duration::from_secs(10), sub.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert_eq!(first.content.as_deref(), Some(&b"payload"[..])); + assert!(!first.entry.is_empty()); + + doc.del(author, b"k".to_vec()).await?; + let second = tokio::time::timeout(Duration::from_secs(10), sub.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert!(second.entry.is_empty()); + assert_eq!(second.content.as_deref(), Some(&[][..])); + + node.shutdown().await?; + Ok(()) +} + +#[tokio::test] +#[traced_test] +async fn subscribe_resolved_whole_replica_two_keys() -> Result<()> { + let ep = empty_endpoint().await?; + let node = Node::memory(ep).spawn().await?; + let client = node.client(); + let author = client.docs().author_default().await?; + let doc = client.docs().create().await?; + let blobs = client.blobs(); + + let mut sub = doc + .subscribe_resolved( + blobs, + scope_whole_replica(), + ResolvedSubscribeOpts { + include_content: false, + initial_snapshot: false, + resolution_delay: None, + }, + ) + .await?; + + doc.set_bytes(author, b"a".to_vec(), b"1".to_vec()).await?; + doc.set_bytes(author, b"b".to_vec(), b"2".to_vec()).await?; + + let mut seen = HashSet::new(); + for _ in 0..2 { + let up = tokio::time::timeout(Duration::from_secs(10), sub.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert!(seen.insert(up.key.to_vec())); + } + assert_eq!(seen, HashSet::from([b"a".to_vec(), b"b".to_vec()])); + + node.shutdown().await?; + Ok(()) +} + +#[tokio::test] +#[traced_test] +async fn subscribe_resolved_initial_snapshot() -> Result<()> { + let ep = empty_endpoint().await?; + let node = Node::memory(ep).spawn().await?; + let client = node.client(); + let author = client.docs().author_create().await?; + let doc = client.docs().create().await?; + let blobs = client.blobs(); + + doc.set_bytes(author, b"k".to_vec(), b"before_sub".to_vec()) + .await?; + + let mut sub = doc + .subscribe_resolved( + blobs, + scope_key_exact(b"k"), + ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: true, + resolution_delay: None, + }, + ) + .await?; + + let first = tokio::time::timeout(Duration::from_secs(10), sub.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert_eq!(first.content.as_deref(), Some(&b"before_sub"[..])); + + node.shutdown().await?; + Ok(()) +} + +#[tokio::test] +#[traced_test] +async fn subscribe_resolved_author_filter_after_grouping() -> Result<()> { + let ep = empty_endpoint().await?; + let node = Node::memory(ep).spawn().await?; + let client = node.client(); + let author_a = client.docs().author_create().await?; + let author_b = client.docs().author_create().await?; + let doc = client.docs().create().await?; + let blobs = client.blobs(); + + let scope = Query::single_latest_per_key() + .key_exact(b"k") + .author(author_b) + .build(); + + let mut sub = doc + .subscribe_resolved( + blobs, + scope, + ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: false, + resolution_delay: None, + }, + ) + .await?; + + doc.set_bytes(author_a, b"k".to_vec(), b"from_a".to_vec()) + .await?; + assert!( + tokio::time::timeout(Duration::from_millis(800), sub.next()) + .await + .is_err(), + "author B scope should not emit when only A has written" + ); + + doc.set_bytes(author_b, b"k".to_vec(), b"from_b".to_vec()) + .await?; + let got = tokio::time::timeout(Duration::from_secs(10), sub.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert_eq!(got.entry.author(), author_b); + assert_eq!(got.content.as_deref(), Some(&b"from_b"[..])); + + node.shutdown().await?; + Ok(()) +} + +#[tokio::test] +#[traced_test] +async fn subscribe_resolved_burst_inserts_do_not_deadlock() -> Result<()> { + let ep = empty_endpoint().await?; + let node = Node::memory(ep).spawn().await?; + let client = node.client(); + let author = client.docs().author_default().await?; + let doc = client.docs().create().await?; + let blobs = client.blobs(); + + let _sub = doc + .subscribe_resolved( + blobs, + scope_prefix(b"k"), + ResolvedSubscribeOpts { + include_content: false, + initial_snapshot: false, + resolution_delay: Some(Duration::from_millis(2)), + }, + ) + .await?; + + let n = 800u32; + let insert_fut = async { + for i in 0..n { + let key = format!("k{i}"); + doc.set_bytes(author, key.into_bytes(), format!("v{i}").into_bytes()) + .await?; + } + anyhow::Ok(()) + }; + + tokio::time::timeout(Duration::from_secs(120), insert_fut) + .await + .expect("insert loop should not hang (deadlock)")?; + + node.shutdown().await?; + Ok(()) +} diff --git a/tests/sync.rs b/tests/sync.rs index 6a096c78..9d023f37 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -11,6 +11,7 @@ use iroh_docs::{ }, engine::LiveEvent, store::{DownloadPolicy, FilterKind, Query}, + subscribe_resolved::ResolvedSubscribeOpts, AuthorId, ContentStatus, Entry, }; use n0_future::{ @@ -1376,6 +1377,60 @@ async fn assert_next_unordered_with_optionals( events } +#[tokio::test] +#[traced_test] +async fn subscribe_resolved_single_latest_after_sync_and_blob() -> Result<()> { + use n0_future::StreamExt; + let mut rng = test_rng(b"subscribe_resolved_sync_blob"); + let nodes = spawn_nodes(2, &mut rng).await?; + let clients: Vec<_> = nodes.iter().map(|n| n.client()).collect(); + + let author0 = clients[0].docs().author_create().await?; + let doc0 = clients[0].docs().create().await?; + let blobs0 = clients[0].blobs(); + + let ticket = doc0 + .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) + .await?; + + let author1 = clients[1].docs().author_create().await?; + let doc1 = clients[1].docs().import(ticket).await?; + + let mut sub0 = doc0 + .subscribe_resolved( + blobs0, + Query::single_latest_per_key().key_exact(b"shared").build(), + ResolvedSubscribeOpts { + include_content: true, + initial_snapshot: false, + resolution_delay: None, + }, + ) + .await?; + + doc0.set_bytes(author0, b"shared".to_vec(), b"from0".to_vec()) + .await?; + tokio::time::timeout(TIMEOUT, sub0.next()) + .await? + .expect("stream ended") + .expect("outer err"); + + doc1.set_bytes(author1, b"shared".to_vec(), b"from1".to_vec()) + .await?; + + let remote_winner = tokio::time::timeout(TIMEOUT, sub0.next()) + .await? + .expect("stream ended") + .expect("outer err"); + assert_eq!(remote_winner.content.as_deref(), Some(&b"from1"[..])); + assert_eq!(remote_winner.entry.author(), author1); + + for node in nodes { + node.shutdown().await?; + } + Ok(()) +} + /// Asserts that the event is a [`LiveEvent::SyncFinished`] and that the contained [`SyncEvent`] /// has no error and matches `peer` and `namespace`. fn match_sync_finished(event: &LiveEvent, peer: PublicKey) -> bool {