diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d4214ab99..f8cab16d48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ - Refactored the remote prover gRPC API implementation to use the new per-method trait implementations ([#1975](https://github.com/0xMiden/node/issues/1975)). - Aligned `SyncNullifiers` list-limit validation in RPC and store with `nullifier_prefix` parameter semantics, extended `GetLimits` test coverage, and documented query parameter limits ([#1986](https://github.com/0xMiden/node/pull/1986)). - Added a `replica` mode to the store, which streams blocks from an upstream master store ([#1987](https://github.com/0xMiden/node/pull/1987)). -- Added `StoreReplica` gRPC service with endpoints for streaming blocks and proofs ([#1987](https://github.com/0xMiden/node/pull/1987)). +- Added public and store `Rpc` streaming endpoints for replica block/proof synchronization ([#1987](https://github.com/0xMiden/node/pull/1987)). - Replaced the network monitor's JavaScript dashboard with a server-rendered Maud + HTMX frontend ([#2024](https://github.com/0xMiden/node/pull/2024)). - [BREAKING] Removed `CheckNullifiers` endpoint ([#2049](https://github.com/0xMiden/node/pull/2049)). - Replaced blocking-in-async operations in the validator, remote prover, and ntx-builder with `spawn_blocking` to avoid starving the Tokio runtime ([#2041](https://github.com/0xMiden/node/pull/2041)). diff --git a/bin/node/src/commands/store.rs b/bin/node/src/commands/store.rs index 792d79006a..22768bf28f 100644 --- a/bin/node/src/commands/store.rs +++ b/bin/node/src/commands/store.rs @@ -99,15 +99,15 @@ pub enum StoreCommand { /// Starts the store in replica mode. /// - /// In this mode the store syncs blocks from an upstream store's `StoreReplica` gRPC service. - /// Only the `Rpc` and `StoreReplica` gRPC services are exposed — the `BlockProducer` and - /// `NtxBuilder` services are not started and no proof scheduler runs. + /// In this mode the store syncs blocks from an upstream store's `Rpc` gRPC service. + /// Only the `Rpc` gRPC service is exposed — the `BlockProducer` and `NtxBuilder` services are + /// not started and no proof scheduler runs. StartReplica { /// Socket address at which to serve the store's RPC API. #[arg(long = "rpc.listen", env = ENV_RPC_LISTEN, value_name = "LISTEN")] rpc_listen: SocketAddr, - /// gRPC URL of the upstream store's `StoreReplica` endpoint to sync blocks from. + /// gRPC URL of the upstream store's `Rpc` endpoint to sync blocks from. #[arg(long = "upstream-store.url", env = ENV_UPSTREAM_URL, value_name = "URL")] upstream_store_url: Url, diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index ed2d5b7b2f..2aa5ea9cc1 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -244,6 +244,9 @@ impl RpcService { #[tonic::async_trait] impl api_server::Api for RpcService { + type BlockSubscriptionStream = tonic::codec::Streaming; + type ProofSubscriptionStream = tonic::codec::Streaming; + // -- Nullifier endpoints ----------------------------------------------------------------- async fn sync_nullifiers( @@ -305,6 +308,30 @@ impl api_server::Api for RpcService { self.store.clone().sync_chain_mmr(request).await } + async fn block_subscription( + &self, + request: Request, + ) -> Result, Status> { + let request_ref = request.get_ref(); + Span::current().set_attribute("block.from", request_ref.block_from); + + debug!(target: COMPONENT, request = ?request_ref); + + self.store.clone().block_subscription(request).await + } + + async fn proof_subscription( + &self, + request: Request, + ) -> Result, Status> { + let request_ref = request.get_ref(); + Span::current().set_attribute("block.from", request_ref.block_from); + + debug!(target: COMPONENT, request = ?request_ref); + + self.store.clone().proof_subscription(request).await + } + // -- Note endpoints ---------------------------------------------------------------------- async fn sync_notes( diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 616f0de309..c81e860e51 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -61,10 +61,10 @@ pub enum StoreMode { max_concurrent_proofs: NonZeroUsize, }, - /// Receives blocks from an upstream store's `StoreReplica` gRPC service. + /// Receives blocks from an upstream store's `Rpc` gRPC service. /// - /// Only the `Rpc` and `StoreReplica` gRPC services are exposed. The `BlockProducer` and - /// `NtxBuilder` services are not started and no proof scheduler runs. + /// Only the `Rpc` gRPC service is exposed. The `BlockProducer` and `NtxBuilder` services are + /// not started and no proof scheduler runs. Replica { upstream_url: Url }, } @@ -310,7 +310,7 @@ impl Store { /// Spawns the gRPC servers for block-producer mode. /// - /// Starts three listeners: Rpc+StoreReplica (shared), `NtxBuilder`, and `BlockProducer`. + /// Starts three listeners: `Rpc`, `NtxBuilder`, and `BlockProducer`. fn spawn_block_producer_grpc_servers( store_api: api::StoreApi, block_producer_api: block_producer::BlockProducerApi, @@ -322,8 +322,6 @@ impl Store { let mut join_set = JoinSet::new(); let rpc_service = store::rpc_server::RpcServer::new(store_api.clone()); - let replica_service = - store::store_replica_server::StoreReplicaServer::new(store_api.clone()); let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(store_api); let block_producer_service = store::block_producer_server::BlockProducerServer::new(block_producer_api); @@ -343,7 +341,6 @@ impl Store { join_set.spawn( make_server() .add_service(rpc_service) - .add_service(replica_service) .add_service(reflection_service.clone()) .serve_with_incoming(TcpListenerStream::new(rpc_listener)), ); @@ -368,8 +365,7 @@ impl Store { /// Spawns the gRPC servers for replica mode. /// - /// Only the Rpc and `StoreReplica` services are exposed — no `BlockProducer`, `NtxBuilder`, or - /// proof scheduler. + /// Only the `Rpc` service is exposed — no `BlockProducer`, `NtxBuilder`, or proof scheduler. fn spawn_replica_grpc_servers( store_api: api::StoreApi, grpc_options: GrpcOptionsInternal, @@ -377,8 +373,7 @@ impl Store { ) -> anyhow::Result>> { let mut join_set = JoinSet::new(); - let rpc_service = store::rpc_server::RpcServer::new(store_api.clone()); - let replica_service = store::store_replica_server::StoreReplicaServer::new(store_api); + let rpc_service = store::rpc_server::RpcServer::new(store_api); let reflection_service = tonic_reflection::server::Builder::configure() .register_file_descriptor_set(store_api_descriptor()) @@ -391,7 +386,6 @@ impl Store { .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .add_service(rpc_service) - .add_service(replica_service) .add_service(reflection_service) .serve_with_incoming(TcpListenerStream::new(rpc_listener)), ); diff --git a/crates/store/src/server/replica.rs b/crates/store/src/server/replica.rs index 5f0162262b..5349741903 100644 --- a/crates/store/src/server/replica.rs +++ b/crates/store/src/server/replica.rs @@ -2,22 +2,15 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use miden_node_proto::generated::store::{ - BlockProof, - BlockSubscriptionRequest, - ProofSubscriptionRequest, - SignedBlock, - store_replica_server, -}; +use miden_node_proto::generated::rpc::{BlockSubscriptionResponse, ProofSubscriptionResponse}; use miden_node_utils::ErrorReport; use miden_protocol::block::BlockNumber; use pin_project::pin_project; use tokio::sync::{OwnedSemaphorePermit, mpsc, watch}; use tokio_stream::Stream; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; +use tonic::Status; -use crate::server::api::StoreApi; use crate::state::{BlockCache, ProofCache, State}; // GUARDED STREAM @@ -25,12 +18,18 @@ use crate::state::{BlockCache, ProofCache, State}; /// Wraps a stream and holds a semaphore permit for its lifetime, releasing it on drop. #[pin_project] -struct GuardedStream { +pub(super) struct GuardedStream { #[pin] inner: S, _permit: OwnedSemaphorePermit, } +impl GuardedStream { + pub(super) fn new(inner: S, permit: OwnedSemaphorePermit) -> Self { + Self { inner, _permit: permit } + } +} + impl Stream for GuardedStream { type Item = S::Item; @@ -39,87 +38,35 @@ impl Stream for GuardedStream { } } -// STORE REPLICA API +// RPC SUBSCRIPTION API // ================================================================================================ -#[tonic::async_trait] -impl store_replica_server::StoreReplica for StoreApi { - type BlockSubscriptionStream = Pin< - Box< - dyn tonic::codegen::tokio_stream::Stream> - + Send - + 'static, - >, - >; - - type ProofSubscriptionStream = Pin< - Box< - dyn tonic::codegen::tokio_stream::Stream> - + Send - + 'static, - >, - >; - - /// Streams committed blocks to a replica starting from `from_block_number`. - /// - /// Subscribes to the committed-tip watch channel and maintains a sequential counter. On each - /// tip advance it emits all blocks from the current position up to the new tip, falling back to - /// the block store for any entry not in the in-memory cache. The stream closes only when the - /// client disconnects or the server shuts down. - async fn block_subscription( - &self, - request: Request, - ) -> Result, Status> { - let permit = Arc::clone(&self.block_subscription_semaphore) - .try_acquire_owned() - .map_err(|_| Status::resource_exhausted("maximum block subscriptions reached"))?; - - let from = BlockNumber::from(request.into_inner().block_from); - - let stream = build_block_stream( - from, - self.block_cache.clone(), - self.committed_tip_rx.clone(), - Arc::clone(&self.state), - ); - Ok(Response::new(Box::pin(GuardedStream { inner: stream, _permit: permit }))) - } - - /// Streams block proofs to a replica starting from `from_block_number`. - /// - /// Uses the same watch-channel approach as [`Self::block_subscription`]: waits for the - /// proven-in-sequence tip to advance, then emits all proofs from the current position up to - /// the new tip, falling back to the block store for cache misses. - async fn proof_subscription( - &self, - request: Request, - ) -> Result, Status> { - let permit = Arc::clone(&self.proof_subscription_semaphore) - .try_acquire_owned() - .map_err(|_| Status::resource_exhausted("maximum proof subscriptions reached"))?; - - let from = BlockNumber::from(request.into_inner().block_from); - - let stream = build_proof_stream( - from, - self.proof_cache.clone(), - self.proven_tip_rx.clone(), - Arc::clone(&self.state), - ); - Ok(Response::new(Box::pin(GuardedStream { inner: stream, _permit: permit }))) - } -} +pub(super) type BlockSubscriptionStream = Pin< + Box< + dyn tonic::codegen::tokio_stream::Stream> + + Send + + 'static, + >, +>; + +pub(super) type ProofSubscriptionStream = Pin< + Box< + dyn tonic::codegen::tokio_stream::Stream> + + Send + + 'static, + >, +>; // STREAM BUILDERS // ================================================================================================ /// Spawns the block-stream task and returns its output as a [`ReceiverStream`]. -fn build_block_stream( +pub(super) fn build_block_stream( from: BlockNumber, cache: BlockCache, tip_rx: watch::Receiver, state: Arc, -) -> impl Stream> + Send + 'static { +) -> impl Stream> + Send + 'static { let (tx, rx) = mpsc::channel(32); tokio::spawn(async move { if let Err(status) = run_block_stream(from, cache, tip_rx, state, &tx).await { @@ -131,12 +78,12 @@ fn build_block_stream( } /// Spawns the proof-stream task and returns its output as a [`ReceiverStream`]. -fn build_proof_stream( +pub(super) fn build_proof_stream( from: BlockNumber, cache: ProofCache, tip_rx: watch::Receiver, state: Arc, -) -> impl Stream> + Send + 'static { +) -> impl Stream> + Send + 'static { let (tx, rx) = mpsc::channel(32); tokio::spawn(async move { if let Err(status) = run_proof_stream(from, cache, tip_rx, state, &tx).await { @@ -159,15 +106,22 @@ async fn run_block_stream( cache: BlockCache, mut tip_rx: watch::Receiver, state: Arc, - tx: &mpsc::Sender>, + tx: &mpsc::Sender>, ) -> Result<(), Status> { let mut next = from; loop { - // Read tip. - let tip = *tip_rx.borrow_and_update(); + let mut tip = *tip_rx.borrow_and_update(); while next <= tip { let bytes = fetch_block(next, &cache, &state).await?; - if tx.send(Ok(SignedBlock { block: bytes })).await.is_err() { + tip = *tip_rx.borrow_and_update(); + if tx + .send(Ok(BlockSubscriptionResponse { + block: bytes, + committed_chain_tip: tip.as_u32(), + })) + .await + .is_err() + { // Client disconnected. return Ok(()); } @@ -190,15 +144,23 @@ async fn run_proof_stream( cache: ProofCache, mut tip_rx: watch::Receiver, state: Arc, - tx: &mpsc::Sender>, + tx: &mpsc::Sender>, ) -> Result<(), Status> { let mut next = from; loop { - // Read tip. - let tip = *tip_rx.borrow_and_update(); + let mut tip = *tip_rx.borrow_and_update(); while next <= tip { let proof = fetch_proof(next, &cache, &state).await?; - if tx.send(Ok(BlockProof { block_num: next.as_u32(), proof })).await.is_err() { + tip = *tip_rx.borrow_and_update(); + if tx + .send(Ok(ProofSubscriptionResponse { + block_num: next.as_u32(), + proof, + proven_chain_tip: tip.as_u32(), + })) + .await + .is_err() + { // Client disconnected. return Ok(()); } diff --git a/crates/store/src/server/replica_sync.rs b/crates/store/src/server/replica_sync.rs index 1f8ee54ba1..e60eaa156f 100644 --- a/crates/store/src/server/replica_sync.rs +++ b/crates/store/src/server/replica_sync.rs @@ -4,11 +4,8 @@ use std::time::Duration; use anyhow::Context; use async_trait::async_trait; use miden_crypto::utils::Deserializable; -use miden_node_proto::generated::store::{ - BlockSubscriptionRequest, - ProofSubscriptionRequest, - store_replica_client, -}; +use miden_node_proto::generated::rpc::{BlockSubscriptionRequest, ProofSubscriptionRequest}; +use miden_node_proto::generated::store::rpc_client; use miden_protocol::block::{BlockNumber, SignedBlock}; use tokio_stream::StreamExt; use tracing::{info, warn}; @@ -18,7 +15,7 @@ use crate::state::{Finality, State}; pub(crate) const RECONNECT_DELAY: Duration = Duration::from_secs(5); -type StoreReplicaClient = store_replica_client::StoreReplicaClient; +type StoreRpcClient = rpc_client::RpcClient; // REPLICA SYNC // ================================================================================================ @@ -40,7 +37,7 @@ pub(crate) trait ReplicaSync: Sized + Send + Sync + 'static { /// Subscribes to the upstream stream via `client` and processes events until the stream ends or /// an error occurs. - async fn subscribe(&self, client: StoreReplicaClient) -> anyhow::Result<()>; + async fn subscribe(&self, client: StoreRpcClient) -> anyhow::Result<()>; /// Opens a connection to [`upstream_url`](Self::upstream_url) and calls /// [`subscribe`](Self::subscribe) with the resulting client. @@ -48,7 +45,7 @@ pub(crate) trait ReplicaSync: Sized + Send + Sync + 'static { let channel = tonic::transport::Channel::from_shared(self.upstream_url().to_string())? .connect() .await?; - self.subscribe(StoreReplicaClient::new(channel)).await + self.subscribe(StoreRpcClient::new(channel)).await } /// Runs [`sync`](Self::sync) in an infinite loop, sleeping [`RECONNECT_DELAY`] on failure. @@ -98,7 +95,7 @@ impl ReplicaSync for BlockReplicaSync { &self.upstream_url } - async fn subscribe(&self, mut client: StoreReplicaClient) -> anyhow::Result<()> { + async fn subscribe(&self, mut client: StoreRpcClient) -> anyhow::Result<()> { let block_from = self.state.chain_tip(Finality::Committed).await.child().as_u32(); info!(block_from, upstream_url = %self.upstream_url, "Connecting to upstream store for blocks"); @@ -141,7 +138,7 @@ impl ReplicaSync for ProofReplicaSync { &self.upstream_url } - async fn subscribe(&self, mut client: StoreReplicaClient) -> anyhow::Result<()> { + async fn subscribe(&self, mut client: StoreRpcClient) -> anyhow::Result<()> { let block_from = self.state.chain_tip(Finality::Proven).await.as_u32().saturating_add(1); info!(block_from, upstream_url = %self.upstream_url, "Connecting to upstream store for proofs"); diff --git a/crates/store/src/server/rpc_api.rs b/crates/store/src/server/rpc_api.rs index 99158a1ece..bab6c05216 100644 --- a/crates/store/src/server/rpc_api.rs +++ b/crates/store/src/server/rpc_api.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use miden_node_proto::decode::{ convert_digests_to_words, read_account_id, @@ -35,6 +37,13 @@ use crate::errors::{ SyncTransactionsError, }; use crate::server::api::{StoreApi, internal_error}; +use crate::server::replica::{ + BlockSubscriptionStream, + GuardedStream, + ProofSubscriptionStream, + build_block_stream, + build_proof_stream, +}; use crate::state::Finality; // CLIENT ENDPOINTS @@ -42,6 +51,49 @@ use crate::state::Finality; #[tonic::async_trait] impl rpc_server::Rpc for StoreApi { + type BlockSubscriptionStream = BlockSubscriptionStream; + type ProofSubscriptionStream = ProofSubscriptionStream; + + async fn block_subscription( + &self, + request: Request, + ) -> Result, Status> { + let permit = Arc::clone(&self.block_subscription_semaphore) + .try_acquire_owned() + .map_err(|_| Status::resource_exhausted("maximum block subscriptions reached"))?; + + let from = BlockNumber::from(request.into_inner().block_from); + + let stream = build_block_stream( + from, + self.block_cache.clone(), + self.committed_tip_rx.clone(), + Arc::clone(&self.state), + ); + let stream: Self::BlockSubscriptionStream = Box::pin(GuardedStream::new(stream, permit)); + Ok(Response::new(stream)) + } + + async fn proof_subscription( + &self, + request: Request, + ) -> Result, Status> { + let permit = Arc::clone(&self.proof_subscription_semaphore) + .try_acquire_owned() + .map_err(|_| Status::resource_exhausted("maximum proof subscriptions reached"))?; + + let from = BlockNumber::from(request.into_inner().block_from); + + let stream = build_proof_stream( + from, + self.proof_cache.clone(), + self.proven_tip_rx.clone(), + Arc::clone(&self.state), + ); + let stream: Self::ProofSubscriptionStream = Box::pin(GuardedStream::new(stream, permit)); + Ok(Response::new(stream)) + } + /// Returns block header for the specified block number. /// /// If the block number is not provided, block header for the latest block is returned. diff --git a/docs/external/src/rpc.md b/docs/external/src/rpc.md index 186870da06..c530bd96f2 100644 --- a/docs/external/src/rpc.md +++ b/docs/external/src/rpc.md @@ -18,6 +18,8 @@ The gRPC service definition can be found in the Miden node's `proto` [directory] - [GetNetworkNoteStatus](#getnetworknotestatus) - [GetNotesById](#getnotesbyid) - [GetNoteScriptByRoot](#getnotescriptbyroot) +- [BlockSubscription](#blocksubscription) +- [ProofSubscription](#proofsubscription) - [Status](#status) - [SubmitProvenTx](#submitproventx) - [SyncAccountStorageMaps](#syncaccountstoragemaps) @@ -126,6 +128,20 @@ If the note is not found in the network transaction builder's database, the endp Request the script for a note by its root. +### BlockSubscription + +Streams committed blocks starting from `block_from` inclusive. + +Each stream item contains the serialized signed block and the committed chain tip observed when +the item was emitted. + +### ProofSubscription + +Streams block proofs starting from `block_from` inclusive. + +Each stream item contains the block number, serialized block proof, and the proven chain tip +observed when the item was emitted. + ### Status Request the status of the node components. The response contains the current version of the RPC component and the connection status of the other components, including their versions and the number of the most recent block in the chain (chain tip). diff --git a/proto/proto/internal/store.proto b/proto/proto/internal/store.proto index 04fb41bb25..002b853d82 100644 --- a/proto/proto/internal/store.proto +++ b/proto/proto/internal/store.proto @@ -55,6 +55,19 @@ service Rpc { // Returns transactions records for specific accounts within a block range. rpc SyncTransactions(rpc.SyncTransactionsRequest) returns (rpc.SyncTransactionsResponse) {} + + // Streams committed blocks starting from the given block number (inclusive). + // + // Replays historical blocks first, then streams live blocks as they are committed. + // On lag (replica falls too far behind), the stream is closed with a DATA_LOSS error and + // the client should reconnect from its local tip. + rpc BlockSubscription(rpc.BlockSubscriptionRequest) returns (stream rpc.BlockSubscriptionResponse) {} + + // Streams block proofs starting from the given block number (inclusive). + // + // Replays existing proofs first, then streams new proofs as they are generated. + // On lag, the stream is closed with a DATA_LOSS error. + rpc ProofSubscription(rpc.ProofSubscriptionRequest) returns (stream rpc.ProofSubscriptionResponse) {} } // BLOCK PRODUCER STORE API @@ -223,65 +236,6 @@ message TransactionInputs { optional bool new_account_id_prefix_is_unique = 5; // TODO: Replace this with an error. When a general error message exists. } -// STORE REPLICA API -// ================================================================================================ - -// Store API for replica synchronization. -// -// Any store instance exposes this service so that replicas can subscribe to a continuous stream -// of committed blocks and block proofs. Replicas connect, specify their current local tip (or 0 -// for genesis), receive historical blocks first (catch-up), and then receive live blocks as they -// are committed. -service StoreReplica { - // Streams committed blocks starting from the given block number (inclusive). - // - // Replays historical blocks first, then streams live blocks as they are committed. - // On lag (replica falls too far behind), the stream is closed with a DATA_LOSS error and - // the client should reconnect from its local tip. - rpc BlockSubscription(BlockSubscriptionRequest) returns (stream SignedBlock) {} - - // Streams block proofs starting from the given block number (inclusive). - // - // Replays existing proofs first, then streams new proofs as they are generated. - // On lag, the stream is closed with a DATA_LOSS error. - rpc ProofSubscription(ProofSubscriptionRequest) returns (stream BlockProof) {} -} - -// BLOCK SUBSCRIPTION -// ================================================================================================ - -// Request to subscribe to the committed block stream. -message BlockSubscriptionRequest { - // The block number to start streaming from (inclusive). - fixed32 block_from = 1; -} - -// A committed block streamed to a replica. -message SignedBlock { - // The block encoded using [miden_serde_utils::Serializable] implementation for - // [miden_protocol::block::SignedBlock]. - bytes block = 1; -} - -// PROOF SUBSCRIPTION -// ================================================================================================ - -// Request to subscribe to the block proof stream. -message ProofSubscriptionRequest { - // The block number to start streaming from (inclusive). - fixed32 block_from = 1; -} - -// A block proof streamed to a replica. -message BlockProof { - // The block number this proof corresponds to. - fixed32 block_num = 1; - - // The block proof encoded using [miden_serde_utils::Serializable] implementation for - // [miden_protocol::block::BlockProof]. - bytes proof = 2; -} - // NTX BUILDER STORE API // ================================================================================================ diff --git a/proto/proto/rpc.proto b/proto/proto/rpc.proto index 060818726e..f08f85ca3c 100644 --- a/proto/proto/rpc.proto +++ b/proto/proto/rpc.proto @@ -84,6 +84,19 @@ service Api { // Returns MMR delta needed to synchronize the chain MMR within the requested block range. rpc SyncChainMmr(SyncChainMmrRequest) returns (SyncChainMmrResponse) {} + // Streams committed blocks starting from the given block number (inclusive). + // + // Replays historical blocks first, then streams live blocks as they are committed. + // On lag (replica falls too far behind), the stream is closed with a DATA_LOSS error and + // the client should reconnect from its local tip. + rpc BlockSubscription(BlockSubscriptionRequest) returns (stream BlockSubscriptionResponse) {} + + // Streams block proofs starting from the given block number (inclusive). + // + // Replays existing proofs first, then streams new proofs as they are generated. + // On lag, the stream is closed with a DATA_LOSS error. + rpc ProofSubscription(ProofSubscriptionRequest) returns (stream ProofSubscriptionResponse) {} + // NOTE DEBUGGING ENDPOINTS // -------------------------------------------------------------------------------------------- @@ -97,6 +110,47 @@ service Api { rpc GetNetworkNoteStatus(note.NoteId) returns (GetNetworkNoteStatusResponse) {} } +// BLOCK SUBSCRIPTION +// ================================================================================================ + +// Request to subscribe to the committed block stream. +message BlockSubscriptionRequest { + // The block number to start streaming from (inclusive). + fixed32 block_from = 1; +} + +// A committed block streamed to a replica. +message BlockSubscriptionResponse { + // The block encoded using [miden_serde_utils::Serializable] implementation for + // [miden_protocol::block::SignedBlock]. + bytes block = 1; + + // The committed chain tip when this item was emitted. + fixed32 committed_chain_tip = 2; +} + +// PROOF SUBSCRIPTION +// ================================================================================================ + +// Request to subscribe to the block proof stream. +message ProofSubscriptionRequest { + // The block number to start streaming from (inclusive). + fixed32 block_from = 1; +} + +// A block proof streamed to a replica. +message ProofSubscriptionResponse { + // The block number this proof corresponds to. + fixed32 block_num = 1; + + // The block proof encoded using [miden_serde_utils::Serializable] implementation for + // [miden_protocol::block::BlockProof]. + bytes proof = 2; + + // The proven chain tip when this item was emitted. + fixed32 proven_chain_tip = 3; +} + // RPC STATUS // ================================================================================================