diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 33580fc..6af7f2f 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -67,7 +67,7 @@ Pure CPU, no I/O. Takes raw protobuf bytes, returns structured types. - **Block header verification**: secp256k1 signature recovery, check the recovered witness address against the active SR set, validate the txTrieRoot. **On the dual-engine question**: java-tron has an `SM2` codepath (China's GM/T 0003 national standard) alongside `ECKey`/secp256k1, and from a distance one might assume both are in use. They are not. Investigation 2026-05-09 (java-tron #6588, PR #6627, `MiscConfig.java`): `isECKeyCryptoEngine` is hard-true on mainnet, no mainnet SR signs with SM2, and core devs have proposed deleting the unused module. lightcycle therefore implements only secp256k1 sigverify, and the codec's `WitnessAddressMismatch` branch represents a real bug or an attacker — not a benign engine mismatch. - **Transaction decoding**: TRON has 40+ contract types. The four high-volume ones (`Transfer`, `TransferAsset`/TRC-10, `TriggerSmartContract` — TRC-20/DEX/USDT all flow through here, `CreateSmartContract`) get fully-decoded payload variants on `DecodedContract`; everything else lands in `Other { kind, raw }` with the original `Any.value` bytes preserved so consumers can decode against the matching java-tron protobuf message themselves. - **TransactionInfo decoding**: the side channel java-tron exposes via `GetTransactionInfoByBlockNum` / `GetTransactionInfoById`. The block proto carries the request (signed tx); `TransactionInfo` carries the result (success/fail, energy/bandwidth, emitted logs, internal sub-calls). We surface logs raw, internal txs as `InternalTx`, and the energy/bandwidth breakdown as `ResourceCost` — consumers reconstructing token flow, computing TRX cost, or replaying internal calls don't need a second round-trip per tx. 
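The `Other { kind, raw }` fallback above implies a simple consumer-side routing pattern. A minimal sketch; the enum mirror and its payload fields are hypothetical assumptions, and only `DecodedContract`, the typed contract kinds, and `Other { kind, raw }` come from the text:

```rust
/// Hypothetical consumer-side mirror of the codec's `DecodedContract`
/// surface. Only two of the four typed kinds are shown, and the
/// payload fields are illustrative, not the crate's actual API.
#[derive(Debug)]
enum DecodedContract {
    Transfer { from: Vec<u8>, to: Vec<u8>, amount: i64 },
    TriggerSmartContract { contract: Vec<u8>, data: Vec<u8> },
    Other { kind: String, raw: Vec<u8> },
}

/// Route the typed variants directly; keep `Other`'s raw `Any.value`
/// bytes around so they can be decoded later against the matching
/// java-tron protobuf message.
fn describe(c: &DecodedContract) -> String {
    match c {
        DecodedContract::Transfer { amount, .. } => format!("TRX transfer of {amount} sun"),
        DecodedContract::TriggerSmartContract { data, .. } => {
            format!("contract call with {} bytes of calldata", data.len())
        }
        DecodedContract::Other { kind, raw } => {
            format!("undecoded {kind} ({} raw bytes preserved)", raw.len())
        }
    }
}
```

The point of the shape: the long tail of governance/admin contracts stays decodable downstream without the codec committing to 40+ typed variants.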
-- **Event log decoding**: the universal token events (`TRC-20 Transfer`/`Approval`, `TRC-721 Transfer`) are recognized by topic-0 hash + topic-count and ship in v0.1 — they cover every standard token contract without any registry. Custom contract events (governance, oracles, anything not following the standard signatures) require an ABI registry and land behind a future `decode_event(log, abi)` entry point. ABI registry shape is pluggable: file-based, HTTP, or on-chain via `getContract`. +- **Event log decoding**: two complementary surfaces. The universal token events (`TRC-20 Transfer`/`Approval`, `TRC-721 Transfer`) ship in `events.rs` and are recognized by topic-0 hash + topic-count alone — every standard token contract decodes without any setup. Arbitrary contract events go through `abi.rs`: operators register Solidity-style signatures (e.g. `"Swap(address indexed sender, uint256 amount0In, uint256 amount1In, uint256 amount0Out, uint256 amount1Out, address indexed to)"`); the registry computes `topic[0] = keccak256(canonical_signature)` and decodes matching logs into a structured `DecodedEvent` with positional + named param access. v0.1 supports the static-type subset (`address`, `bool`, `uintN`/`intN`, `bytesN`); dynamic types (`string`, `bytes`, `T[]`) and tuples surface a typed `AbiParseError::UnsupportedType` so consumers know the boundary. - **Address ergonomics**: `Address::to_base58check` / `from_base58check` for the human-facing `T...` form; raw 21-byte forms remain the wire/storage canonical. ### lightcycle-relayer @@ -83,7 +83,7 @@ The orchestrator. Owns the canonical head pointer and the live block buffer. gRPC server speaking the [`sf.firehose.v2`](https://github.com/streamingfast/proto) protocol used by Substreams. Multiplexes one upstream block stream to many downstream consumers via a `tokio::sync::broadcast` hub — slow subscribers get `RESOURCE_EXHAUSTED` rather than back-pressuring the engine, which is correct semantics for a relayer. 
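The `abi.rs` registry described above derives `topic[0]` from the keccak256 of the canonical signature, i.e. the operator-facing form with `indexed` markers and parameter names stripped. A sketch of just that normalization step, under the assumption that this is how the registry canonicalizes (the hash itself needs a keccak crate such as `sha3` and is omitted; `canonical_signature` is a hypothetical helper, not the crate's API):

```rust
/// Normalize an operator-facing event signature into the canonical
/// form that would be keccak256-hashed into topic[0]. Returns None if
/// the input has no parenthesized parameter list.
fn canonical_signature(sig: &str) -> Option<String> {
    let open = sig.find('(')?;
    let close = sig.rfind(')')?;
    if close < open {
        return None;
    }
    let name = sig[..open].trim();
    let types: Vec<&str> = sig[open + 1..close]
        .split(',')
        .map(str::trim)
        .filter(|p| !p.is_empty())
        // Keep only the leading type token; drop `indexed` and the
        // parameter name that may follow it.
        .filter_map(|p| p.split_whitespace().next())
        .collect();
    Some(format!("{name}({})", types.join(",")))
}
```

For the `Swap(...)` example in the text this yields `Swap(address,uint256,uint256,uint256,uint256,address)`, the string whose keccak256 becomes the match key.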
-Three Firehose v2 services ship. **`Stream.Blocks`** runs live-tail by default; when an in-memory block cache is attached (the CLI wires one when `--firehose-listen` is set), requests with `start_block_num` or `cursor` walk the cache from the requested height to its tip and then transition into the live broadcast (with dedup so a block emitted from the cache isn't re-emitted from live). Backfill beyond the in-memory cache window requires a persistent block archive — that lands later. `stop_block_num`, `final_blocks_only`, and `transforms` are rejected with `FailedPrecondition`. **`Fetch.Block`** does point-in-time block lookup by `BlockNumber` reference; the service delegates to a `BlockOracle`. The CLI composes a `CachingBlockOracle` (read-through over the same in-memory cache the relayer feeds) wrapping a `GrpcBlockOracle` (dedicated `GrpcSource` connection — separate from the relayer's source so request-driven fetches don't serialize behind the live tail). `BlockHashAndNumber` and `Cursor` references plus `transforms` are rejected with `FailedPrecondition`. **`EndpointInfo.Info`** reports chain identity for orchestrator sanity-check. +Three Firehose v2 services ship. **`Stream.Blocks`** runs live-tail by default; when an in-memory block cache is attached (the CLI wires one when `--firehose-listen` is set), requests with `start_block_num` or `cursor` walk the cache from the requested height to its tip and then transition into the live broadcast (with dedup so a block emitted from the cache isn't re-emitted from live). When a persistent block archive is also attached (`--archive-path`), backfill walks the archive first for any heights below the cache window, then chains into the cache walk — extending resume capability past the in-memory horizon. `stop_block_num`, `final_blocks_only`, and `transforms` are rejected with `FailedPrecondition`. **`Fetch.Block`** does point-in-time block lookup by `BlockNumber` reference; the service delegates to a `BlockOracle`. 
The CLI composes a `CachingBlockOracle` (read-through over the same in-memory cache the relayer feeds) wrapping a `GrpcBlockOracle` (dedicated `GrpcSource` connection — separate from the relayer's source so request-driven fetches don't serialize behind the live tail); the archive (when configured) is checked first to short-circuit upstream RPC for any height past the cache window. `BlockHashAndNumber` and `Cursor` references plus `transforms` are rejected with `FailedPrecondition`. **`EndpointInfo.Info`** reports chain identity for orchestrator sanity-check. `Response.metadata` is fully populated (num, id, parent_num, parent_id, lib_num=0 for now, time). `Response.block` carries a `google.protobuf.Any` whose `type_url` is `type.googleapis.com/sf.tron.type.v1.Block` and whose value is the prost-encoded `sf.tron.type.v1.Block` — header, transactions, and contract payloads (typed for the four high-volume contract kinds: `Transfer`, `TransferAsset`, `TriggerSmartContract`, `CreateSmartContract`; raw bytes plus wire kind tag for everything else, so consumers can decode locally against java-tron's protobuf for the long-tail governance/admin contracts). @@ -98,8 +98,9 @@ The proto schema lives at `proto/sf/tron/type/v1/block.proto` and is compiled in Local persistence + the consistency-horizon SLO. Per ADR-0021, this crate is constitutionally bound to chain-finality as the only legal cross-replica consistency source — no Raft / Paxos / custom-quorum shims (the chain's SR consensus already solves the verification problem and the Das Sarma round-complexity floor makes any locally-engineered alternative provably worse). - **Consistency-horizon SLO** (landed): `ConsistencyHorizonObserver` records `seen_at` per block id when the relayer first surfaces a `tier=Seen` block, then closes the loop on the `tier=Finalized` transition by observing the elapsed wall-clock time into `lightcycle_store_block_seen_to_finalized_seconds`. 
**Target: p99 ≤ 5s under healthy operation; alert if >5s sustained for >5 min.** Exported via the standard prometheus exporter (`lightcycle relay --metrics-listen ...`). -- **Block cache** (planned): recent N blocks for reorg replay, in-memory with spill to `redb`. -- **Cursor store** (planned): per-consumer cursor checkpoints (optional, mostly for ops dashboards). +- **Block cache** (landed): bounded in-memory `BlockCache` indexed by both height and block id, generic over `T` so the crate stays free of `lightcycle-relayer` deps. The relayer writes on every `Output::New`/`Output::Undo`; firehose `Fetch.Block` and `Stream.Blocks` backfill read from it. Eviction is "drop the lowest-height entry first" — height-LRU is the right policy because consumer demand drops sharply with block age. +- **Block archive** (landed): redb-backed `BlockArchive` that catches blocks past the chain's solidified-head threshold. Same opaque-bytes API shape as the cursor store; the firehose layer encodes `pb::Block` bytes into it via the archiver task. Stream.Blocks backfill walks archive → cache → live; Fetch.Block hits the archive on cache miss. Operator-facing CLI: `--archive-path` + `--archive-retention-blocks`. Append-only on the happy path (only past-finality blocks land here, by design). +- **Cursor store** (landed): per-consumer cursor checkpoints in a redb-backed map. Per ADR-0021 explicitly **not** a cross-replica primitive — each replica tracks the consumers attached to it. - **SR set checkpoints** (planned): trusted starting point + maintenance-period diffs, so cold restarts don't have to re-derive the whole history. Future cross-replica work must implement `lightcycle_store::ConsistencySource`. The blessed implementation is `FinalityFromChain` (snapshots the relayer's view of the chain's solidified head). Reviewers proposing alternatives should be redirected to ADR-0021. 
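The "drop the lowest-height entry first" eviction policy described for the block cache can be sketched with a `BTreeMap`, whose ordered keys make the policy a one-liner. Illustration only, not the crate's actual `BlockCache` (which also indexes by block id):

```rust
use std::collections::BTreeMap;

/// Bounded cache that evicts the lowest-height entry first, mirroring
/// the height-LRU rationale above: consumer demand drops with block age.
struct HeightCache<T> {
    capacity: usize,
    by_height: BTreeMap<u64, T>,
}

impl<T> HeightCache<T> {
    fn new(capacity: usize) -> Self {
        Self { capacity, by_height: BTreeMap::new() }
    }

    fn insert(&mut self, height: u64, block: T) {
        self.by_height.insert(height, block);
        while self.by_height.len() > self.capacity {
            // BTreeMap keys iterate in ascending order, so pop_first()
            // removes the oldest (lowest) height.
            self.by_height.pop_first();
        }
    }

    fn get(&self, height: u64) -> Option<&T> {
        self.by_height.get(&height)
    }
}
```

A real implementation would pair this with an id-to-height index so lookups by block id stay O(log n) too.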
diff --git a/Cargo.lock b/Cargo.lock index d0f3943..a07b103 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -223,15 +223,6 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bitflags" version = "2.11.1" @@ -1412,6 +1403,7 @@ dependencies = [ "metrics", "prost", "prost-types", + "tempfile", "thiserror 1.0.69", "tokio", "tokio-stream", @@ -1474,7 +1466,6 @@ dependencies = [ name = "lightcycle-store" version = "0.0.1" dependencies = [ - "bincode", "lightcycle-types", "metrics", "redb", diff --git a/Cargo.toml b/Cargo.toml index da96c59..9e216ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,6 @@ thiserror = "1.0" # Serialization serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -bincode = "1.3" hex = "0.4" bs58 = { version = "0.5", default-features = false, features = ["alloc", "check"] } diff --git a/README.md b/README.md index fbd7b1d..92d92b0 100644 --- a/README.md +++ b/README.md @@ -48,11 +48,15 @@ This brings up `java-tron` (chain peer), `lightcycle` (relayer), Prometheus + Gr ## CLI ```bash -# Live ingest pipeline: decode + verify + reorg + serve Firehose v2 over gRPC +# Live ingest pipeline: decode + verify + reorg + serve Firehose v2 over gRPC, +# with persistent block archive + healthcheck for k8s / orchestrator deployment. 
lightcycle relay \ --grpc-url http://localhost:50051 \ --firehose-listen 0.0.0.0:13042 \ - --metrics-listen 0.0.0.0:9529 + --metrics-listen 0.0.0.0:9529 \ + --health-listen 0.0.0.0:9530 \ + --archive-path /var/lib/lightcycle/blocks.redb \ + --archive-retention-blocks 0 # Lightweight HTTP-RPC head poller (no decode, no firehose): for the # kulen-side comparison dashboard or a Grafana liveness panel. @@ -65,14 +69,23 @@ lightcycle inspect --grpc-url http://localhost:50051 --block 60123456 `relay` is the flagship subcommand. With `--firehose-listen` set, the Firehose v2 server exposes: -- **`Stream.Blocks`** — live tail with optional in-cache backfill via - `start_block_num` or `cursor` (cache window defaults to ~1h of mainnet - blocks; tune with `--block-cache-capacity`). -- **`Fetch.Block`** — point-in-time lookup by height, read-through over - the same in-memory cache the relayer feeds (cache hit short-circuits - the upstream RPC). +- **`Stream.Blocks`** — live tail with backfill via `start_block_num` + or `cursor`. Backfill walks the in-memory `BlockCache` first; with + `--archive-path` set it falls through to the persistent archive, + letting consumers resume past the in-memory window. Tune with + `--block-cache-capacity` (default 1024 ≈ 1h of mainnet at 3s slots) + and `--archive-retention-blocks` (default 0 = keep everything). +- **`Fetch.Block`** — point-in-time lookup by height. Cache hit + short-circuits the upstream RPC; archive hit short-circuits the + upstream RPC for any height past the cache window. - **`EndpointInfo.Info`** — chain identity for orchestrator sanity-check. +`--health-listen` exposes `/healthz` (200 if alive) and `/readyz` +(200 once the relayer has observed the chain's solidified head). Both +are kubelet-probe-compatible. Bound separately from `--metrics-listen` +so a misconfigured Prometheus scrape can't black-hole the readiness +signal. 
+ ## Benchmarking See [`BENCHMARKS.md`](./BENCHMARKS.md) for the methodology, baselines, and harness. Headline expectations on modern hardware: diff --git a/crates/lightcycle-cli/src/health.rs b/crates/lightcycle-cli/src/health.rs new file mode 100644 index 0000000..1c800fe --- /dev/null +++ b/crates/lightcycle-cli/src/health.rs @@ -0,0 +1,294 @@ +//! Tiny HTTP/1.1 health server: `/healthz` (process up) and `/readyz` +//! (relayer is steady-state). Bound separately from the Prometheus +//! exporter so a single misconfigured scrape can't black-hole the +//! readiness signal. +//! +//! ## Why a hand-rolled server +//! +//! The kubelet readiness probe sends `GET /readyz HTTP/1.0\r\n\r\n` and +//! reads the status line. That's the entire protocol surface we need. +//! Pulling in `hyper` or `axum` for two endpoints would balloon the +//! transitive deps for ~100 lines of value. The handler reads exactly +//! the request line, drops the rest, and writes a fixed response — +//! correct enough for any sane probe and trivial to audit. +//! +//! ## Readiness semantics +//! +//! `/readyz` returns 200 iff the relayer has populated the +//! `solidified_head` watch with at least one chain-reported height. +//! That signal subsumes "have we connected to the upstream RPC," +//! "have we successfully fetched at least one block," and "is the +//! finality oracle wired" — all three become true together on the +//! first successful tick. Returns 503 with a short body until then. +//! +//! `/healthz` always returns 200 — the only way to reach it is for +//! the listener to be bound, which means the process is alive. + +use std::net::SocketAddr; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::sync::watch; +use tracing::{debug, warn}; + +use lightcycle_types::BlockHeight; + +/// Bind a healthcheck server on `listen` until `shutdown` resolves. 
+/// +/// `head_rx` is the chain-reported solidified-head watch — the relayer +/// broadcasts on every successful `WalletSolidity.GetNowBlock` tick. +/// `/readyz` returns 200 once it sees `Some(_)`, 503 before then. +pub(crate) async fn serve( + listen: SocketAddr, + head_rx: watch::Receiver<Option<BlockHeight>>, + shutdown: impl std::future::Future<Output = ()> + Send + 'static, +) -> Result<()> { + let listener = TcpListener::bind(listen) + .await + .with_context(|| format!("bind health server on {listen}"))?; + tracing::info!(%listen, "health server listening"); + metrics::counter!("lightcycle_health_server_starts_total").increment(1); + + let head_rx = Arc::new(head_rx); + tokio::pin!(shutdown); + loop { + tokio::select! { + biased; + _ = &mut shutdown => { + debug!("health server: shutdown received"); + return Ok(()); + } + accept = listener.accept() => { + match accept { + Ok((stream, _peer)) => { + let head_rx = Arc::clone(&head_rx); + tokio::spawn(async move { + if let Err(e) = handle(stream, head_rx).await { + debug!(error = %e, "health request handler error"); + } + }); + } + Err(e) => { + warn!(error = %e, "health server accept failed; continuing"); + } + } + } + } + } +} + +async fn handle( + mut stream: tokio::net::TcpStream, + head_rx: Arc<watch::Receiver<Option<BlockHeight>>>, +) -> Result<()> { + // Read enough to parse the request line. Bound the read so a + // malicious slow-loris client can't tie up a task indefinitely. + let mut buf = [0u8; 1024]; + let n = match tokio::time::timeout(std::time::Duration::from_millis(500), stream.read(&mut buf)) + .await + { + Ok(Ok(n)) if n > 0 => n, + _ => { + // Empty or timed-out request; close quietly. + return Ok(()); + } + }; + let request = &buf[..n]; + // Match on the request line only. We don't validate the rest.
+ let path = parse_path(request); + + let (status_line, body): (&str, &str) = match path { + Some("/healthz") => ("HTTP/1.1 200 OK", "ok\n"), + Some("/readyz") => { + if head_rx.borrow().is_some() { + ("HTTP/1.1 200 OK", "ready\n") + } else { + metrics::counter!("lightcycle_health_readyz_not_ready_total").increment(1); + ( + "HTTP/1.1 503 Service Unavailable", + "not ready: solidified head not yet observed\n", + ) + } + } + _ => ("HTTP/1.1 404 Not Found", "not found\n"), + }; + let response = format!( + "{status_line}\r\n\ + Content-Type: text/plain; charset=utf-8\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n\ + {body}", + body.len(), + ); + stream.write_all(response.as_bytes()).await?; + stream.shutdown().await.ok(); + Ok(()) +} + +/// Extract the URL path from a request like `GET /healthz HTTP/1.1\r\n…`. +/// Returns `None` for requests we can't parse — handler responds 404. +fn parse_path(req: &[u8]) -> Option<&str> { + let line_end = req.iter().position(|&b| b == b'\r' || b == b'\n')?; + let line = std::str::from_utf8(&req[..line_end]).ok()?; + let mut parts = line.splitn(3, ' '); + let method = parts.next()?; + let path = parts.next()?; + // Only GET / HEAD make sense for a probe; anything else gets 404. + if method != "GET" && method != "HEAD" { + return None; + } + // Strip query string if present. + let path = path.split('?').next().unwrap_or(path); + Some(path) +} + +/// Describe the health-server metrics so they show up before the first +/// request. +pub(crate) fn describe_metrics() { + metrics::describe_counter!( + "lightcycle_health_server_starts_total", + "Health-server start events (process restarts)." + ); + metrics::describe_counter!( + "lightcycle_health_readyz_not_ready_total", + "Number of /readyz probes that returned 503 (relayer not yet ready)." 
+ ); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_path_extracts_get_path() { + assert_eq!( + parse_path(b"GET /healthz HTTP/1.1\r\nHost: localhost\r\n\r\n"), + Some("/healthz") + ); + assert_eq!( + parse_path(b"GET /readyz?probe=k8s HTTP/1.0\r\n\r\n"), + Some("/readyz") + ); + assert_eq!( + parse_path(b"HEAD /healthz HTTP/1.1\r\n\r\n"), + Some("/healthz") + ); + } + + #[test] + fn parse_path_rejects_non_get_methods() { + assert_eq!(parse_path(b"POST /healthz HTTP/1.1\r\n\r\n"), None); + assert_eq!(parse_path(b"DELETE / HTTP/1.1\r\n\r\n"), None); + } + + #[test] + fn parse_path_handles_malformed_request() { + assert_eq!(parse_path(b""), None); + assert_eq!(parse_path(b"GET\r\n\r\n"), None); + assert_eq!(parse_path(b"junk"), None); + } + + #[tokio::test] + async fn healthz_returns_200() { + let (_tx, rx) = watch::channel::<Option<BlockHeight>>(None); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + // Manually accept and handle one connection.
+ let (server_done_tx, mut server_done_rx) = tokio::sync::mpsc::channel::<()>(1); + let rx_for_handler = Arc::new(rx); + tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + handle(stream, rx_for_handler).await.unwrap(); + let _ = server_done_tx.send(()).await; + }); + + let mut client = tokio::net::TcpStream::connect(addr).await.unwrap(); + client + .write_all(b"GET /healthz HTTP/1.0\r\n\r\n") + .await + .unwrap(); + let mut response = Vec::new(); + client.read_to_end(&mut response).await.unwrap(); + let response = String::from_utf8(response).unwrap(); + assert!(response.starts_with("HTTP/1.1 200 OK"), "got: {response}"); + assert!(response.contains("ok")); + let _ = server_done_rx.recv().await; + } + + #[tokio::test] + async fn readyz_returns_503_until_head_seen() { + let (tx, rx) = watch::channel::<Option<BlockHeight>>(None); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let rx = Arc::new(rx); + + // 1. Before head: /readyz should be 503. + { + let rx_for_handler = Arc::clone(&rx); + let handler = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + handle(stream, rx_for_handler).await.unwrap(); + }); + let mut client = tokio::net::TcpStream::connect(addr).await.unwrap(); + client + .write_all(b"GET /readyz HTTP/1.0\r\n\r\n") + .await + .unwrap(); + let mut response = Vec::new(); + client.read_to_end(&mut response).await.unwrap(); + let response = String::from_utf8(response).unwrap(); + assert!( + response.starts_with("HTTP/1.1 503"), + "expected 503, got: {response}" + ); + handler.await.unwrap(); + } + + // 2. Send a head, re-bind on the same address (the listener was consumed).
+ tx.send(Some(100)).unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let rx_for_handler = Arc::clone(&rx); + let handler = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + handle(stream, rx_for_handler).await.unwrap(); + }); + let mut client = tokio::net::TcpStream::connect(addr).await.unwrap(); + client + .write_all(b"GET /readyz HTTP/1.0\r\n\r\n") + .await + .unwrap(); + let mut response = Vec::new(); + client.read_to_end(&mut response).await.unwrap(); + let response = String::from_utf8(response).unwrap(); + assert!( + response.starts_with("HTTP/1.1 200 OK"), + "expected 200 after head broadcast, got: {response}" + ); + handler.await.unwrap(); + } + + #[tokio::test] + async fn unknown_path_returns_404() { + let (_tx, rx) = watch::channel::<Option<BlockHeight>>(None); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let rx_for_handler = Arc::new(rx); + tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + handle(stream, rx_for_handler).await.unwrap(); + }); + let mut client = tokio::net::TcpStream::connect(addr).await.unwrap(); + client + .write_all(b"GET /unknown HTTP/1.0\r\n\r\n") + .await + .unwrap(); + let mut response = Vec::new(); + client.read_to_end(&mut response).await.unwrap(); + let response = String::from_utf8(response).unwrap(); + assert!(response.starts_with("HTTP/1.1 404")); + } +} diff --git a/crates/lightcycle-cli/src/main.rs b/crates/lightcycle-cli/src/main.rs index c5edd28..2b48f09 100644 --- a/crates/lightcycle-cli/src/main.rs +++ b/crates/lightcycle-cli/src/main.rs @@ -18,6 +18,8 @@ use lightcycle_source::{GrpcSource, HeadPoller}; use metrics_exporter_prometheus::PrometheusBuilder; use tracing_subscriber::{fmt, EnvFilter}; +mod health; + #[derive(Parser, Debug)] #[command(name = "lightcycle", version, about = "TRON streaming relayer")] struct Cli { @@ -161,6 +163,15
@@ struct RelayArgs { #[arg(long, env = "LIGHTCYCLE_RELAY_FIREHOSE_LISTEN")] firehose_listen: Option<SocketAddr>, + /// Bind a tiny HTTP healthcheck server here, separate from the + /// metrics exporter. Exposes `/healthz` (200 if process is alive) + /// and `/readyz` (200 once the relayer has seen at least one + /// chain-reported solidified head; 503 before then). Required for + /// kubernetes readiness probes and any orchestrator that gates + /// traffic on health. When unset, no health endpoint is exposed. + #[arg(long, env = "LIGHTCYCLE_RELAY_HEALTH_LISTEN")] + health_listen: Option<SocketAddr>, + /// Chain identity reported by `EndpointInfo.Info`. Defaults to /// "tron-mainnet" since that's what the LIGHTCYCLE_GRPC_URL /// default targets; override for testnets. @@ -190,6 +201,33 @@ struct RelayArgs { )] block_cache_capacity: usize, + /// Path to the persistent block archive (redb file). When set, the + /// archiver task subscribes to the broadcast and writes every + /// `Output::Irreversible` to disk; Stream.Blocks backfill walks + /// the archive for any height below the in-memory cache window; + /// Fetch.Block hits the archive on cache miss to short-circuit + /// upstream RPC. When unset, lightcycle runs in the v0.x + /// in-memory-only mode (~1h backfill window). + /// + /// File is created on first use; existing data is preserved + /// across restarts. Disk usage is roughly 50 KB × archived + /// blocks; a year of mainnet at 3-second slots fits in ~500 GB. + /// Couple with `--archive-retention-blocks` to bound disk growth. + #[arg(long, env = "LIGHTCYCLE_RELAY_ARCHIVE_PATH")] + archive_path: Option<PathBuf>, + + /// Retain only the most recent N blocks in the archive (relative + /// to the chain's current solidified head). 0 means "keep + /// everything" — appropriate for cold-archive deployments. The + /// retention task runs every minute; the archive grows past the + /// floor between runs. Default 0 (no retention).
+ #[arg( + long, + env = "LIGHTCYCLE_RELAY_ARCHIVE_RETENTION_BLOCKS", + default_value_t = 0 + )] + archive_retention_blocks: u64, + /// Whether to fetch the `TransactionInfo` side channel for each /// block (logs, internal txs, resource accounting). Default true: /// the dominant TRC-20 / TRC-721 indexing use case needs it. Set @@ -349,6 +387,7 @@ async fn run_relay(args: RelayArgs) -> Result<()> { .context("install prometheus exporter")?; lightcycle_relayer::describe_metrics(); lightcycle_store::describe_metrics(); + health::describe_metrics(); metrics::describe_gauge!( "lightcycle_info", "build info: always 1, with version + feature labels" @@ -483,16 +522,65 @@ async fn run_relay(args: RelayArgs) -> Result<()> { let (output_tx, output_rx) = tokio::sync::mpsc::channel(256); let service_handle = tokio::spawn(service.run(output_tx)); + // Spawn the healthcheck server when configured. Reads the same + // solidified-head watch the relayer broadcasts to, so /readyz + // flips to 200 on the first successful tick. Independent of the + // firehose surface — operators running log-only mode still want + // a readiness signal. + let (health_shutdown_tx, health_shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let health_handle = if let Some(listen) = args.health_listen { + let head_rx_for_health = head_rx.clone(); + Some(tokio::spawn(async move { + if let Err(e) = health::serve(listen, head_rx_for_health, async move { + let _ = health_shutdown_rx.await; + }) + .await + { + tracing::error!(error = %e, "health server exited with error"); + } + })) + } else { + // Discard the receiver so the shutdown sender doesn't dangle. + drop(health_shutdown_rx); + None + }; + // Wire the firehose gRPC server when the operator asked for it. // The hub pump task drains output_rx → broadcast; the gRPC server // serves Stream.Blocks subscribers from the broadcast. 
let firehose_handles = if let Some(listen) = args.firehose_listen { lightcycle_firehose::describe_metrics(); lightcycle_firehose::describe_oracle_metrics(); + lightcycle_firehose::describe_backfill_metrics(); + lightcycle_firehose::describe_archiver_metrics(); lightcycle_store::describe_cache_metrics(); + lightcycle_store::describe_archive_metrics(); let hub = lightcycle_firehose::Hub::new(args.firehose_hub_capacity); let pump_handle = hub.pump_from(output_rx); + // Open the persistent block archive when configured. None + // keeps lightcycle in the v0.x in-memory-only behavior; Some + // wires the archiver task and the read paths below. + let archive: Option<lightcycle_store::BlockArchive> = match args.archive_path.as_ref() { + Some(path) => { + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent) + .with_context(|| format!("create archive directory {parent:?}"))?; + } + } + let archive = lightcycle_store::BlockArchive::open(path) + .with_context(|| format!("open block archive {path:?}"))?; + tracing::info!( + path = %path.display(), + blocks = archive.len().unwrap_or(0), + "block archive opened", + ); + Some(archive) + } + None => None, + }; + // Build the Fetch.Block oracle. Needs its own GrpcSource // connection because the relayer's source is owned by the live- // tail pipeline (Fetch.Block fires on operator request, often @@ -518,6 +606,57 @@ async fn run_relay(args: RelayArgs) -> Result<()> { None => std::sync::Arc::new(upstream_oracle), }; + // Spawn the archiver task before the gRPC server so it's + // already subscribed when the first Output::Irreversible + // arrives. Aborts on shutdown via the broadcast sender being + // dropped. + let archiver_handle = if let Some(arc) = archive.clone() { + let rx = hub.subscribe(); + Some(tokio::spawn(lightcycle_firehose::run_archiver(rx, arc))) + } else { + None + }; + + // Spawn the retention pruner when requested.
Reads the + // current head from the watch channel; computes the floor + // height; calls archive.delete_below() once a minute. + let retention_handle = if let (Some(arc), n) = + (archive.clone(), args.archive_retention_blocks) + { + if n > 0 { + let mut head_rx_for_retention = head_rx.clone(); + Some(tokio::spawn(async move { + let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60)); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + ticker.tick().await; // arm + loop { + tokio::select! { + _ = ticker.tick() => { + let head = *head_rx_for_retention.borrow(); + if let Some(h) = head { + let floor = h.saturating_sub(n); + match arc.delete_below(floor) { + Ok(removed) if removed > 0 => { + tracing::info!( + floor, removed, "archive retention pruned" + ); + } + Ok(_) => {} + Err(e) => tracing::warn!(error = %e, "archive prune failed"), + } + } + } + Ok(()) = head_rx_for_retention.changed() => {} + } + } + })) + } else { + None + } + } else { + None + }; + let (server_shutdown_tx, server_shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let chain_name = args.firehose_chain_name.clone(); + let hub_for_serve = hub.clone(); @@ -526,6 +665,7 @@ async fn run_relay(args: RelayArgs) -> Result<()> { .map(|cache| lightcycle_firehose::StreamBackfill { cache, solidified_head: head_rx.clone(), + archive: archive.clone(), }); let server_handle = tokio::spawn(async move { if let Err(e) = lightcycle_firehose::serve( @@ -543,7 +683,29 @@ async fn run_relay(args: RelayArgs) -> Result<()> { tracing::error!(error = %e, "firehose server exited with error"); } }); - Some((server_handle, pump_handle, server_shutdown_tx)) + // Stash the archiver / retention handles in the firehose + // tuple so the shutdown branch aborts them with the rest. We + // pack them into a single Vec<JoinHandle<()>> to keep the
+ let mut aux_handles: Vec> = Vec::new(); + if let Some(h) = archiver_handle { + aux_handles.push(h); + } + if let Some(h) = retention_handle { + aux_handles.push(h); + } + // Replace the bare pump_handle with a wrapper that also + // awaits the aux handles on shutdown via abort(). + let composite_pump_handle = tokio::spawn(async move { + // The original pump_handle drives indefinitely; we abort + // it from the outer shutdown path. Aux handles are + // aborted here on the outer scope's drop. + let _ = pump_handle.await; + for h in aux_handles { + h.abort(); + } + }); + Some((server_handle, composite_pump_handle, server_shutdown_tx)) } else { // Log-only mode: drain the mpsc into the void so the service // doesn't back-pressure. The service already logs each block; @@ -573,6 +735,14 @@ async fn run_relay(args: RelayArgs) -> Result<()> { let _ = tokio::time::timeout(std::time::Duration::from_secs(3), server_handle).await; let _ = tokio::time::timeout(std::time::Duration::from_secs(1), pump_handle).await; } + if let Some(h) = health_handle { + let _ = health_shutdown_tx.send(()); + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), h).await; + } else { + // No server to signal; drop the sender so the receiver-half + // we already dropped doesn't cause a clippy warning. + drop(health_shutdown_tx); + } if let Some(h) = sr_refresh_handle { h.abort(); } diff --git a/crates/lightcycle-codec/src/abi.rs b/crates/lightcycle-codec/src/abi.rs new file mode 100644 index 0000000..f1420ad --- /dev/null +++ b/crates/lightcycle-codec/src/abi.rs @@ -0,0 +1,738 @@ +//! Minimal ABI registry + event decoder for arbitrary contract logs. +//! +//! Sits alongside the `events` module (which decodes the universal +//! TRC-20/721 events without a registry, by topic-0 hash). Where the +//! universal decoder stops, this registry takes over: the operator +//! pre-registers an event signature like +//! `"Swap(address indexed sender, uint256 amount0In, uint256 amount1In, +//! 
uint256 amount0Out, uint256 amount1Out, address indexed to)"` and
+//! the registry computes its `topic[0]` hash, then decodes any matching
+//! log into a structured [`DecodedEvent`].
+//!
+//! ## Scope of v0.1
+//!
+//! Supports the **static-type** subset of Solidity event params, which
+//! covers the long tail of real-world contract events:
+//!
+//! - Indexed: `address`, `uint8..=uint256`, `int8..=int256`, `bool`,
+//!   `bytes32`, smaller `bytesN` (left-padded), other fixed-size types
+//! - Non-indexed: same set, encoded as 32-byte words in `data`
+//!
+//! Out of scope (intentional v0.1 cut):
+//! - Dynamic types (`string`, `bytes`, `T[]`) in non-indexed params —
+//!   these use offset+length encoding which adds substantial parser
+//!   complexity. The biggest indexer use cases (DEX swaps, lending
+//!   accruals, governance votes) are static-only.
+//! - Tuple types and nested structs.
+//! - Anonymous events (no `topic[0]` to match against).
+//!
+//! When extension lands, the [`AbiType`] enum grows and the decoder
+//! gains a non-static decode path. Existing registered signatures stay
+//! source-compatible.
+//!
+//! ## Why hand-rolled
+//!
+//! `alloy-sol-types` is the gold standard but pulls a large dep tree.
+//! `ethabi` is older and unmaintained. The static-types subset is ~250
+//! lines including the parser; building it ourselves keeps the
+//! transitive deps of `lightcycle-codec` honest and lets us tailor the
+//! error messages to lightcycle's failure modes.
+
+use std::collections::HashMap;
+use std::fmt;
+
+use sha3::{Digest, Keccak256};
+
+use lightcycle_types::Address;
+
+use crate::tx_info::Log;
+
+/// Static Solidity types we currently decode. The width argument on
+/// [`AbiType::Uint`] / [`AbiType::Int`] is a bit count (8..=256); on
+/// [`AbiType::Bytes`] it is a byte count (1..=32). Either way we keep
+/// the raw 32-byte word in [`AbiValue`] and leave width-aware
+/// truncation/sign-extension to the consumer.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum AbiType {
+    Address,
+    Bool,
+    /// `uintN` for `N` in 8..=256, divisible by 8. Stored as 32-byte
+    /// big-endian; top `(32 - N/8)` bytes are zero on a well-formed
+    /// emission.
+    Uint(u16),
+    /// `intN` for `N` in 8..=256, divisible by 8. Stored as 32-byte
+    /// big-endian two's complement.
+    Int(u16),
+    /// `bytesN` for `N` in 1..=32. Stored left-justified in a 32-byte
+    /// word; trailing `(32 - N)` bytes are zero on a well-formed
+    /// emission.
+    Bytes(u8),
+}
+
+impl fmt::Display for AbiType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Address => f.write_str("address"),
+            Self::Bool => f.write_str("bool"),
+            Self::Uint(n) => write!(f, "uint{n}"),
+            Self::Int(n) => write!(f, "int{n}"),
+            Self::Bytes(n) => write!(f, "bytes{n}"),
+        }
+    }
+}
+
+/// One decoded ABI value. The 32-byte raw word is preserved for
+/// numeric types so consumers can convert to their preferred bigint
+/// crate without our enum committing to one. Address values are
+/// reconstructed in TRON's 21-byte form (network prefix `0x41` +
+/// 20-byte hash).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum AbiValue {
+    Address(Address),
+    Bool(bool),
+    /// 32-byte big-endian word for `uintN` / `intN` / `bytes32`. The
+    /// type tag stays on the originating [`AbiParam`] alongside this
+    /// value, so the consumer knows whether to interpret as unsigned,
+    /// two's-complement, or fixed-bytes.
+    Word([u8; 32]),
+}
+
+/// One declared parameter on an event signature.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct AbiParam {
+    /// Name as written in the registered signature. May be empty if
+    /// the operator omitted parameter names (e.g. registered as
+    /// `Swap(address,uint256)`).
+    pub name: String,
+    pub ty: AbiType,
+    pub indexed: bool,
+}
+
+/// One registered event signature, with its derived `topic[0]` hash.
+/// Cheap to clone; the hash is precomputed on construction.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct EventSignature {
+    pub name: String,
+    pub params: Vec<AbiParam>,
+    /// `keccak256(canonical_signature_string)`. Indexed and non-indexed
+    /// params are NOT distinguished in the canonical string per Solidity
+    /// ABI spec — the order of types is what matters.
+    pub topic0: [u8; 32],
+}
+
+impl EventSignature {
+    /// Parse a Solidity-style event signature into a registry entry.
+    ///
+    /// Accepted shapes:
+    /// - `"Transfer(address,address,uint256)"` (compact, no names)
+    /// - `"Transfer(address indexed from, address indexed to, uint256 value)"`
+    ///   (full, with names + indexed annotations)
+    /// - Mixed: `"Swap(address indexed sender, uint256, uint256)"`
+    ///
+    /// Whitespace is tolerated. The canonical string for keccak hashing
+    /// is rebuilt from the parsed types (no names, no indexed
+    /// annotations) so a spelling like `"Transfer(address ,address,
+    /// uint256 )"` produces the same `topic0` as the standard form.
+    pub fn parse(s: &str) -> Result<Self, AbiParseError> {
+        let s = s.trim();
+        let open = s
+            .find('(')
+            .ok_or_else(|| AbiParseError::Malformed("missing '(' in event signature".into()))?;
+        if !s.ends_with(')') {
+            return Err(AbiParseError::Malformed(
+                "event signature must end with ')'".into(),
+            ));
+        }
+        let name = s[..open].trim().to_string();
+        if name.is_empty() {
+            return Err(AbiParseError::Malformed("empty event name".into()));
+        }
+        let body = &s[open + 1..s.len() - 1];
+        let params = parse_param_list(body)?;
+        let canonical = canonical_signature(&name, &params);
+        let topic0_hash = Keccak256::digest(canonical.as_bytes());
+        let mut topic0 = [0u8; 32];
+        topic0.copy_from_slice(&topic0_hash);
+        Ok(Self {
+            name,
+            params,
+            topic0,
+        })
+    }
+}
+
+fn parse_param_list(body: &str) -> Result<Vec<AbiParam>, AbiParseError> {
+    let body = body.trim();
+    if body.is_empty() {
+        return Ok(Vec::new());
+    }
+    body.split(',')
+        .map(|tok| parse_one_param(tok.trim()))
+        .collect()
+}
+
+fn parse_one_param(s: &str) -> Result<AbiParam, AbiParseError> {
+    // Accepted
forms:
+    //   "address"
+    //   "address sender"
+    //   "address indexed sender"
+    //   "uint256"
+    let tokens: Vec<&str> = s.split_whitespace().collect();
+    let (ty_tok, indexed, name): (&str, bool, String) = match tokens.as_slice() {
+        [ty] => (ty, false, String::new()),
+        // The "indexed" branch must precede the bare 2-tuple branch
+        // — `[ty, "indexed"]` is a strict subset of `[ty, name]`, and
+        // rust matches the first arm that fits.
+        [ty, "indexed"] => (ty, true, String::new()),
+        [ty, name] => (ty, false, (*name).to_string()),
+        [ty, "indexed", name] => (ty, true, (*name).to_string()),
+        _ => {
+            return Err(AbiParseError::Malformed(format!(
+                "unrecognized parameter syntax: '{s}'"
+            )));
+        }
+    };
+    let ty = parse_type(ty_tok)?;
+    Ok(AbiParam { name, ty, indexed })
+}
+
+fn parse_type(s: &str) -> Result<AbiType, AbiParseError> {
+    // Reject array types first; otherwise "uint256[]" parses as a
+    // bad uint width and falls into UnknownType. Cleaner error.
+    if s.contains('[') {
+        return Err(AbiParseError::UnsupportedType(format!(
+            "array types like '{s}' not supported in v0.1"
+        )));
+    }
+    if s == "address" {
+        return Ok(AbiType::Address);
+    }
+    if s == "bool" {
+        return Ok(AbiType::Bool);
+    }
+    if let Some(rest) = s.strip_prefix("uint") {
+        let bits: u16 = if rest.is_empty() {
+            256
+        } else {
+            rest.parse()
+                .map_err(|_| AbiParseError::UnknownType(s.into()))?
+        };
+        if bits == 0 || bits > 256 || !bits.is_multiple_of(8) {
+            return Err(AbiParseError::UnknownType(s.into()));
+        }
+        return Ok(AbiType::Uint(bits));
+    }
+    if let Some(rest) = s.strip_prefix("int") {
+        let bits: u16 = if rest.is_empty() {
+            256
+        } else {
+            rest.parse()
+                .map_err(|_| AbiParseError::UnknownType(s.into()))?
+        };
+        if bits == 0 || bits > 256 || !bits.is_multiple_of(8) {
+            return Err(AbiParseError::UnknownType(s.into()));
+        }
+        return Ok(AbiType::Int(bits));
+    }
+    if let Some(rest) = s.strip_prefix("bytes") {
+        // bytes32, bytes16, etc — fixed-size only in v0.1.
+        // Bare "bytes" (dynamic) is not yet supported; fail clearly.
+        if rest.is_empty() {
+            return Err(AbiParseError::UnsupportedType(
+                "dynamic 'bytes' (non-fixed) not supported in v0.1; use bytesN".into(),
+            ));
+        }
+        let n: u8 = rest
+            .parse()
+            .map_err(|_| AbiParseError::UnknownType(s.into()))?;
+        if n == 0 || n > 32 {
+            return Err(AbiParseError::UnknownType(s.into()));
+        }
+        return Ok(AbiType::Bytes(n));
+    }
+    if s == "string" {
+        return Err(AbiParseError::UnsupportedType(
+            "'string' is dynamic; not supported in v0.1".into(),
+        ));
+    }
+    Err(AbiParseError::UnknownType(s.into()))
+}
+
+fn canonical_signature(name: &str, params: &[AbiParam]) -> String {
+    let mut s = String::with_capacity(name.len() + 2 + params.len() * 8);
+    s.push_str(name);
+    s.push('(');
+    for (i, p) in params.iter().enumerate() {
+        if i > 0 {
+            s.push(',');
+        }
+        // canonical form ALWAYS uses `uint256`/`int256` (not the bare
+        // forms). We've already normalized via parse_type, so just
+        // print our internal Display repr.
+        s.push_str(&p.ty.to_string());
+    }
+    s.push(')');
+    s
+}
+
+/// Errors from parsing event signatures.
+#[derive(Debug, thiserror::Error, PartialEq, Eq)]
+pub enum AbiParseError {
+    #[error("malformed signature: {0}")]
+    Malformed(String),
+    #[error("unknown type: {0}")]
+    UnknownType(String),
+    #[error("type not supported in v0.1: {0}")]
+    UnsupportedType(String),
+}
+
+/// Errors from decoding an actual log against a registered signature.
+#[derive(Debug, thiserror::Error, PartialEq, Eq)]
+pub enum AbiDecodeError {
+    #[error("indexed param count mismatch: expected {expected}, got {got}")]
+    IndexedCountMismatch { expected: usize, got: usize },
+    #[error(
+        "non-indexed data length mismatch: expected {expected} bytes ({words} words × 32), got {got}"
+    )]
+    DataLenMismatch {
+        expected: usize,
+        got: usize,
+        words: usize,
+    },
+}
+
+/// Output of [`EventRegistry::decode`]: the matched signature plus
+/// each parameter paired with its decoded value.
Order matches the
+/// declared signature order.
+#[derive(Debug, Clone)]
+pub struct DecodedEvent {
+    pub name: String,
+    /// One entry per parameter, in declaration order. The `param`
+    /// field is the original schema; the `value` is the decoded raw
+    /// 32-byte word reinterpreted by type.
+    pub params: Vec<(AbiParam, AbiValue)>,
+}
+
+impl DecodedEvent {
+    /// Look up a parameter value by name. Returns `None` if no
+    /// parameter with that name was decoded (either anonymous params,
+    /// or the consumer's spelling diverges from the registered
+    /// signature).
+    pub fn value(&self, name: &str) -> Option<&AbiValue> {
+        self.params
+            .iter()
+            .find(|(p, _)| p.name == name)
+            .map(|(_, v)| v)
+    }
+}
+
+/// Map of `topic[0]` hash → registered event signature. One topic hash
+/// can collide across two different signatures with the same param
+/// types — registry is last-write-wins; operators should refrain from
+/// registering colliding signatures with different decode intent.
+#[derive(Debug, Default, Clone)]
+pub struct EventRegistry {
+    by_topic0: HashMap<[u8; 32], EventSignature>,
+}
+
+impl EventRegistry {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Register one signature. Overwrites any prior registration with
+    /// the same `topic[0]`.
+    pub fn register(&mut self, sig: EventSignature) {
+        self.by_topic0.insert(sig.topic0, sig);
+    }
+
+    /// Convenience: parse + register in one call.
+    pub fn register_signature(&mut self, s: &str) -> Result<(), AbiParseError> {
+        let sig = EventSignature::parse(s)?;
+        self.register(sig);
+        Ok(())
+    }
+
+    pub fn len(&self) -> usize {
+        self.by_topic0.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.by_topic0.is_empty()
+    }
+
+    /// Look up by `topic[0]` without decoding. Useful when the consumer
+    /// just wants to know the event name.
+    pub fn signature(&self, topic0: &[u8; 32]) -> Option<&EventSignature> {
+        self.by_topic0.get(topic0)
+    }
+
+    /// Decode a log if its `topic[0]` matches a registered signature.
+    /// Returns:
+    /// - `Ok(Some(DecodedEvent))` on a clean decode
+    /// - `Ok(None)` if the log's `topic[0]` isn't registered (not an
+    ///   error — most logs in a real block won't match)
+    /// - `Err(_)` if the log matches a registered signature but is
+    ///   malformed for that schema (wrong topic count, wrong data len)
+    pub fn decode(&self, log: &Log) -> Result<Option<DecodedEvent>, AbiDecodeError> {
+        if log.topics.is_empty() {
+            return Ok(None);
+        }
+        let Some(sig) = self.by_topic0.get(&log.topics[0]) else {
+            return Ok(None);
+        };
+        decode_with_signature(sig, log).map(Some)
+    }
+}
+
+fn decode_with_signature(sig: &EventSignature, log: &Log) -> Result<DecodedEvent, AbiDecodeError> {
+    // Indexed params consume topics[1..]; non-indexed params consume
+    // 32-byte chunks of `data`. The first topic is the event signature
+    // hash itself, so the count of indexed params is `topics.len() - 1`.
+    let expected_indexed = sig.params.iter().filter(|p| p.indexed).count();
+    let got_indexed = log.topics.len().saturating_sub(1);
+    if got_indexed != expected_indexed {
+        return Err(AbiDecodeError::IndexedCountMismatch {
+            expected: expected_indexed,
+            got: got_indexed,
+        });
+    }
+
+    let non_indexed_count = sig.params.len() - expected_indexed;
+    let expected_data = non_indexed_count.saturating_mul(32);
+    if log.data.len() != expected_data {
+        return Err(AbiDecodeError::DataLenMismatch {
+            expected: expected_data,
+            got: log.data.len(),
+            words: non_indexed_count,
+        });
+    }
+
+    let mut topic_cursor: usize = 1;
+    let mut data_cursor: usize = 0;
+    let mut out = Vec::with_capacity(sig.params.len());
+    for p in &sig.params {
+        let word = if p.indexed {
+            let t = log.topics[topic_cursor];
+            topic_cursor += 1;
+            t
+        } else {
+            let mut w = [0u8; 32];
+            w.copy_from_slice(&log.data[data_cursor..data_cursor + 32]);
+            data_cursor += 32;
+            w
+        };
+        let value = match &p.ty {
+            AbiType::Address => AbiValue::Address(word_to_tron_address(&word)),
+            AbiType::Bool => AbiValue::Bool(word_to_bool(&word)),
+            AbiType::Uint(_) | AbiType::Int(_) | AbiType::Bytes(_) => AbiValue::Word(word),
+        };
+        out.push((p.clone(), value));
+    }
+    Ok(DecodedEvent {
+        name: sig.name.clone(),
+        params: out,
+    })
+}
+
+fn word_to_tron_address(w: &[u8; 32]) -> Address {
+    // EVM addresses are 20-byte right-aligned in a 32-byte word; TRON
+    // adds a 1-byte network prefix `0x41` to form the 21-byte form.
+    let mut a = [0u8; 21];
+    a[0] = 0x41;
+    a[1..].copy_from_slice(&w[12..]);
+    Address(a)
+}
+
+fn word_to_bool(w: &[u8; 32]) -> bool {
+    // ABI encodes bool as 0x00..00 (false) or 0x00..01 (true). The
+    // EVM itself treats any non-zero value as true, so we accept a
+    // non-zero LSB rather than reject words a contract could emit.
+    w[31] != 0
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn synth_log(topics: Vec<[u8; 32]>, data: Vec<u8>) -> Log {
+        Log {
+            address: Address([0x41; 21]),
+            topics,
+            data,
+        }
+    }
+
+    #[test]
+    fn parse_compact_signature() {
+        let sig = EventSignature::parse("Transfer(address,address,uint256)").unwrap();
+        assert_eq!(sig.name, "Transfer");
+        assert_eq!(sig.params.len(), 3);
+        assert!(sig.params.iter().all(|p| p.name.is_empty() && !p.indexed));
+        // Topic0 must match the keccak hash of the canonical form.
+        let expect = Keccak256::digest(b"Transfer(address,address,uint256)");
+        assert_eq!(&sig.topic0[..], &expect[..]);
+    }
+
+    #[test]
+    fn parse_full_signature_with_names_and_indexed() {
+        let sig = EventSignature::parse(
+            "Transfer(address indexed from, address indexed to, uint256 value)",
+        )
+        .unwrap();
+        assert_eq!(sig.name, "Transfer");
+        assert_eq!(sig.params.len(), 3);
+        assert!(sig.params[0].indexed);
+        assert!(sig.params[1].indexed);
+        assert!(!sig.params[2].indexed);
+        assert_eq!(sig.params[0].name, "from");
+        assert_eq!(sig.params[1].name, "to");
+        assert_eq!(sig.params[2].name, "value");
+        // Same canonical string → same topic0 as the compact form.
+ let compact = EventSignature::parse("Transfer(address,address,uint256)").unwrap(); + assert_eq!(sig.topic0, compact.topic0); + } + + #[test] + fn parse_handles_messy_whitespace() { + let sig = EventSignature::parse(" Swap ( address , uint256 , bool ) ").unwrap(); + assert_eq!(sig.name, "Swap"); + let canon = EventSignature::parse("Swap(address,uint256,bool)").unwrap(); + assert_eq!(sig.topic0, canon.topic0); + } + + #[test] + fn parse_recognizes_widths() { + for ty in ["uint8", "uint16", "uint32", "uint128", "uint256"] { + let s = format!("Foo({ty})"); + let sig = EventSignature::parse(&s).unwrap(); + assert_eq!(sig.params[0].ty.to_string(), ty); + } + for ty in ["int8", "int128", "int256"] { + let s = format!("Foo({ty})"); + let sig = EventSignature::parse(&s).unwrap(); + assert_eq!(sig.params[0].ty.to_string(), ty); + } + } + + #[test] + fn parse_rejects_dynamic_bytes() { + let err = EventSignature::parse("Foo(bytes)").unwrap_err(); + match err { + AbiParseError::UnsupportedType(_) => {} + _ => panic!("expected UnsupportedType"), + } + } + + #[test] + fn parse_rejects_string_in_v01() { + let err = EventSignature::parse("Foo(string)").unwrap_err(); + match err { + AbiParseError::UnsupportedType(_) => {} + _ => panic!("expected UnsupportedType"), + } + } + + #[test] + fn parse_rejects_unknown_type() { + let err = EventSignature::parse("Foo(uint257)").unwrap_err(); + match err { + AbiParseError::UnknownType(_) => {} + other => panic!("expected UnknownType, got {other:?}"), + } + } + + #[test] + fn parse_rejects_array_types() { + let err = EventSignature::parse("Foo(uint256[])").unwrap_err(); + match err { + AbiParseError::UnsupportedType(_) => {} + other => panic!("expected UnsupportedType, got {other:?}"), + } + } + + #[test] + fn parse_empty_param_list() { + let sig = EventSignature::parse("Pause()").unwrap(); + assert_eq!(sig.params.len(), 0); + } + + #[test] + fn registry_decodes_swap_like_event() { + // Uniswap V2 Swap-flavored event with a typical mix. 
+ let mut reg = EventRegistry::new(); + reg.register_signature( + "Swap(address indexed sender, uint256 amount0In, uint256 amount1In, \ + uint256 amount0Out, uint256 amount1Out, address indexed to)", + ) + .unwrap(); + + let sig = EventSignature::parse( + "Swap(address indexed sender, uint256 amount0In, uint256 amount1In, \ + uint256 amount0Out, uint256 amount1Out, address indexed to)", + ) + .unwrap(); + + // Build a synthetic log with topic0 + 2 indexed addresses + 4 + // non-indexed uint256 in data. + let mut sender = [0u8; 32]; + sender[12..].copy_from_slice(&[0x11; 20]); + let mut to = [0u8; 32]; + to[12..].copy_from_slice(&[0x22; 20]); + let topics = vec![sig.topic0, sender, to]; + let mut data = Vec::with_capacity(32 * 4); + for v in [100u64, 0, 0, 200u64] { + let mut w = [0u8; 32]; + w[24..].copy_from_slice(&v.to_be_bytes()); + data.extend_from_slice(&w); + } + + let log = synth_log(topics, data); + let decoded = reg.decode(&log).unwrap().expect("matched"); + assert_eq!(decoded.name, "Swap"); + assert_eq!(decoded.params.len(), 6); + + // Sender (indexed, address) should round-trip to TRON form. + let sender_v = decoded.value("sender").unwrap(); + let AbiValue::Address(a) = sender_v else { + panic!("expected Address") + }; + assert_eq!(a.0[0], 0x41); + assert_eq!(&a.0[1..], &[0x11; 20]); + + // amount0In should be 100. 
+ let amount0_in = decoded.value("amount0In").unwrap(); + let AbiValue::Word(w) = amount0_in else { + panic!("expected Word") + }; + let mut expect = [0u8; 32]; + expect[24..].copy_from_slice(&100u64.to_be_bytes()); + assert_eq!(w, &expect); + } + + #[test] + fn decode_returns_none_for_unregistered_event() { + let reg = EventRegistry::new(); + let log = synth_log(vec![[0xab; 32]], vec![]); + assert!(reg.decode(&log).unwrap().is_none()); + } + + #[test] + fn decode_errors_on_indexed_count_mismatch() { + let mut reg = EventRegistry::new(); + reg.register_signature("Move(address indexed who, uint256 amount)") + .unwrap(); + let sig = EventSignature::parse("Move(address indexed who, uint256 amount)").unwrap(); + // Send only topic0 — missing the indexed `who` topic. + let log = synth_log(vec![sig.topic0], vec![0u8; 32]); + let err = reg.decode(&log).unwrap_err(); + match err { + AbiDecodeError::IndexedCountMismatch { expected, got } => { + assert_eq!(expected, 1); + assert_eq!(got, 0); + } + _ => panic!("expected IndexedCountMismatch"), + } + } + + #[test] + fn decode_errors_on_data_len_mismatch() { + let mut reg = EventRegistry::new(); + reg.register_signature("Move(uint256 amount, uint256 fee)") + .unwrap(); + let sig = EventSignature::parse("Move(uint256 amount, uint256 fee)").unwrap(); + // 2 non-indexed params expect 64 bytes; we send 32. + let log = synth_log(vec![sig.topic0], vec![0u8; 32]); + let err = reg.decode(&log).unwrap_err(); + match err { + AbiDecodeError::DataLenMismatch { + expected, + got, + words, + } => { + assert_eq!(expected, 64); + assert_eq!(got, 32); + assert_eq!(words, 2); + } + _ => panic!("expected DataLenMismatch"), + } + } + + #[test] + fn decode_handles_anonymous_param_names() { + // Operator omits names entirely — decode still works, value() + // lookup just returns None for any name (params are + // positional). 
+ let mut reg = EventRegistry::new(); + reg.register_signature("Swap(uint256,uint256)").unwrap(); + let sig = EventSignature::parse("Swap(uint256,uint256)").unwrap(); + let mut data = Vec::new(); + data.extend_from_slice(&[0u8; 31]); + data.push(7); + data.extend_from_slice(&[0u8; 31]); + data.push(13); + let log = synth_log(vec![sig.topic0], data); + let decoded = reg.decode(&log).unwrap().expect("matched"); + assert_eq!(decoded.params.len(), 2); + assert!(decoded.value("amount").is_none()); + let AbiValue::Word(w0) = &decoded.params[0].1 else { + panic!() + }; + assert_eq!(w0[31], 7); + } + + #[test] + fn decode_bool_word() { + let mut reg = EventRegistry::new(); + reg.register_signature("Set(bool flag)").unwrap(); + let sig = EventSignature::parse("Set(bool flag)").unwrap(); + let mut data = vec![0u8; 32]; + data[31] = 1; + let log = synth_log(vec![sig.topic0], data); + let d = reg.decode(&log).unwrap().expect("match"); + match d.value("flag").unwrap() { + AbiValue::Bool(true) => {} + other => panic!("expected Bool(true), got {other:?}"), + } + + let log_false = synth_log(vec![sig.topic0], vec![0u8; 32]); + let d2 = reg.decode(&log_false).unwrap().expect("match"); + match d2.value("flag").unwrap() { + AbiValue::Bool(false) => {} + other => panic!("expected Bool(false), got {other:?}"), + } + } + + #[test] + fn topic0_known_signature_matches_external_reference() { + // Cross-check against the `events` module's pre-baked + // TRC20_TRANSFER_TOPIC0 — both should derive identically from + // the canonical signature. 
+ let sig = EventSignature::parse("Transfer(address,address,uint256)").unwrap(); + assert_eq!(sig.topic0, crate::events::TRC20_TRANSFER_TOPIC0); + } + + #[test] + fn registry_signature_lookup_returns_handle() { + let mut reg = EventRegistry::new(); + let sig = EventSignature::parse("Pause()").unwrap(); + let topic = sig.topic0; + reg.register(sig); + assert!(reg.signature(&topic).is_some()); + assert!(reg.signature(&[0xff; 32]).is_none()); + } + + #[test] + fn registry_register_overwrites_collision() { + // Two signatures with the same canonical body collide on + // topic0; second wins. + let mut reg = EventRegistry::new(); + reg.register_signature("Foo(uint256)").unwrap(); + // Canonical "Foo(uint256)" — same topic0, but registered with + // a parameter name. + reg.register_signature("Foo(uint256value)").unwrap_err(); // bad parse + reg.register_signature("Foo(uint256 value)").unwrap(); + // Still one entry; second registration overwrote the first. + assert_eq!(reg.len(), 1); + let sig = EventSignature::parse("Foo(uint256)").unwrap(); + assert_eq!(reg.signature(&sig.topic0).unwrap().params[0].name, "value"); + } +} diff --git a/crates/lightcycle-codec/src/lib.rs b/crates/lightcycle-codec/src/lib.rs index 8658007..37ac8da 100644 --- a/crates/lightcycle-codec/src/lib.rs +++ b/crates/lightcycle-codec/src/lib.rs @@ -27,15 +27,22 @@ //! //! - [`decode_trc20_transfer`] / [`decode_trc20_approval`] — the two //! universal TRC-20 events recognized by topic hash. Standalone, -//! ABI-registry-free helpers; full-ABI decoding is future work. +//! ABI-registry-free helpers. +//! +//! - [`abi`] — minimal ABI registry + event decoder for arbitrary +//! contract events. Operators register signatures like +//! `"Swap(address indexed sender, uint256 amount0In, ...)"`; the +//! registry computes the `topic[0]` hash and decodes matching logs +//! into structured [`abi::DecodedEvent`] values. Static-type subset +//! (address, bool, uintN, intN, bytesN) covers ~80% of real-world +//! 
contracts; dynamic types (string, bytes, T[]) deferred. //! //! Deferred (separate entry points, separate crates' job to provide //! inputs): //! -//! - **Event log decoding (arbitrary contracts).** Needs an ABI -//! registry. Lives behind a future `decode_event(log, abi)` entry -//! point. v0.1 ships only the universal TRC-20 helpers above plus -//! raw [`Log`] passthrough. +//! - **Dynamic ABI types** (`string`, `bytes`, `T[]`, tuples, structs). +//! The registry [`abi::EventSignature::parse`] returns a typed error +//! for these; expansion lands when a real consumer needs them. //! - **SM2 sigverify.** Investigated 2026-05-09 (java-tron #6588, //! PR #6627): the SM2 codepath exists in `SignUtils` but is dormant //! on mainnet — `isECKeyCryptoEngine` is hard-true and no mainnet @@ -44,6 +51,7 @@ //! `WitnessAddressMismatch` here means a real bug or an attacker, //! not a benign engine mismatch. +pub mod abi; mod block; mod error; mod events; diff --git a/crates/lightcycle-firehose/Cargo.toml b/crates/lightcycle-firehose/Cargo.toml index dbec9ec..c69f3ef 100644 --- a/crates/lightcycle-firehose/Cargo.toml +++ b/crates/lightcycle-firehose/Cargo.toml @@ -28,6 +28,7 @@ metrics.workspace = true [dev-dependencies] tokio = { workspace = true, features = ["full", "test-util"] } +tempfile.workspace = true [lints] workspace = true diff --git a/crates/lightcycle-firehose/src/archiver.rs b/crates/lightcycle-firehose/src/archiver.rs new file mode 100644 index 0000000..8e3d31c --- /dev/null +++ b/crates/lightcycle-firehose/src/archiver.rs @@ -0,0 +1,235 @@ +//! Archiver task: subscribes to the relayer's broadcast and persists +//! every block that crosses the chain's solidified-head threshold +//! (i.e. every `Output::Irreversible`) into a [`BlockArchive`]. +//! +//! ## Why a separate task +//! +//! The relayer broadcasts `Output`s through a `tokio::broadcast` +//! channel. Multiple subscribers (the firehose Stream service, this +//! archiver, future Kafka sinks, etc.) 
attach via `subscribe()` and +//! consume independently. Decoupling the archive write from the +//! relayer's hot path means: +//! +//! - The relayer's tick loop never blocks on disk I/O. If the archive +//! is slow (e.g. SSD latency spike), the broadcast buffers up to +//! capacity; on overrun the archiver gets `Lagged` and resyncs from +//! the next `Irreversible`. The relayer never sees back-pressure. +//! - The archive's encoding contract (`pb::Block` bytes via +//! [`crate::encode_block`]) is a firehose concern, not a relayer +//! concern. Co-locating the encoder with the archiver is the right +//! layering. +//! +//! ## What gets written +//! +//! Only `Output::Irreversible(StreamableBlock)`. By construction: +//! +//! - The block is past the chain's solidified-head threshold per the +//! engine's view, so it will not be reorganised. +//! - `step` is implicitly `New` (Irreversible is a tier transition, +//! not a separate step in the wire vocabulary). +//! - `finality.tier == Finalized` and `finality.solidified_head` is +//! populated. +//! +//! The archive value is `pb::Block.encode_to_vec()` — the same shape +//! `Stream.Blocks` and `Fetch.Block` re-emit on the wire. +//! +//! ## Crash semantics +//! +//! redb commits on every put. A crash mid-write rolls back to the +//! last committed state — at most one block in flight is lost, and +//! the archiver re-derives it from the next `Irreversible` after +//! restart (the relayer re-emits `Irreversible` for any block that +//! crossed the threshold during the gap, by walking its canonical +//! buffer against the chain's solidified head on cold start). +//! +//! Lag handling: `Lagged(n)` is treated as a soft failure; we log +//! and increment a counter but continue. The lost frames will be +//! re-sent next time the chain advances and the engine re-emits +//! Irreversible for any window the archive missed during the gap. 
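The "what gets written" rule above (only `Output::Irreversible` frames are persisted; every other variant is skipped) can be sketched in isolation. The `Output` enum and `archive_height` helper below are illustrative stand-ins carrying a bare height, not the actual `lightcycle_relayer` / `lightcycle_store` definitions:

```rust
// Stand-in for the relayer's broadcast payload; the real Output wraps
// full StreamableBlocks, but a height is enough to show the filter.
enum Output {
    New(u64),
    Undo(u64),
    Irreversible(u64),
}

// Mirror of the archiver's match: Irreversible yields a write, New /
// Undo (and, in the real code, the fork variants) are dropped.
fn archive_height(out: &Output) -> Option<u64> {
    match out {
        Output::Irreversible(h) => Some(*h),
        Output::New(_) | Output::Undo(_) => None,
    }
}

fn main() {
    let feed = vec![
        Output::New(100),
        Output::New(101),
        Output::Irreversible(100),
        Output::Undo(101),
        Output::Irreversible(101),
    ];
    // Only the two Irreversible frames would reach the archive.
    let written: Vec<u64> = feed.iter().filter_map(archive_height).collect();
    assert_eq!(written, vec![100, 101]);
}
```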
+
+use lightcycle_relayer::Output;
+use lightcycle_store::BlockArchive;
+use prost::Message;
+use tokio::sync::broadcast;
+use tracing::{debug, warn};
+
+use crate::encode::encode_block;
+
+/// Run the archiver loop. Returns when the broadcast sender is
+/// dropped (i.e. the relayer shuts down). The CLI typically spawns
+/// this with `tokio::spawn` alongside the firehose `serve` future.
+pub async fn run_archiver(mut rx: broadcast::Receiver<Output>, archive: BlockArchive) {
+    debug!("archiver task started");
+    metrics::counter!("lightcycle_firehose_archiver_starts_total").increment(1);
+
+    loop {
+        match rx.recv().await {
+            Ok(Output::Irreversible(sb)) => {
+                let height = sb.block.height;
+                let block_id = sb.block.block_id;
+                // Re-encode pb::Block with the engine's finality
+                // claim. The archive then carries a self-describing
+                // block with a valid finality envelope, so a backfill
+                // emission from the archive doesn't depend on
+                // re-fetching the chain state at read time.
+                let bytes = encode_block(&sb.block, sb.finality).encode_to_vec();
+                match archive.put(height, block_id, &bytes) {
+                    Ok(()) => {
+                        metrics::counter!("lightcycle_firehose_archiver_writes_total").increment(1);
+                        tracing::trace!(height, "archived block");
+                    }
+                    Err(e) => {
+                        warn!(error = ?e, height, "archive write failed; block not persisted");
+                        metrics::counter!("lightcycle_firehose_archiver_errors_total").increment(1);
+                    }
+                }
+            }
+            Ok(_) => {
+                // Other Output variants (New / Undo / ForkObserved /
+                // ForkResolved) are not archived. By design — only
+                // past-finality blocks land in the archive.
+ } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + warn!(skipped, "archiver lagged the broadcast hub"); + metrics::counter!("lightcycle_firehose_archiver_lagged_total").increment(skipped); + } + Err(broadcast::error::RecvError::Closed) => { + debug!("archiver: broadcast channel closed; exiting"); + break; + } + } + } +} + +/// Describe the archiver metrics so they show up in Prometheus output +/// from process startup. +pub fn describe_archiver_metrics() { + metrics::describe_counter!( + "lightcycle_firehose_archiver_starts_total", + "Block archiver task starts (process restarts)." + ); + metrics::describe_counter!( + "lightcycle_firehose_archiver_writes_total", + "Blocks written to the archive (one per Output::Irreversible)." + ); + metrics::describe_counter!( + "lightcycle_firehose_archiver_errors_total", + "Archive write errors. Sustained nonzero rate means the disk \ + is saturating or the redb file is misconfigured." + ); + metrics::describe_counter!( + "lightcycle_firehose_archiver_lagged_total", + "Total broadcast frames the archiver missed due to Lagged. \ + The relayer re-emits Irreversible on cold start so the \ + archive recovers eventually, but a high rate indicates the \ + broadcast buffer is undersized." 
+ ); +} + +#[cfg(test)] +mod tests { + use super::*; + use lightcycle_codec::{DecodedBlock, DecodedHeader}; + use lightcycle_relayer::{BufferedBlock, Cursor, Output, StreamableBlock}; + use lightcycle_types::{Address, BlockFinality, BlockId, FinalityTier, Step}; + use tempfile::tempdir; + + fn synth_irrev(height: u64) -> Output { + let block = BufferedBlock { + height, + block_id: BlockId([height as u8; 32]), + parent_id: BlockId([(height.wrapping_sub(1)) as u8; 32]), + fork_id: 0, + decoded: DecodedBlock { + header: DecodedHeader { + height, + block_id: BlockId([height as u8; 32]), + parent_id: BlockId([(height.wrapping_sub(1)) as u8; 32]), + raw_data_hash: [0u8; 32], + tx_trie_root: [0u8; 32], + timestamp_ms: 1_777_854_558_000, + witness: Address([0x41; 21]), + witness_signature: vec![], + version: 34, + }, + transactions: vec![], + }, + tx_infos: vec![], + }; + Output::Irreversible(StreamableBlock { + step: Step::Irreversible, + cursor: Cursor::new(height, BlockId([height as u8; 32])), + block, + finality: BlockFinality { + tier: FinalityTier::Finalized, + solidified_head: Some(height + 30), + }, + }) + } + + #[tokio::test] + async fn archiver_writes_irreversible_blocks() { + let dir = tempdir().unwrap(); + let archive = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + let (tx, rx) = broadcast::channel(16); + let archive_handle = archive.clone(); + let task = tokio::spawn(run_archiver(rx, archive_handle)); + + for h in 100..=104 { + tx.send(synth_irrev(h)).expect("send"); + } + // Drop sender to terminate the loop. + drop(tx); + task.await.expect("archiver join"); + + assert_eq!(archive.len().unwrap(), 5); + assert!(archive.get(102).unwrap().is_some()); + } + + #[tokio::test] + async fn archiver_ignores_non_irreversible_outputs() { + // Synthesize a New step (not Irreversible) and confirm the + // archive remains empty. 
+ let dir = tempdir().unwrap(); + let archive = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + let (tx, rx) = broadcast::channel(16); + let archive_handle = archive.clone(); + let task = tokio::spawn(run_archiver(rx, archive_handle)); + + let block = BufferedBlock { + height: 100, + block_id: BlockId([100u8; 32]), + parent_id: BlockId([99u8; 32]), + fork_id: 0, + decoded: DecodedBlock { + header: DecodedHeader { + height: 100, + block_id: BlockId([100u8; 32]), + parent_id: BlockId([99u8; 32]), + raw_data_hash: [0u8; 32], + tx_trie_root: [0u8; 32], + timestamp_ms: 0, + witness: Address([0x41; 21]), + witness_signature: vec![], + version: 34, + }, + transactions: vec![], + }, + tx_infos: vec![], + }; + let new_evt = Output::New(StreamableBlock { + step: Step::New, + cursor: Cursor::new(100, BlockId([100u8; 32])), + block, + finality: BlockFinality { + tier: FinalityTier::Seen, + solidified_head: None, + }, + }); + tx.send(new_evt).expect("send"); + drop(tx); + task.await.expect("archiver join"); + + assert!(archive.is_empty().unwrap()); + } +} diff --git a/crates/lightcycle-firehose/src/lib.rs b/crates/lightcycle-firehose/src/lib.rs index 301454a..016b977 100644 --- a/crates/lightcycle-firehose/src/lib.rs +++ b/crates/lightcycle-firehose/src/lib.rs @@ -32,11 +32,13 @@ #![allow(dead_code)] +mod archiver; mod encode; mod hub; mod oracle; mod server; +pub use archiver::{describe_archiver_metrics, run_archiver}; pub use encode::{encode_block, BLOCK_TYPE_URL}; pub use hub::Hub; pub use oracle::{describe_oracle_metrics, BlockOracle, CachingBlockOracle, SharedBlockOracle}; @@ -63,26 +65,39 @@ use lightcycle_proto::firehose::v2::{ stream_server::StreamServer, }; use lightcycle_relayer::BufferedBlock; -use lightcycle_store::SharedBlockCache; +use lightcycle_store::{BlockArchive, SharedBlockCache}; use lightcycle_types::BlockHeight; use tokio::sync::watch; use tonic::transport::Server; use tracing::info; -/// Optional backfill wiring for [`serve`]. 
When both fields are
-/// `Some`, `Stream.Blocks` honors `start_block_num` and `cursor`
-/// requests by walking the cache before joining the live broadcast.
-/// When either is `None`, requests with start hints get rejected
-/// with `FailedPrecondition`.
+/// Optional backfill wiring for [`serve`]. When `cache` and
+/// `solidified_head` are present, `Stream.Blocks` honors
+/// `start_block_num` and `cursor` requests by walking the cache before
+/// joining the live broadcast. When `archive` is also present, the
+/// backfill walker first fills the gap from the archive (height range
+/// `[requested_start, cache.min_height - 1]`) before chaining into the
+/// cache walk — extending resume support past the in-memory window.
+///
+/// The sources compose independently: a cache without an archive
+/// serves only the in-memory window; an archive alongside a
+/// still-empty cache serves only archived blocks (rare, but valid for
+/// cold-archive consumers).
 #[derive(Clone)]
 pub struct StreamBackfill {
     pub cache: SharedBlockCache,
     pub solidified_head: watch::Receiver<Option<BlockHeight>>,
+    /// Persistent block archive for resume past the in-memory cache
+    /// window. `None` keeps lightcycle on the v0.x in-memory-only
+    /// behavior; `Some(_)` extends backfill to the archive's retention
+    /// floor.
+    pub archive: Option<BlockArchive>,
 }
 
 impl std::fmt::Debug for StreamBackfill {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("StreamBackfill").finish_non_exhaustive()
+        f.debug_struct("StreamBackfill")
+            .field("archive", &self.archive.is_some())
+            .finish_non_exhaustive()
     }
 }
 
@@ -102,11 +117,25 @@ pub async fn serve(
     backfill: Option<StreamBackfill>,
     shutdown: impl std::future::Future<Output = ()> + Send + 'static,
 ) -> Result<()> {
+    // The archive (if present) is shared between Stream.Blocks
+    // backfill and Fetch.Block — both fall through to it on cache
+    // miss. Pull it out before consuming `backfill` in the Stream
+    // service builder.
+    let archive_for_fetch = backfill.as_ref().and_then(|b| b.archive.clone());
     let stream_svc = match backfill {
-        Some(b) => StreamService::new(hub).with_block_cache(b.cache, b.solidified_head),
+        Some(b) => {
+            let mut svc = StreamService::new(hub).with_block_cache(b.cache, b.solidified_head);
+            if let Some(arc) = b.archive {
+                svc = svc.with_archive(arc);
+            }
+            svc
+        }
         None => StreamService::new(hub),
     };
-    let fetch_svc = FetchService::new(oracle);
+    let mut fetch_svc = FetchService::new(oracle);
+    if let Some(arc) = archive_for_fetch {
+        fetch_svc = fetch_svc.with_archive(arc);
+    }
     let info_svc = EndpointInfoService::new(chain_name);
 
     info!(%listen, "firehose gRPC server starting");
diff --git a/crates/lightcycle-firehose/src/server.rs b/crates/lightcycle-firehose/src/server.rs
index 7107f2e..2d649c4 100644
--- a/crates/lightcycle-firehose/src/server.rs
+++ b/crates/lightcycle-firehose/src/server.rs
@@ -42,7 +42,7 @@ use lightcycle_proto::firehose::v2::{
     Request, Response, SingleBlockRequest, SingleBlockResponse,
 };
 use lightcycle_relayer::{BufferedBlock, Cursor, Output, StreamableBlock};
-use lightcycle_store::SharedBlockCache;
+use lightcycle_store::{BlockArchive, SharedBlockCache};
 use lightcycle_types::{BlockFinality, BlockHeight, Step};
 use prost::Message;
 use prost_types::{Any, Timestamp};
@@ -75,6 +75,12 @@ pub struct StreamService {
     /// `BlockFinality` for cached (cursor-resumed) blocks at emit
     /// time. Same channel the relayer broadcasts to.
     solidified_head: Option<watch::Receiver<Option<BlockHeight>>>,
+    /// Persistent block archive. The backfill walker walks the
+    /// archive first (heights below `cache.min_height`) before
+    /// chaining into the cache walk. `None` keeps lightcycle on the
+    /// in-memory-only behavior; `Some(_)` extends backfill to the
+    /// archive's retention floor.
+    archive: Option<BlockArchive>,
 }
 
 impl StreamService {
@@ -83,6 +89,7 @@ impl StreamService {
             hub,
             block_cache: None,
             solidified_head: None,
+            archive: None,
         }
     }
 
@@ -98,6 +105,15 @@ impl StreamService {
         self.solidified_head = Some(solidified_head);
         self
     }
+
+    /// Attach a persistent block archive. Stream.Blocks backfill will
+    /// walk the archive when the consumer's resume height is below the
+    /// in-memory cache window, extending resume capability past the
+    /// in-memory horizon.
+    pub fn with_archive(mut self, archive: BlockArchive) -> Self {
+        self.archive = Some(archive);
+        self
+    }
 }
 
 #[tonic::async_trait]
@@ -172,56 +188,123 @@ impl StreamSvc for StreamService {
                     and --block-cache-capacity > 0",
                 ));
             };
-            // Snapshot the relevant cache range under read lock,
-            // then drop the lock before doing anything else.
             let head = self.solidified_head.as_ref().and_then(|w| *w.borrow());
-            let snapshot = {
+
+            // Snapshot the cache range. We don't refuse-on-empty here
+            // anymore — when an archive is attached, an empty cache is
+            // recoverable as long as the archive covers the request.
+            let cache_window: Option<(BlockHeight, BlockHeight)> = {
                 let guard = cache.read().await;
-                let (Some(min_h), Some(max_h)) = (guard.min_height(), guard.max_height()) else {
-                    // Cache is empty — nothing to backfill from. The
-                    // honest response is "ask later," not silently
-                    // attaching to live (which would create a gap).
-                    return Err(Status::failed_precondition(
-                        "block cache is empty; cannot serve backfill yet",
-                    ));
-                };
-                if h < min_h {
-                    return Err(Status::failed_precondition(format!(
-                        "start height {h} is below cache window [{min_h}, {max_h}]; \
-                         backfill beyond the cache window is not yet supported"
-                    )));
+                match (guard.min_height(), guard.max_height()) {
+                    (Some(mn), Some(mx)) => Some((mn, mx)),
+                    _ => None,
                 }
-                if h > max_h.saturating_add(1) {
-                    // The +1 is the boundary case "start at the next
-                    // block after the current tip" — that's a valid
-                    // request that simply has zero backfill rows; we
-                    // should not error on it.
-                    return Err(Status::failed_precondition(format!(
-                        "start height {h} is ahead of cache tip {max_h}; \
-                         remove start_block_num or wait for the chain to advance"
-                    )));
+            };
+
+            // Stage 1: walk the archive for any heights below the
+            // cache's lower bound (or the entire request, if the cache
+            // is empty). The archive holds pre-encoded `pb::Block`
+            // bytes; we wrap them in a Response without decoding the
+            // full block — only enough to read metadata fields.
+            let archive_rows: Vec<Response> = match (&self.archive, cache_window) {
+                (Some(arc), Some((min_h, _))) if h < min_h => {
+                    archive_walk(arc, h, min_h.saturating_sub(1)).map_err(|e| {
+                        Status::failed_precondition(format!(
+                            "archive walk failed for [{h}, {}]: {e}",
+                            min_h.saturating_sub(1)
+                        ))
+                    })?
+                }
+                (Some(arc), None) => {
+                    // Cache is empty — try to serve entirely from the
+                    // archive. Bound the walk by the archive's max so
+                    // we don't ask for an impossible range.
+                    let max = arc
+                        .max_height()
+                        .map_err(|e| Status::internal(format!("archive max_height: {e}")))?;
+                    match max {
+                        Some(top) if top >= h => archive_walk(arc, h, top).map_err(|e| {
+                            Status::failed_precondition(format!(
+                                "archive walk failed for [{h}, {top}]: {e}"
+                            ))
+                        })?,
+                        _ => Vec::new(),
+                    }
                 }
-                // Walk min(h, max_h+1)..=max_h. Per-height lookup is
-                // O(log n) on the BTreeMap — fast in practice.
-                let upper = max_h;
-                let mut rows = Vec::new();
-                for height in h..=upper {
-                    if let Some((_id, buffered)) = guard.get_by_height(height) {
-                        rows.push(buffered);
-                    } else {
-                        // Height is in the [min, max] interval but the
-                        // entry was evicted between min/max snapshot
-                        // and the per-height lookup. Treat as cache
-                        // miss and refuse — this should be rare, and a
-                        // partial walk would create gaps the consumer
-                        // can't detect.
+                _ => Vec::new(),
+            };
+
+            // After the archive walk, decide where the cache walk
+            // resumes from. If the archive walked nothing, the cache
+            // walk starts at h. If the archive walked some heights,
+            // the cache walk starts at the archive's last_emitted + 1
+            // (which equals cache.min_height when the archive fully
+            // covers the gap).
+            let cache_walk_from: BlockHeight = archive_rows
+                .last()
+                .and_then(|r| r.metadata.as_ref().map(|m| m.num.saturating_add(1)))
+                .unwrap_or(h);
+
+            // Stage 2: walk the cache from `cache_walk_from` to its
+            // tip. Reject up front if the requested range falls
+            // entirely outside what cache + archive can cover.
+            let cache_rows: Vec<BufferedBlock> = match cache_window {
+                Some((min_h, max_h)) => {
+                    if cache_walk_from < min_h {
+                        // Should not happen given the staging above,
+                        // but guard against off-by-one bugs (e.g. an
+                        // empty archive that didn't advance the
+                        // cursor).
                        return Err(Status::failed_precondition(format!(
-                            "block at height {height} evicted from cache mid-walk; \
-                             retry with the same start_block_num"
+                            "start height {h} is below archive+cache coverage \
+                             [archive=?, cache_min={min_h}, cache_max={max_h}]"
                        )));
                    }
+                    if cache_walk_from > max_h.saturating_add(1) {
+                        return Err(Status::failed_precondition(format!(
+                            "start height {h} is ahead of cache tip {max_h}; \
+                             remove start_block_num or wait for the chain to advance"
+                        )));
+                    }
+                    let guard = cache.read().await;
+                    let mut rows = Vec::new();
+                    for height in cache_walk_from..=max_h {
+                        if let Some((_id, buffered)) = guard.get_by_height(height) {
+                            rows.push(buffered);
+                        } else {
+                            // Height is in the [min, max] interval but
+                            // the entry was evicted between min/max
+                            // snapshot and the per-height lookup.
+                            // Treat as cache miss and refuse — this
+                            // should be rare, and a partial walk would
+                            // create gaps the consumer can't detect.
+                            return Err(Status::failed_precondition(format!(
+                                "block at height {height} evicted from cache mid-walk; \
+                                 retry with the same start_block_num"
+                            )));
+                        }
+                    }
+                    rows
+                }
+                None => {
+                    // Cache is empty. If the archive produced rows,
+                    // emit them and chain straight into live; any
+                    // residual gap between the archive's last row and
+                    // the live tip is not detected on this path (known
+                    // limitation). If the archive produced nothing,
+                    // refuse: attaching to live would silently skip
+                    // the requested range.
+ if archive_rows.is_empty() && self.archive.is_some() { + return Err(Status::failed_precondition( + "block cache is empty and archive does not cover this request; \ + reconnect after the relayer warms up", + )); + } + if archive_rows.is_empty() { + return Err(Status::failed_precondition( + "block cache is empty; cannot serve backfill yet", + )); + } + Vec::new() } - rows }; metrics::counter!( @@ -230,23 +313,25 @@ impl StreamSvc for StreamService { ) .increment(1); metrics::histogram!("lightcycle_firehose_backfill_blocks") - .record(snapshot.len() as f64); - - snapshot - .into_iter() - .map(|buffered| { - let height = buffered.height; - let finality = BlockFinality::for_block(height, head, false); - let cursor = Cursor::new(buffered.height, buffered.block_id); - let sb = StreamableBlock { - block: buffered, - step: Step::New, - finality, - cursor, - }; - streamable_to_response(sb, ForkStep::StepNew) - }) - .collect() + .record((archive_rows.len() + cache_rows.len()) as f64); + if !archive_rows.is_empty() { + metrics::counter!("lightcycle_firehose_archive_backfill_blocks_total") + .increment(archive_rows.len() as u64); + } + + let cache_responses = cache_rows.into_iter().map(|buffered| { + let height = buffered.height; + let finality = BlockFinality::for_block(height, head, false); + let cursor = Cursor::new(buffered.height, buffered.block_id); + let sb = StreamableBlock { + block: buffered, + step: Step::New, + finality, + cursor, + }; + streamable_to_response(sb, ForkStep::StepNew) + }); + archive_rows.into_iter().chain(cache_responses).collect() } else { Vec::new() }; @@ -291,6 +376,73 @@ impl StreamSvc for StreamService { } } +/// Walk the archive over `[lo, hi]` (inclusive) and project each row +/// into a `Response`. Each archived value is `pb::Block`-encoded +/// (written by the archiver task on `Output::Irreversible`); we +/// re-emit those bytes directly as `Response.block.value` and decode +/// only enough to populate `Response.metadata`. 
Cursor + step are
+/// reconstructed from the archive's known properties (archived blocks
+/// are by construction finalized with `fork_id = 0`).
+fn archive_walk(
+    archive: &BlockArchive,
+    lo: BlockHeight,
+    hi: BlockHeight,
+) -> Result<Vec<Response>, lightcycle_store::ArchiveError> {
+    use lightcycle_proto::sf::tron::type_v1 as pb;
+    use lightcycle_types::BlockId;
+
+    // Cap per-call walk size. 5000 archived blocks ≈ 4h at 3-second
+    // slot times — large enough for any sane resume, small enough that
+    // we don't OOM on a misbehaving consumer asking for everything.
+    const ARCHIVE_WALK_LIMIT: usize = 5000;
+
+    let rows = archive.range(lo, hi, ARCHIVE_WALK_LIMIT)?;
+    let mut out = Vec::with_capacity(rows.len());
+    for (height, block_id, payload) in rows {
+        // Decode just enough to populate metadata. pb::Block parses
+        // cheaply; we use it as the source of truth for parent_id,
+        // time, and the embedded finality.solidified_head_number.
+        let pb_block = match pb::Block::decode(payload.as_slice()) {
+            Ok(b) => b,
+            Err(e) => {
+                tracing::warn!(height, error = ?e, "archive payload failed to decode; skipping");
+                metrics::counter!("lightcycle_firehose_archive_decode_errors_total").increment(1);
+                continue;
+            }
+        };
+        let parent_id_bytes: [u8; 32] = pb_block
+            .parent_id
+            .as_slice()
+            .try_into()
+            .unwrap_or([0u8; 32]);
+        let lib_num = pb_block
+            .finality
+            .as_ref()
+            .map(|f| f.solidified_head_number)
+            .unwrap_or(0);
+        let metadata = BlockMetadata {
+            num: height,
+            id: hex::encode(block_id.0),
+            parent_num: height.saturating_sub(1),
+            parent_id: hex::encode(parent_id_bytes),
+            lib_num,
+            time: pb_block.time,
+        };
+        let block_any = Any {
+            type_url: BLOCK_TYPE_URL.into(),
+            value: payload,
+        };
+        let cursor = Cursor::new(height, BlockId(block_id.0));
+        out.push(Response {
+            block: Some(block_any),
+            step: ForkStep::StepNew as i32,
+            cursor: hex::encode(cursor.to_bytes()),
+            metadata: Some(metadata),
+        });
+    }
+    Ok(out)
+}
+
+/// Encode a single StreamableBlock to a 
Firehose Response. Used by
/// the backfill walk; live-stream emissions go through
/// [`output_to_response`].
@@ -390,17 +542,35 @@ fn timestamp_from_ms(ms: i64) -> Timestamp {
 #[derive(Clone)]
 pub struct FetchService {
     oracle: SharedBlockOracle,
+    /// Persistent block archive. Checked before the oracle on every
+    /// fetch; on an archive miss the oracle (with its caching
+    /// decorator) runs as before. `None` falls back to oracle-only
+    /// behavior (the v0.x default).
+    archive: Option<BlockArchive>,
 }
 
 impl std::fmt::Debug for FetchService {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("FetchService").finish()
+        f.debug_struct("FetchService")
+            .field("archive", &self.archive.is_some())
+            .finish()
     }
 }
 
 impl FetchService {
     pub fn new(oracle: SharedBlockOracle) -> Self {
-        Self { oracle }
+        Self {
+            oracle,
+            archive: None,
+        }
+    }
+
+    /// Attach a persistent block archive. Fetch.Block requests for
+    /// heights past the in-memory cache are served from the archive
+    /// instead of round-tripping to the upstream RPC.
+    pub fn with_archive(mut self, archive: BlockArchive) -> Self {
+        self.archive = Some(archive);
+        self
     }
 }
 
@@ -441,6 +611,37 @@ impl FetchSvc for FetchService {
         metrics::counter!("lightcycle_firehose_fetch_total", "result" => "in_flight").increment(1);
         let started = std::time::Instant::now();
 
+        // Archive fast-path: serves any block past the chain's
+        // finality threshold without a round-trip to the upstream RPC.
+        // Cache (held by the oracle's caching decorator) is still
+        // checked next on archive miss, so latest-head fetches retain
+        // sub-millisecond response time.
+ if let Some(archive) = &self.archive { + match archive.get(height) { + Ok(Some((block_id, payload))) => { + let elapsed = started.elapsed().as_secs_f64(); + metrics::histogram!("lightcycle_firehose_fetch_duration_seconds") + .record(elapsed); + metrics::counter!( + "lightcycle_firehose_fetch_total", + "result" => "archive_hit" + ) + .increment(1); + return Ok(tonic::Response::new(archive_hit_response( + height, block_id, payload, + ))); + } + Ok(None) => { + // Fall through to oracle. + } + Err(e) => { + warn!(error = ?e, height, "archive read error; falling through to oracle"); + metrics::counter!("lightcycle_firehose_fetch_total", "result" => "archive_error") + .increment(1); + } + } + } + let outcome = self.oracle.fetch_block_by_number(height).await; let elapsed = started.elapsed().as_secs_f64(); @@ -487,6 +688,44 @@ impl FetchSvc for FetchService { } } +/// Build a `SingleBlockResponse` from an archived `pb::Block` payload. +/// Same projection logic as `archive_walk` but for the Fetch.Block +/// surface — the metadata is decoded out of the payload, the bytes are +/// passed through verbatim as the `Response.block.value`. 
+fn archive_hit_response(
+    height: BlockHeight,
+    block_id: lightcycle_types::BlockId,
+    payload: Vec<u8>,
+) -> SingleBlockResponse {
+    use lightcycle_proto::sf::tron::type_v1 as pb;
+
+    let pb_block = pb::Block::decode(payload.as_slice()).ok();
+    let parent_id_bytes: [u8; 32] = pb_block
+        .as_ref()
+        .and_then(|b| b.parent_id.as_slice().try_into().ok())
+        .unwrap_or([0u8; 32]);
+    let lib_num = pb_block
+        .as_ref()
+        .and_then(|b| b.finality.as_ref().map(|f| f.solidified_head_number))
+        .unwrap_or(0);
+    let time = pb_block.as_ref().and_then(|b| b.time.clone());
+    let metadata = BlockMetadata {
+        num: height,
+        id: hex::encode(block_id.0),
+        parent_num: height.saturating_sub(1),
+        parent_id: hex::encode(parent_id_bytes),
+        lib_num,
+        time,
+    };
+    SingleBlockResponse {
+        block: Some(Any {
+            type_url: BLOCK_TYPE_URL.into(),
+            value: payload,
+        }),
+        metadata: Some(metadata),
+    }
+}
+
 /// Minimal endpoint-info service. Reports chain identity so
 /// orchestrators can sanity-check the connection.
 #[derive(Clone, Debug)]
diff --git a/crates/lightcycle-firehose/tests/grpc_round_trip.rs b/crates/lightcycle-firehose/tests/grpc_round_trip.rs
index c7c0b4e..586f646 100644
--- a/crates/lightcycle-firehose/tests/grpc_round_trip.rs
+++ b/crates/lightcycle-firehose/tests/grpc_round_trip.rs
@@ -460,6 +460,7 @@ async fn backfill_walks_cache_and_then_streams_live() {
     let backfill = lightcycle_firehose::StreamBackfill {
         cache: cache.clone(),
         solidified_head: head_rx,
+        archive: None,
     };
 
     let addr = pick_addr().await;
@@ -572,6 +573,7 @@ async fn backfill_dedups_when_cache_overlaps_live() {
     let backfill = lightcycle_firehose::StreamBackfill {
         cache: cache.clone(),
         solidified_head: head_rx,
+        archive: None,
     };
 
     let addr = pick_addr().await;
@@ -648,3 +650,336 @@ async fn backfill_dedups_when_cache_overlaps_live() {
     let _ = shutdown_tx.send(());
     let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
 }
+
+#[tokio::test]
+async fn backfill_walks_archive_then_cache_then_live() {
+    // 
Archive holds 50..60 (older finalized blocks).
+    // Cache holds 60..63 (recent in-memory window).
+    // Subscribe with start_block_num=50.
+    // Expect: 50..60 from archive, 60..63 from cache, then 63 from
+    // live. (Archive and cache abut at 60; cache wins for 60 since
+    // archive_walk emits up to cache.min_height-1 = 59.)
+    let dir = tempfile::tempdir().unwrap();
+    let archive = lightcycle_store::BlockArchive::open(dir.path().join("archive.redb")).unwrap();
+    for h in 50..60u64 {
+        let pb = tron_v1::Block {
+            number: h,
+            id: vec![h as u8; 32],
+            parent_id: vec![(h - 1) as u8; 32],
+            time: Some(prost_types::Timestamp {
+                seconds: 1_777_854_558 + h as i64,
+                nanos: 0,
+            }),
+            header: None,
+            transactions: vec![],
+            finality: Some(tron_v1::Finality {
+                tier: tron_v1::FinalityTier::Finalized as i32,
+                solidified_head_number: 100,
+            }),
+        };
+        archive
+            .put(h, BlockId([h as u8; 32]), &pb.encode_to_vec())
+            .unwrap();
+    }
+
+    let cache = lightcycle_store::new_shared::<BufferedBlock>(16);
+    {
+        let mut g = cache.write().await;
+        for h in 60..63u64 {
+            let buffered = BufferedBlock {
+                height: h,
+                block_id: BlockId([h as u8; 32]),
+                parent_id: BlockId([(h - 1) as u8; 32]),
+                fork_id: 0,
+                decoded: DecodedBlock {
+                    header: DecodedHeader {
+                        height: h,
+                        block_id: BlockId([h as u8; 32]),
+                        parent_id: BlockId([(h - 1) as u8; 32]),
+                        raw_data_hash: [0u8; 32],
+                        tx_trie_root: [0u8; 32],
+                        timestamp_ms: 1_777_854_558_000 + h as i64,
+                        witness: Address([0x41; 21]),
+                        witness_signature: vec![],
+                        version: 34,
+                    },
+                    transactions: vec![],
+                },
+                tx_infos: vec![],
+            };
+            g.insert(h, buffered.block_id, buffered);
+        }
+    }
+
+    let hub = Hub::new(64);
+    let (_head_tx, head_rx) =
+        tokio::sync::watch::channel::<Option<BlockHeight>>(None);
+    let backfill = lightcycle_firehose::StreamBackfill {
+        cache: cache.clone(),
+        solidified_head: head_rx,
+        archive: Some(archive.clone()),
+    };
+
+    let addr = pick_addr().await;
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+    let server = tokio::spawn({
+        let hub = 
hub.clone();
+        async move {
+            let _ = serve(
+                addr,
+                hub,
+                empty_oracle(),
+                "tron-test",
+                Some(backfill),
+                async move {
+                    let _ = shutdown_rx.await;
+                },
+            )
+            .await;
+        }
+    });
+
+    let mut client = None;
+    for _ in 0..40 {
+        if let Ok(c) = StreamClient::connect(format!("http://{addr}")).await {
+            client = Some(c);
+            break;
+        }
+        tokio::time::sleep(Duration::from_millis(25)).await;
+    }
+    let mut client = client.expect("server didn't accept connections");
+
+    let mut stream = client
+        .blocks(Request {
+            start_block_num: 50,
+            cursor: String::new(),
+            stop_block_num: 0,
+            final_blocks_only: false,
+            transforms: vec![],
+        })
+        .await
+        .expect("stream request accepted")
+        .into_inner();
+
+    // 13 backfill frames: 50..60 from archive, 60..63 from cache.
+    let mut got = Vec::new();
+    for _ in 50..63u64 {
+        let r = tokio::time::timeout(Duration::from_secs(2), stream.next())
+            .await
+            .expect("backfill frame timed out")
+            .expect("stream ended early")
+            .expect("backfill frame is Err");
+        got.push(r.metadata.unwrap().num);
+    }
+    assert_eq!(got, (50..63u64).collect::<Vec<_>>());
+
+    // Live tail: push 63.
+    hub.sender()
+        .send(synth_output(Step::New, 63))
+        .expect("broadcast send");
+    let r = tokio::time::timeout(Duration::from_secs(2), stream.next())
+        .await
+        .expect("live frame timed out")
+        .expect("stream ended")
+        .expect("live frame is Err");
+    assert_eq!(r.metadata.unwrap().num, 63);
+
+    let _ = shutdown_tx.send(());
+    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
+}
+
+#[tokio::test]
+async fn fetch_block_archive_hit_short_circuits_oracle() {
+    // Pre-populate archive with height 42. FetchService is wired with
+    // archive + an oracle that would return None. Expect the Fetch
+    // call to succeed with the archived bytes (not a NotFound).
+    let dir = tempfile::tempdir().unwrap();
+    let archive = lightcycle_store::BlockArchive::open(dir.path().join("a.redb")).unwrap();
+    let pb = tron_v1::Block {
+        number: 42,
+        id: vec![42u8; 32],
+        parent_id: vec![41u8; 32],
+        time: Some(prost_types::Timestamp {
+            seconds: 1_777_854_600,
+            nanos: 0,
+        }),
+        header: None,
+        transactions: vec![],
+        finality: Some(tron_v1::Finality {
+            tier: tron_v1::FinalityTier::Finalized as i32,
+            solidified_head_number: 100,
+        }),
+    };
+    archive
+        .put(42, BlockId([42u8; 32]), &pb.encode_to_vec())
+        .unwrap();
+
+    // Stand the firehose up directly via the lib's `serve` so the
+    // archive flows through to FetchService.
+    let cache = lightcycle_store::new_shared::<BufferedBlock>(4);
+    let (_head_tx, head_rx) =
+        tokio::sync::watch::channel::<Option<BlockHeight>>(None);
+    let backfill = lightcycle_firehose::StreamBackfill {
+        cache,
+        solidified_head: head_rx,
+        archive: Some(archive),
+    };
+    let hub = Hub::new(16);
+    let addr = pick_addr().await;
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+    let server = tokio::spawn(async move {
+        let _ = serve(
+            addr,
+            hub,
+            empty_oracle(),
+            "tron-test",
+            Some(backfill),
+            async move {
+                let _ = shutdown_rx.await;
+            },
+        )
+        .await;
+    });
+
+    let mut client = None;
+    for _ in 0..40 {
+        if let Ok(c) = FetchClient::connect(format!("http://{addr}")).await {
+            client = Some(c);
+            break;
+        }
+        tokio::time::sleep(Duration::from_millis(25)).await;
+    }
+    let mut client = client.expect("server didn't accept Fetch");
+
+    let resp = client
+        .block(SingleBlockRequest {
+            transforms: vec![],
+            reference: Some(Reference::BlockNumber(BlockNumber { num: 42 })),
+        })
+        .await
+        .expect("fetch ok despite oracle returning None")
+        .into_inner();
+    let md = resp.metadata.expect("metadata present");
+    assert_eq!(md.num, 42);
+    let any = resp.block.expect("block payload present");
+    assert_eq!(any.type_url, BLOCK_TYPE_URL);
+    let decoded = tron_v1::Block::decode(any.value.as_slice()).expect("decode pb::Block");
+    
assert_eq!(decoded.number, 42);
+    assert_eq!(
+        decoded.finality.unwrap().tier,
+        tron_v1::FinalityTier::Finalized as i32
+    );
+
+    let _ = shutdown_tx.send(());
+    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
+}
+
+#[tokio::test]
+async fn backfill_below_archive_floor_returns_failed_precondition() {
+    // Archive starts at height 100; cache starts at height 110.
+    // Subscribe with start_block_num=50 — below archive floor.
+    let dir = tempfile::tempdir().unwrap();
+    let archive = lightcycle_store::BlockArchive::open(dir.path().join("a.redb")).unwrap();
+    for h in 100..105u64 {
+        let pb = tron_v1::Block {
+            number: h,
+            id: vec![h as u8; 32],
+            parent_id: vec![(h - 1) as u8; 32],
+            time: None,
+            header: None,
+            transactions: vec![],
+            finality: Some(tron_v1::Finality {
+                tier: tron_v1::FinalityTier::Finalized as i32,
+                solidified_head_number: 200,
+            }),
+        };
+        archive
+            .put(h, BlockId([h as u8; 32]), &pb.encode_to_vec())
+            .unwrap();
+    }
+    let cache = lightcycle_store::new_shared::<BufferedBlock>(16);
+    {
+        let mut g = cache.write().await;
+        for h in 110..113u64 {
+            let buffered = BufferedBlock {
+                height: h,
+                block_id: BlockId([h as u8; 32]),
+                parent_id: BlockId([(h - 1) as u8; 32]),
+                fork_id: 0,
+                decoded: DecodedBlock {
+                    header: DecodedHeader {
+                        height: h,
+                        block_id: BlockId([h as u8; 32]),
+                        parent_id: BlockId([(h - 1) as u8; 32]),
+                        raw_data_hash: [0u8; 32],
+                        tx_trie_root: [0u8; 32],
+                        timestamp_ms: 0,
+                        witness: Address([0x41; 21]),
+                        witness_signature: vec![],
+                        version: 34,
+                    },
+                    transactions: vec![],
+                },
+                tx_infos: vec![],
+            };
+            g.insert(h, buffered.block_id, buffered);
+        }
+    }
+
+    let hub = Hub::new(64);
+    let (_head_tx, head_rx) =
+        tokio::sync::watch::channel::<Option<BlockHeight>>(None);
+    let backfill = lightcycle_firehose::StreamBackfill {
+        cache,
+        solidified_head: head_rx,
+        archive: Some(archive),
+    };
+    let addr = pick_addr().await;
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+    let server = tokio::spawn({
+        let hub = hub.clone();
+        
async move {
+            let _ = serve(
+                addr,
+                hub,
+                empty_oracle(),
+                "tron-test",
+                Some(backfill),
+                async move {
+                    let _ = shutdown_rx.await;
+                },
+            )
+            .await;
+        }
+    });
+
+    let mut client = None;
+    for _ in 0..40 {
+        if let Ok(c) = StreamClient::connect(format!("http://{addr}")).await {
+            client = Some(c);
+            break;
+        }
+        tokio::time::sleep(Duration::from_millis(25)).await;
+    }
+    let mut client = client.expect("server didn't accept connections");
+
+    // Resume from 50 — below both the archive floor (100) and the
+    // cache window (110). The walker scans the archive over [50, 109]
+    // and gets back only heights 100..=104 (nothing exists below the
+    // floor), so cache_walk_from lands at 105, still below
+    // cache.min_height = 110, and the cache walker returns
+    // FailedPrecondition.
+    let err = client
+        .blocks(Request {
+            start_block_num: 50,
+            cursor: String::new(),
+            stop_block_num: 0,
+            final_blocks_only: false,
+            transforms: vec![],
+        })
+        .await
+        .expect_err("expected FailedPrecondition");
+    assert_eq!(err.code(), tonic::Code::FailedPrecondition);
+
+    let _ = shutdown_tx.send(());
+    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
+}
diff --git a/crates/lightcycle-store/Cargo.toml b/crates/lightcycle-store/Cargo.toml
index ab670ab..a55e29c 100644
--- a/crates/lightcycle-store/Cargo.toml
+++ b/crates/lightcycle-store/Cargo.toml
@@ -11,7 +11,6 @@
 lightcycle-types.workspace = true
 redb.workspace = true
 serde = { workspace = true }
-bincode.workspace = true
 metrics.workspace = true
 thiserror.workspace = true
 tokio = { workspace = true }
diff --git a/crates/lightcycle-store/src/archive.rs b/crates/lightcycle-store/src/archive.rs
new file mode 100644
index 0000000..0d10f16
--- /dev/null
+++ b/crates/lightcycle-store/src/archive.rs
@@ -0,0 +1,502 @@
+//! Persistent block archive — the durable counterpart to [`BlockCache`].
+//!
+//! ## What it solves
+//!
+//! [`BlockCache`] holds the last N blocks in memory. Past N (default
+//! 
~1h of mainnet at 3-second slot times), evicted blocks are gone — a
+//! consumer that disconnects for longer than the in-memory window
+//! cannot resume via `Stream.Blocks` backfill.
+//!
+//! `BlockArchive` is the redb-backed durable layer that catches what
+//! the cache evicts (or what the relayer explicitly archives on
+//! `Output::Irreversible`). Backfill order from the firehose:
+//!
+//! 1. If the resume height falls inside the cache window, walk the
+//!    in-memory cache forward from it.
+//! 2. If the resume height is below `cache.min_height`, walk the
+//!    archive forward until it reaches the cache's lower bound, then
+//!    chain into the cache walk.
+//! 3. If the resume height is below `archive.min_height`, the
+//!    `Stream.Blocks` call returns `FailedPrecondition` — backfill is
+//!    not available beyond the operator-configured retention window.
+//!
+//! ## Schema
+//!
+//! Single redb table `blocks` mapping `u64` (block height) to
+//! `Vec<u8>` values of the form `[block_id: 32B][payload: ...]`. The
+//! payload is opaque to this layer — the firehose stores
+//! `pb::Block`-encoded bytes there, but the archive doesn't know or
+//! care.
+//!
+//! Why a single table with a packed value rather than two tables (one
+//! for `block_id`, one for `payload`): redb commits are
+//! per-transaction, so a two-table put costs an extra transaction or
+//! forces the caller to manage open transaction lifecycles. A 32-byte
+//! prefix on every value adds ~0.06% overhead at typical TRON block
+//! sizes (~50 KB) and avoids both complexities.
+//!
+//! ## What the archive is NOT
+//!
+//! - **Not a generic kv store.** Only height-keyed reads are
+//!   supported. A by-id index would double the on-disk footprint, and
+//!   by-id lookup is already covered upstream: by the cache (which
+//!   indexes by both height and id) for the recent window, plus the
+//!   chain itself for ancient blocks.
+//! - **Not a cross-replica consistency primitive.** Per ADR-0021,
+//!   only chain finality is a legal cross-replica truth. The archive
+//!   
+//!   is a per-replica retention layer; replicas MUST NOT reconcile
+//!   their archives against each other. Each replica archives the
+//!   blocks its own broadcast saw cross the irreversible threshold.
+//! - **Not the engine's reorg-buffer.** Only blocks past the chain's
+//!   solidified-head transition land here. By construction these
+//!   never get UNDO'd, so the archive is append-only on the happy path.
+//!
+//! ## Crash semantics
+//!
+//! Same as [`crate::CursorStore`]: redb is a single-file ACID store,
+//! writes are durable on commit, a crash mid-write rolls back to the
+//! previous committed state. The archive writer commits per put;
+//! callers that want batched writes should call `put_batch`.
+
+use std::path::Path;
+use std::sync::Arc;
+
+use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
+use thiserror::Error;
+
+use lightcycle_types::{BlockHeight, BlockId};
+
+const BLOCKS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("blocks");
+
+/// Errors surfaced by the block archive. Boxes the redb error variants
+/// for the same reason [`crate::CursorStoreError`] does — keeps
+/// `Result<T>` small enough to not bloat call frames.
+#[derive(Debug, Error)]
+pub enum ArchiveError {
+    #[error("redb open error: {0}")]
+    Open(#[source] Box<redb::DatabaseError>),
+    #[error("redb transaction error: {0}")]
+    Transaction(#[source] Box<redb::TransactionError>),
+    #[error("redb table error: {0}")]
+    Table(#[source] Box<redb::TableError>),
+    #[error("redb storage error: {0}")]
+    Storage(#[source] Box<redb::StorageError>),
+    #[error("redb commit error: {0}")]
+    Commit(#[source] Box<redb::CommitError>),
+    #[error(
+        "archive value at height {height} is shorter than 32-byte block id prefix ({len} bytes)"
+    )]
+    Truncated { height: BlockHeight, len: usize },
+}
+
+pub(crate) type Result<T> = std::result::Result<T, ArchiveError>;
+
+impl From<redb::DatabaseError> for ArchiveError {
+    fn from(e: redb::DatabaseError) -> Self {
+        Self::Open(Box::new(e))
+    }
+}
+impl From<redb::TransactionError> for ArchiveError {
+    fn from(e: redb::TransactionError) -> Self {
+        Self::Transaction(Box::new(e))
+    }
+}
+impl From<redb::TableError> for ArchiveError {
+    fn from(e: redb::TableError) -> Self {
+        Self::Table(Box::new(e))
+    }
+}
+impl From<redb::StorageError> for ArchiveError {
+    fn from(e: redb::StorageError) -> Self {
+        Self::Storage(Box::new(e))
+    }
+}
+impl From<redb::CommitError> for ArchiveError {
+    fn from(e: redb::CommitError) -> Self {
+        Self::Commit(Box::new(e))
+    }
+}
+
+/// Persistent block archive. Cheap to clone — wraps the inner
+/// `Database` in an `Arc` so a single open handle can be shared across
+/// the archiver task (writer) and the firehose backfill walker (reader).
+/// redb itself handles internal locking so concurrent reads + a single
+/// writer Just Work.
+#[derive(Debug, Clone)]
+pub struct BlockArchive {
+    db: Arc<Database>,
+}
+
+impl BlockArchive {
+    /// Open or create the archive at `path`. Existing data is
+    /// preserved across opens; the file is created with the table
+    /// initialized so subsequent reads don't surface "table not found."
+    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
+        let db = Database::create(path)?;
+        let txn = db.begin_write()?;
+        {
+            let _ = txn.open_table(BLOCKS_TABLE)?;
+        }
+        txn.commit()?;
+        Ok(Self { db: Arc::new(db) })
+    }
+
+    /// Put one block.
+    /// `payload` is opaque to this layer; the firehose
+    /// stores `pb::Block`-encoded bytes here. Overwrites any prior
+    /// entry at `height` (the relayer drives writes only for finalized
+    /// blocks, so a same-height collision implies the prior write was
+    /// for the same block — idempotent).
+    pub fn put(&self, height: BlockHeight, block_id: BlockId, payload: &[u8]) -> Result<()> {
+        let txn = self.db.begin_write()?;
+        {
+            let mut table = txn.open_table(BLOCKS_TABLE)?;
+            let value = pack_value(block_id, payload);
+            table.insert(height, value.as_slice())?;
+        }
+        txn.commit()?;
+        metrics::counter!("lightcycle_store_archive_writes_total").increment(1);
+        Ok(())
+    }
+
+    /// Atomically write a batch. One commit per call instead of one
+    /// per block; useful when the archiver task is catching up after
+    /// downtime. Empty batch is a no-op.
+    pub fn put_batch(&self, items: &[(BlockHeight, BlockId, Vec<u8>)]) -> Result<()> {
+        if items.is_empty() {
+            return Ok(());
+        }
+        let txn = self.db.begin_write()?;
+        {
+            let mut table = txn.open_table(BLOCKS_TABLE)?;
+            for (height, block_id, payload) in items {
+                let value = pack_value(*block_id, payload);
+                table.insert(*height, value.as_slice())?;
+            }
+        }
+        txn.commit()?;
+        metrics::counter!("lightcycle_store_archive_writes_total").increment(items.len() as u64);
+        Ok(())
+    }
+
+    /// Look up a single block by height. `Ok(None)` is "not in the
+    /// archive" (either never written, or below the retention floor).
+    pub fn get(&self, height: BlockHeight) -> Result<Option<(BlockId, Vec<u8>)>> {
+        let txn = self.db.begin_read()?;
+        let table = txn.open_table(BLOCKS_TABLE)?;
+        let Some(v) = table.get(height)? else {
+            return Ok(None);
+        };
+        let bytes = v.value().to_vec();
+        let (id, payload) = unpack_value(height, &bytes)?;
+        Ok(Some((id, payload)))
+    }
+
+    /// Inclusive range scan, returned in ascending height order. The
+    /// firehose backfill walker uses this to materialize the
+    /// pre-cache portion of the resume window in one shot.
+    ///
+    /// Bounded by `limit`; callers should pick a value that fits in
+    /// memory comfortably (a few thousand TRON blocks is well under
+    /// 1 GB at typical sizes).
+    pub fn range(
+        &self,
+        start: BlockHeight,
+        end_inclusive: BlockHeight,
+        limit: usize,
+    ) -> Result<Vec<(BlockHeight, BlockId, Vec<u8>)>> {
+        if start > end_inclusive || limit == 0 {
+            return Ok(Vec::new());
+        }
+        let txn = self.db.begin_read()?;
+        let table = txn.open_table(BLOCKS_TABLE)?;
+        let mut out = Vec::new();
+        let iter = table.range(start..=end_inclusive)?;
+        for entry in iter {
+            let (k, v) = entry?;
+            let height = k.value();
+            let bytes = v.value();
+            let (id, payload) = unpack_value(height, bytes)?;
+            out.push((height, id, payload));
+            if out.len() >= limit {
+                break;
+            }
+        }
+        Ok(out)
+    }
+
+    /// Drop every entry strictly below `floor`. Returns the number of
+    /// rows removed. Used by the retention policy task — the operator
+    /// configures "keep last N days of archive," the task computes the
+    /// floor block height once a minute and calls this.
+    pub fn delete_below(&self, floor: BlockHeight) -> Result<usize> {
+        let txn = self.db.begin_write()?;
+        let removed;
+        {
+            let mut table = txn.open_table(BLOCKS_TABLE)?;
+            let to_remove: Vec<u64> = {
+                let iter = table.range(..floor)?;
+                let mut keys = Vec::new();
+                for entry in iter {
+                    let (k, _) = entry?;
+                    keys.push(k.value());
+                }
+                keys
+            };
+            removed = to_remove.len();
+            for k in to_remove {
+                table.remove(k)?;
+            }
+        }
+        txn.commit()?;
+        if removed > 0 {
+            metrics::counter!("lightcycle_store_archive_deletes_total").increment(removed as u64);
+        }
+        Ok(removed)
+    }
+
+    /// Lowest archived height, or `None` if the archive is empty.
+    /// Used by the firehose backfill walker to short-circuit a request
+    /// for a height below the retention floor with `FailedPrecondition`.
+    pub fn min_height(&self) -> Result<Option<BlockHeight>> {
+        let txn = self.db.begin_read()?;
+        let table = txn.open_table(BLOCKS_TABLE)?;
+        let out = {
+            let mut iter = table.iter()?;
+            match iter.next() {
+                Some(entry) => Some(entry?.0.value()),
+                None => None,
+            }
+        };
+        Ok(out)
+    }
+
+    /// Highest archived height, or `None` if the archive is empty.
+    pub fn max_height(&self) -> Result<Option<BlockHeight>> {
+        let txn = self.db.begin_read()?;
+        let table = txn.open_table(BLOCKS_TABLE)?;
+        let out = {
+            let iter = table.iter()?;
+            let mut last: Option<BlockHeight> = None;
+            for entry in iter {
+                last = Some(entry?.0.value());
+            }
+            last
+        };
+        Ok(out)
+    }
+
+    /// Number of archived blocks. O(table scan) — diagnostics only.
+    pub fn len(&self) -> Result<usize> {
+        let txn = self.db.begin_read()?;
+        let table = txn.open_table(BLOCKS_TABLE)?;
+        Ok(table.len()? as usize)
+    }
+
+    /// Convenience for `len()? == 0`.
+    pub fn is_empty(&self) -> Result<bool> {
+        Ok(self.len()? == 0)
+    }
+}
+
+fn pack_value(block_id: BlockId, payload: &[u8]) -> Vec<u8> {
+    let mut v = Vec::with_capacity(32 + payload.len());
+    v.extend_from_slice(&block_id.0);
+    v.extend_from_slice(payload);
+    v
+}
+
+fn unpack_value(height: BlockHeight, bytes: &[u8]) -> Result<(BlockId, Vec<u8>)> {
+    if bytes.len() < 32 {
+        return Err(ArchiveError::Truncated {
+            height,
+            len: bytes.len(),
+        });
+    }
+    let mut id = [0u8; 32];
+    id.copy_from_slice(&bytes[..32]);
+    let payload = bytes[32..].to_vec();
+    Ok((BlockId(id), payload))
+}
+
+/// Describe the archive metrics so they show up in Prometheus output
+/// before the first write.
+pub fn describe_archive_metrics() {
+    metrics::describe_counter!(
+        "lightcycle_store_archive_writes_total",
+        "Block archive put operations (one per finalized block written by the archiver task)."
+    );
+    metrics::describe_counter!(
+        "lightcycle_store_archive_deletes_total",
+        "Block archive delete operations (retention-policy pruning)."
+ ); +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + fn id(byte: u8) -> BlockId { + BlockId([byte; 32]) + } + + #[test] + fn put_get_round_trip() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + arc.put(100, id(0xa), b"hello").unwrap(); + let got = arc.get(100).unwrap(); + assert_eq!(got, Some((id(0xa), b"hello".to_vec()))); + } + + #[test] + fn get_missing_returns_none() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + assert_eq!(arc.get(999).unwrap(), None); + } + + #[test] + fn put_overwrites_prior_entry() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + arc.put(100, id(0xa), b"v1").unwrap(); + arc.put(100, id(0xa), b"v2").unwrap(); + let (_, payload) = arc.get(100).unwrap().unwrap(); + assert_eq!(payload, b"v2"); + assert_eq!(arc.len().unwrap(), 1); + } + + #[test] + fn put_batch_commits_atomically() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + let batch: Vec<_> = (100..=104) + .map(|h| (h, id(h as u8), format!("blk{h}").into_bytes())) + .collect(); + arc.put_batch(&batch).unwrap(); + assert_eq!(arc.len().unwrap(), 5); + let (_, p) = arc.get(102).unwrap().unwrap(); + assert_eq!(p, b"blk102"); + } + + #[test] + fn put_batch_empty_is_noop() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + arc.put_batch(&[]).unwrap(); + assert!(arc.is_empty().unwrap()); + } + + #[test] + fn range_returns_inclusive_ascending() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + for h in 100..=110 { + arc.put(h, id(h as u8), format!("blk{h}").as_bytes()) + .unwrap(); + } + let rows = arc.range(102, 105, 100).unwrap(); + assert_eq!(rows.len(), 4); + let heights: Vec<_> = rows.iter().map(|r| r.0).collect(); + assert_eq!(heights, 
vec![102, 103, 104, 105]); + } + + #[test] + fn range_respects_limit() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + for h in 100..=110 { + arc.put(h, id(h as u8), b"x").unwrap(); + } + let rows = arc.range(100, 110, 3).unwrap(); + assert_eq!(rows.len(), 3); + assert_eq!(rows[0].0, 100); + assert_eq!(rows[2].0, 102); + } + + #[test] + fn range_with_start_after_end_returns_empty() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + arc.put(100, id(0xa), b"x").unwrap(); + let rows = arc.range(101, 100, 100).unwrap(); + assert!(rows.is_empty()); + } + + #[test] + fn delete_below_drops_strictly_lower_heights() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + for h in 100..=104 { + arc.put(h, id(h as u8), b"x").unwrap(); + } + let removed = arc.delete_below(102).unwrap(); + assert_eq!(removed, 2); + assert_eq!(arc.len().unwrap(), 3); + assert_eq!(arc.get(101).unwrap(), None); + assert!(arc.get(102).unwrap().is_some()); + } + + #[test] + fn min_max_track_inserts_and_deletes() { + let dir = tempdir().unwrap(); + let arc = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + assert_eq!(arc.min_height().unwrap(), None); + assert_eq!(arc.max_height().unwrap(), None); + arc.put(100, id(1), b"x").unwrap(); + arc.put(105, id(5), b"x").unwrap(); + arc.put(110, id(10), b"x").unwrap(); + assert_eq!(arc.min_height().unwrap(), Some(100)); + assert_eq!(arc.max_height().unwrap(), Some(110)); + arc.delete_below(106).unwrap(); + assert_eq!(arc.min_height().unwrap(), Some(110)); + assert_eq!(arc.max_height().unwrap(), Some(110)); + } + + #[test] + fn data_persists_across_reopen() { + let dir = tempdir().unwrap(); + let path = dir.path().join("a.redb"); + { + let arc = BlockArchive::open(&path).unwrap(); + arc.put(100, id(0xa), b"durable").unwrap(); + } + let arc = BlockArchive::open(&path).unwrap(); + let (got_id, 
payload) = arc.get(100).unwrap().unwrap(); + assert_eq!(got_id, id(0xa)); + assert_eq!(payload, b"durable"); + } + + #[test] + fn cloned_handles_share_one_database() { + let dir = tempdir().unwrap(); + let a = BlockArchive::open(dir.path().join("a.redb")).unwrap(); + let b = a.clone(); + a.put(100, id(0xa), b"shared").unwrap(); + assert!(b.get(100).unwrap().is_some()); + } + + #[test] + fn truncated_value_surfaces_typed_error() { + // Direct redb path: write a too-short value and observe the + // unpack error. Guards against silent data corruption. + let dir = tempdir().unwrap(); + let path = dir.path().join("a.redb"); + { + let db = Database::create(&path).unwrap(); + let txn = db.begin_write().unwrap(); + { + let mut table = txn.open_table(BLOCKS_TABLE).unwrap(); + table.insert(100u64, b"short".as_slice()).unwrap(); + } + txn.commit().unwrap(); + } + let arc = BlockArchive::open(&path).unwrap(); + let err = arc.get(100).unwrap_err(); + match err { + ArchiveError::Truncated { height, len } => { + assert_eq!(height, 100); + assert_eq!(len, 5); + } + other => panic!("expected Truncated, got {other:?}"), + } + } +} diff --git a/crates/lightcycle-store/src/lib.rs b/crates/lightcycle-store/src/lib.rs index 070ac1d..39519ed 100644 --- a/crates/lightcycle-store/src/lib.rs +++ b/crates/lightcycle-store/src/lib.rs @@ -71,10 +71,12 @@ //! diffs of the active witness set. Cold restarts re-derive from the //! nearest checkpoint instead of replaying from genesis. Not yet wired. 
+mod archive; mod cache; mod consistency; mod cursor_store; +pub use archive::{describe_archive_metrics, ArchiveError, BlockArchive}; pub use cache::{describe_cache_metrics, new_shared, BlockCache, SharedBlockCache}; pub use consistency::{ describe_metrics, ConsistencyHorizonObserver, ConsistencySource, FinalityFromChain, diff --git a/deny.toml b/deny.toml index 1b3cc58..fc9fc74 100644 --- a/deny.toml +++ b/deny.toml @@ -2,16 +2,7 @@ db-path = "~/.cargo/advisory-db" db-urls = ["https://github.com/rustsec/advisory-db"] yanked = "deny" -ignore = [ - # bincode 1.x — the team announced a permanent end of maintenance - # in 2025 (doxxing/harassment incident, not a security flaw). 1.3.3 - # is what the team itself called a "complete version not in need of - # updates." We use bincode only via the persistence layer in - # lightcycle-store; replacing it (postcard / bitcode / rkyv) is on - # the roadmap but not load-bearing while the crate is still a stub. - # Re-evaluate when lightcycle-store grows real persistence. - "RUSTSEC-2025-0141", -] +ignore = [] [licenses] allow = [