Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust-version = "1.91"
[dependencies]
anyhow = "1"
async-channel = "2.3.1"
async-trait = "0.1"
blake3 = "1.8"
bytes = { version = "1.7", features = ["serde"] }
derive_more = { version = "2.0.1", features = [
Expand Down Expand Up @@ -93,7 +94,7 @@ missing_debug_implementations = "warn"
# require a feature enabled when using `--cfg docsrs` which we can not
# do. To enable for a crate set `#![cfg_attr(iroh_docsrs,
# feature(doc_cfg))]` in the crate.
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)"] }
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(wasm_browser)"] }

[build-dependencies]
cfg_aliases = "0.2.1"
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ async fn main() -> anyhow::Result<()> {
}
```

The snippet above is **minimal wiring** only (ALPN + router). For an end-to-end sample of the high-level [`Doc`](https://docs.rs/iroh-docs/latest/iroh_docs/api/struct.Doc.html) API—including [`subscribe_resolved`](https://docs.rs/iroh-docs/latest/iroh_docs/api/struct.Doc.html#method.subscribe_resolved) with a key prefix, blob bytes, and `initial_snapshot`—run:

```sh
cargo run --example subscribe_resolved
```

See [`examples/subscribe_resolved.rs`](examples/subscribe_resolved.rs) for the full walkthrough.

# License

Copyright 2026 N0, INC.
Expand Down
99 changes: 99 additions & 0 deletions examples/subscribe_resolved.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Subscribe to the **latest entry per key** under a **key prefix**, including **blob bytes**, on **one stream**.
//!
//! [`iroh_docs::api::Doc::subscribe_resolved`] uses a [`Query::single_latest_per_key`] scope (here
//! [`QueryBuilder::key_prefix`](iroh_docs::store::QueryBuilder::key_prefix)). Each stream item is
//! the current winning entry for some key in range, emitted only once its blob is locally
//! [`BlobStatus::Complete`](iroh_blobs::api::blobs::BlobStatus::Complete) (or the record is empty).
//!
//! With `initial_snapshot: true`, the resolver first emits the latest entry for every matching key
//! that already exists; then it continues with live updates.
//!
//! The `main` body uses the usual pattern for long-lived subscriptions: `while let Some(result) =
//! stream.next().await`, handle each `Ok`/`Err`, and exit when the stream yields `None` (sender
//! dropped or subscription closed). This demo **breaks** after three items so the process can exit:
//! (1) snapshot for an existing key, (2) live insert for a **new** key under the prefix, (3) live
//! update when **`app/foo` is written again**—same key, new latest entry. Omit that `break` in a
//! real app, or drive the loop with `tokio::select!` alongside shutdown.

use anyhow::Result;
use iroh::{endpoint::presets, protocol::Router, Endpoint};
use iroh_blobs::{store::mem::MemStore, BlobsProtocol, ALPN as BLOBS_ALPN};
use iroh_docs::{
protocol::Docs, store::Query, subscribe_resolved::ResolvedSubscribeOpts, ALPN as DOCS_ALPN,
};
use iroh_gossip::{net::Gossip, ALPN as GOSSIP_ALPN};
use n0_future::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
let endpoint = Endpoint::bind(presets::Minimal).await?;

let blobs = MemStore::default();
let blob_store = (*blobs).clone();

let gossip = Gossip::builder().spawn(endpoint.clone());
let docs = Docs::memory()
.spawn(endpoint.clone(), blob_store.clone(), gossip.clone())
.await?;

let router = Router::builder(endpoint.clone())
.accept(BLOBS_ALPN, BlobsProtocol::new(&blobs, None))
.accept(GOSSIP_ALPN, gossip)
.accept(DOCS_ALPN, docs.clone())
.spawn();

let author = docs.author_create().await?;
let doc = docs.create().await?;

doc.set_bytes(author, b"app/foo".to_vec(), b"hello".to_vec())
.await?;

let scope = Query::single_latest_per_key().key_prefix(b"app/").build();

let mut stream = doc
.subscribe_resolved(
&blob_store,
scope,
ResolvedSubscribeOpts {
initial_snapshot: true,
include_content: true,
resolution_delay: None,
},
)
.await?;

let mut updates = 0u32;

while let Some(result) = stream.next().await {
let kv = result?;
updates += 1;
println!(
"update #{updates}: key={:?} content={:?}",
String::from_utf8_lossy(&kv.key),
kv.content.as_deref().map(String::from_utf8_lossy)
);

match updates {
// Snapshot was `app/foo`; add another key under the same prefix.
1 => {
doc.set_bytes(author, b"app/bar".to_vec(), b"world".to_vec())
.await?;
}
// Show that the stream also fires when an existing key gets a new latest entry.
2 => {
doc.set_bytes(author, b"app/foo".to_vec(), b"hello again".to_vec())
.await?;
}
_ => {}
}

if updates >= 3 {
break;
}
}

drop(stream);

router.shutdown().await?;
Ok(())
}
38 changes: 38 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ use crate::{
actor::OpenState,
engine::{Engine, LiveEvent},
store::{DownloadPolicy, Query},
subscribe_resolved::{
subscribe_resolved_with, FetchAllBox, FetchLatestBox, ResolvedFetcher, ResolvedKeyValue,
ResolvedSubscribeOpts,
},
Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
};

Expand Down Expand Up @@ -477,6 +481,40 @@ impl Doc {
})))
}

/// Like [`Engine::subscribe_resolved`](crate::engine::Engine::subscribe_resolved): the latest
/// entry per key for `scope`, emitted only when blobs are complete locally. Requires the same
/// [`iroh_blobs::api::Store`] used by this node for reads.
pub async fn subscribe_resolved(
&self,
blobs: &iroh_blobs::api::Store,
scope: impl Into<Query>,
opts: ResolvedSubscribeOpts,
) -> Result<tokio_stream::wrappers::ReceiverStream<anyhow::Result<ResolvedKeyValue>>> {
self.ensure_open()?;
let scope = scope.into();
let doc_latest = self.clone();
let latest: std::sync::Arc<dyn Fn(Query) -> FetchLatestBox + Send + Sync> =
std::sync::Arc::new(move |q: Query| {
let doc = doc_latest.clone();
Box::pin(async move {
let mut s = n0_future::StreamExt::boxed(doc.get_many(q).await?);
n0_future::TryStreamExt::try_next(&mut s).await
}) as FetchLatestBox
});
let doc_all = self.clone();
let all: std::sync::Arc<dyn Fn(Query) -> FetchAllBox + Send + Sync> =
std::sync::Arc::new(move |q: Query| {
let doc = doc_all.clone();
Box::pin(async move {
let mut s = n0_future::StreamExt::boxed(doc.get_many(q).await?);
n0_future::TryStreamExt::try_collect(&mut s).await
}) as FetchAllBox
});
let fetcher = ResolvedFetcher::new(latest, all);
let live = self.subscribe().await?;
subscribe_resolved_with(live, fetcher, blobs.clone(), scope, opts)
}

/// Returns status info for this document
pub async fn status(&self) -> Result<OpenState> {
self.ensure_open()?;
Expand Down
30 changes: 30 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,36 @@ impl Engine {
Ok(a.or(b))
}

/// Subscribe to updates for the latest entry for each key matching `scope` (per
/// [`crate::store::Query::single_latest_per_key`]), emitting only when that entry’s blob is
/// complete locally (or the record is empty).
///
/// `scope` must be built with [`crate::store::Query::single_latest_per_key`]. See
/// [`crate::subscribe_resolved`] for details.
pub async fn subscribe_resolved(
&self,
namespace: NamespaceId,
scope: crate::store::Query,
opts: crate::subscribe_resolved::ResolvedSubscribeOpts,
) -> anyhow::Result<
tokio_stream::wrappers::ReceiverStream<
anyhow::Result<crate::subscribe_resolved::ResolvedKeyValue>,
>,
> {
let live = self.subscribe(namespace).await?;
let fetcher = crate::subscribe_resolved::ResolvedFetcher::for_sync_engine(
self.sync.clone(),
namespace,
);
crate::subscribe_resolved::subscribe_resolved_with(
live,
fetcher,
self.blob_store().clone(),
scope,
opts,
)
}

/// Handle an incoming iroh-docs connection.
pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
self.to_live_actor
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub mod engine;
pub mod actor;
pub mod api;
pub mod store;
pub mod subscribe_resolved;
pub mod sync;

mod heads;
Expand Down
39 changes: 39 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,30 @@ pub struct Query {
}

impl Query {
/// Returns `true` if this query aggregates with [`SingleLatestPerKeyQuery`].
pub fn is_single_latest_per_key(&self) -> bool {
matches!(self.kind, QueryKind::SingleLatestPerKey(_))
}

/// Returns a copy of this query with an exact key filter and limit `1`, preserving author
/// filter, `include_empty`, and sort direction.
///
/// Returns `None` if this query is not [`SingleLatestPerKeyQuery`].
pub fn single_latest_for_exact_key(&self, key: impl AsRef<[u8]>) -> Option<Query> {
match &self.kind {
QueryKind::SingleLatestPerKey(k) => Some(Query {
kind: QueryKind::SingleLatestPerKey(k.clone()),
filter_author: self.filter_author.clone(),
filter_key: KeyFilter::Exact(Bytes::copy_from_slice(key.as_ref())),
limit: Some(1),
offset: 0,
include_empty: self.include_empty,
sort_direction: self.sort_direction,
}),
_ => None,
}
}

/// Query all records.
pub fn all() -> QueryBuilder<FlatQuery> {
Default::default()
Expand Down Expand Up @@ -324,6 +348,21 @@ impl Query {
pub fn offset(&self) -> u64 {
self.offset
}

/// Key filter applied by this query.
pub fn filter_key(&self) -> &KeyFilter {
&self.filter_key
}

/// Author filter applied by this query.
pub fn filter_author(&self) -> &AuthorFilter {
&self.filter_author
}

/// Whether empty (tombstone) records are included.
pub fn include_empty(&self) -> bool {
self.include_empty
}
}

/// Sort direction
Expand Down
Loading
Loading