diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 65d9ecc..22c3897 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,7 +33,7 @@ jobs: libslirp-dev - name: Install Rust toolchain - uses: dtolnay/rust-toolchain@stable + uses: dtolnay/rust-toolchain@1.91.1 with: toolchain: ${{ env.RUST_TOOLCHAIN }} components: rustfmt, clippy @@ -55,6 +55,9 @@ jobs: - name: Clippy run: cargo clippy --workspace --all-targets --all-features --locked -- -D warnings + - name: Watchdog Lua tests + run: lua watchdog/tests/run.lua + - name: Test timeout-minutes: 15 run: cargo test --workspace --all-targets --all-features --locked diff --git a/.gitignore b/.gitignore index 0359111..e84d291 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,14 @@ /target +.deps/ +watchdog-e2e-*/ .env .env.fish sequencer.db sequencer.db-shm sequencer.db-wal /out/ +examples/canonical-app/out/ /.DS_Store +.vscode/ soljson-latest.js **/states/ diff --git a/Cargo.lock b/Cargo.lock index 1156d1b..3e3f45e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3566,6 +3566,8 @@ dependencies = [ "rollups-harness", "sequencer-core", "sequencer-rust-client", + "serde_json", + "tempfile", "tokio", ] diff --git a/docs/watchdog/README.md b/docs/watchdog/README.md new file mode 100644 index 0000000..8fd44d6 --- /dev/null +++ b/docs/watchdog/README.md @@ -0,0 +1,175 @@ +# Watchdog + +The watchdog is an off-chain safety process that compares sequencer API state +against state produced by the canonical Cartesi Machine at an L1 safe block. + +## V1 Shape + +The implementation lives in `watchdog/` and is intentionally split into small +Lua modules: + +- `http.lua`: HTTP adapter (`lua-curl` / `lcurl` when installed, otherwise `curl` CLI via `new_auto()`). +- `jsonrpc.lua`: JSON-RPC request/response validation. +- `l1.lua`: partitioned `eth_getLogs` scanning and strict L1 log ordering. +- `abi.lua`: decoding for the `InputAdded` / `EvmAdvance` envelope. 
+- `machine.lua`: narrow adapter boundary for Cartesi Machine bindings. +- `machine_cli.lua`: `cartesi-machine` CLI adapter for loading snapshot + directories, writing raw input files, advancing, inspecting, and saving snapshots. +- `compare.lua`: raw byte comparison. +- `checkpoint.lua`: manifest-backed checkpoint persistence. +- `alarm.lua`: webhook alarm delivery. +- `retry.lua`: bounded retry helper used by the runtime. +- `runner.lua`: one-shot orchestration across checkpoint load, sequencer poll, + L1 fetch, CM replay, raw compare, alarm, and checkpoint write. +- `main.lua`: compare or advance loop (daemon or `WATCHDOG_ONCE=1`). + +The L1 reader follows the Rust partition strategy from +`sequencer/src/partition.rs`: if an RPC provider rejects a large range, the +range is split recursively and retried. Lua decodes and validates input +envelopes, but it does not classify payload tags. Direct input vs batch +submission remains scheduler logic inside the canonical machine. + +`l1.lua` has the `InputAdded(address,uint256,bytes)` event topic baked in and +filters logs by `topic0 = InputAdded` and `topic1 = app address`, matching the +Rust reader's app-filtered InputBox scan. + +## Runtime Contract + +The sequencer exposes `GET /get_state` for byte-exact state comparison. The +endpoint is generic over app state bytes, even though the toy wallet app +currently returns deterministic JSON: + +```json +{ + "safe_block": 123, + "state": "{\"balances\":{},\"nonces\":{}}" +} +``` + +`state` must be the exact bytes produced by the bare-metal app serializer +for the app state anchored at `safe_block`. The watchdog compares those raw +bytes with the bytes returned by CM inspect. It must not canonicalize both +values before deciding pass/fail. + +`get_state` reconstructs a safe-only app state by replaying the persisted +scheduler-accepted safe batch prefix into a fresh app instance. 
It intentionally +excludes the current soft-confirmed Tip and any valid closed batches that have +not been accepted by the L1 scheduler view yet. + +The canonical scheduler answers `RollupRequest::Inspect` with query `state` by +calling `Application::export_state()` (see `examples/canonical-app`). + +## Checkpoints + +V1 persists only the resulting Cartesi Machine checkpoint, not the fetched L1 +inputs. + +```text +checkpoint_dir/ + current.json + checkpoints/ + 00000000000001234567/ + snapshot/ + manifest.json +``` + +`manifest.json` records `safe_block`, timestamp, and optionally the CM image +hash. A new checkpoint directory is written first, then `current.json` is +atomically replaced to point at it. + +When bootstrapping without an existing checkpoint, the operator provides both: + +- `WATCHDOG_CM_SNAPSHOT_DIR` +- `WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK` + +## Modes + +The default `WATCHDOG_MODE` is `advance`. In this mode the watchdog does not +poll the sequencer. It: + +1. Loads the latest checkpoint, or the bootstrap snapshot directory. +2. Reads the L1 safe block from the RPC (or `WATCHDOG_TARGET_SAFE_BLOCK` when + provided for tests/manual runs). +3. Fetches and decodes `InputAdded` logs for the block range. +4. Feeds the raw InputBox input bytes into the CM adapter. +5. Saves a new snapshot directory and advances `current.json`. + +`WATCHDOG_MODE=compare` replays safe L1 inputs into the CM, calls +`--cmio-inspect-state` with the `state` query, and compares the returned report +bytes against `GET /get_state`. + +Useful runtime knobs: + +- `WATCHDOG_CM_EXECUTABLE`: Cartesi Machine executable, default `cartesi-machine`. +- `WATCHDOG_CM_WORK_DIR`: temporary directory for staged input files, default `/tmp`. +- `WATCHDOG_RETRY_ATTEMPTS`: bounded retry attempts per run, default `3`. +- `WATCHDOG_RETRY_DELAY_SEC`: delay between retry attempts, default `5`. +- `WATCHDOG_TARGET_SAFE_BLOCK`: manual/test override for the target safe block. 
+ +## Local Tests + +| Command | What it exercises | +|---------|-------------------| +| `just test-watchdog` | Lua unit tests (fake HTTP/RPC/CM; no live chain) | +| `just test-watchdog-e2e` | Real CM: advance, inspect; optional live compare if `WATCHDOG_E2E_SEQUENCER_URL` set | +| `just test-watchdog-compare-harness` | **Full E2E**: Anvil + devnet sequencer + `GET /get_state` + CM inspect + Lua compare | +| `just test-watchdog-webhook-drill` | Webhook delivery smoke (`WATCHDOG_WEBHOOK_URL` required) | + +Prerequisites for CM-backed tests: + +```bash +just canonical-build-machine-image # once, if out/ image is missing +just watchdog-lua-deps # lua-cjson into .deps/lua (system pkg or gcc) +``` + +`cartesi-machine`, `lua`, and `curl` on PATH. `lua-curl` is optional (CLI fallback). + +### Lua unit tests + +```bash +just test-watchdog +``` + +Covers raw comparison, golden InputAdded ABI decoding, L1 ordering, recursive +range partitioning, config, checkpoints, advance/compare runner (fakes), CM CLI +staging, retry, and alarm webhook encoding. + +### Lua CM end-to-end + +```bash +just test-watchdog-e2e +``` + +Scenarios (verbose `step NN/NN` logging): + +- `prerequisites` — `cartesi-machine` on PATH and machine image present. +- `advance-empty-range` — real CM advance + checkpoint write with zero new inputs. +- `cm-inspect-state-query` — real `--cmio-inspect-state` with query `state`. +- `compare-runner-with-sequencer` — skipped unless `WATCHDOG_E2E_SEQUENCER_URL` is set. + +Rebuild the machine image after changing the canonical scheduler/dapp. A stale +image makes `cm-inspect-state-query` skip with `inspect endpoint not implemented`. + +### Rust compare harness (most complete integration test) + +```bash +just test-watchdog-compare-harness +``` + +Spawns Anvil + rollups devnet + `sequencer-devnet`, proves CM inspect JSON at +genesis, then runs `watchdog/tests/run_compare_once.lua` in compare mode with +matching `WATCHDOG_*` addresses. 
Requires `RUN_WATCHDOG_E2E=1` (set by the recipe). + +### Staging / operator drills + +See [`staging-drills.md`](staging-drills.md) for webhook smoke, synthetic +divergence POST, and manual compare env vars. + +## Related sequencer tests + +```bash +cargo test -p sequencer get_state -- --test-threads=1 +``` + +HTTP integration for `GET /get_state` lives in `sequencer/tests/e2e_sequencer.rs`. +Storage/replay semantics are covered in `sequencer/src/egress/app_state.rs` unit tests. diff --git a/docs/watchdog/staging-drills.md b/docs/watchdog/staging-drills.md new file mode 100644 index 0000000..1ca7168 --- /dev/null +++ b/docs/watchdog/staging-drills.md @@ -0,0 +1,95 @@ +# Watchdog Staging Drills + +Operator drills for webhook delivery and divergence detection. Local harness +steps live in [`README.md`](README.md); this document covers staging and manual +verification. + +## Prerequisites + +- Built canonical machine image: `just canonical-build-machine-image` +- `cartesi-machine`, `lua`, and `curl` on PATH +- `lua-cjson` (system package, or `just watchdog-lua-deps` copies/builds `.deps/lua/cjson.so` via `gcc` — no `make`) +- `lua-curl` optional — drills and compare harness fall back to `curl` CLI when absent +- Staging or local sequencer reachable at `WATCHDOG_SEQUENCER_URL` +- L1 RPC + InputBox + app addresses matching that deployment +- Webhook receiver URL (Slack incoming webhook, PagerDuty, or `https://httpbin.org/post` for smoke tests) + +## Drill 1 — Webhook delivery (no sequencer) + +Verifies the alarm transport reaches your receiver. + +```bash +just watchdog-lua-deps +export WATCHDOG_WEBHOOK_URL="https://your-receiver.example/hook" +WATCHDOG_LUA_DEPS=.deps/lua lua watchdog/tests/drill_webhook.lua +# or: just test-watchdog-webhook-drill +``` + +Expected: HTTP 2xx for both `state_mismatch` and `safe_block_regressed` sample payloads. +Check the receiver shows JSON with `"kind"` and `"run_id"` fields. 
+ +## Drill 2 — Divergence webhook (synthetic mismatch, no CM) + +Verifies the receiver gets a realistic `state_mismatch` payload without running compare mode: + +```bash +export WATCHDOG_WEBHOOK_URL="https://your-receiver.example/hook" +WATCHDOG_LUA_DEPS=.deps/lua lua watchdog/tests/drill_divergence.lua +``` + +Expected: HTTP 2xx, receiver shows `kind=state_mismatch` and a non-zero `mismatch_offset`. + +Unit coverage: `just test-watchdog` (`runner alarms on raw state mismatch`). + +## Drill 3 — Happy compare (local Anvil harness) + +Full stack: Anvil + devnet rollups + sequencer + CM inspect + `GET /get_state`. + +```bash +just test-watchdog-compare-harness +# equivalent: +# just setup && just watchdog-lua-deps && just ensure-machine-image +# cargo build -p sequencer --bin sequencer-devnet -p rollups-e2e +# RUN_WATCHDOG_E2E=1 cargo run -p rollups-e2e -- watchdog_genesis_compare_test --exact +``` + +Or run the Lua compare pass manually after starting a devnet sequencer yourself: + +```bash +export WATCHDOG_MODE=compare +export WATCHDOG_SEQUENCER_URL=http://127.0.0.1:<port> +export WATCHDOG_L1_RPC_URL=http://127.0.0.1:8545 +export WATCHDOG_INPUTBOX_ADDRESS=<inputbox-address> +export WATCHDOG_APP_ADDRESS=<app-address> +export WATCHDOG_CHECKPOINT_DIR=/tmp/watchdog-checkpoints +export WATCHDOG_CM_SNAPSHOT_DIR=examples/canonical-app/out/canonical-machine-image +export WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK=0 +export WATCHDOG_LUA_DEPS=.deps/lua +lua watchdog/tests/run_compare_once.lua +``` + +Expected: exit 0, stdout `watchdog compare ok: safe_block=... input_count=...`, and genesis wallet state `{"balances":{},"nonces":{}}` on both sides. + +## Drill 4 — Production compare daemon + +Run the watchdog in compare mode against staging (daemon or cron): + +```bash +export WATCHDOG_MODE=compare +export WATCHDOG_ONCE=1 # or 0 for daemon +export WATCHDOG_WEBHOOK_URL=... +# ... all WATCHDOG_* vars from config.lua ... 
+lua watchdog/main.lua +``` + +On mismatch: non-zero exit, webhook fired, logs show `state_mismatch` and byte offset. + +## Triage checklist + +| Symptom | Likely cause | +|---------|----------------| +| `inspect endpoint not implemented` | Stale CM image — rebuild | +| `state_mismatch` at genesis | Checkpoint not aligned with sequencer history | +| Webhook 4xx | Wrong URL or auth on receiver | +| Compare skipped in Lua e2e | Set `WATCHDOG_E2E_SEQUENCER_URL` to a live sequencer | +| Compare harness skipped | Set `RUN_WATCHDOG_E2E=1` (see `just test-watchdog-compare-harness`) | diff --git a/examples/app-core/src/application/wallet.rs b/examples/app-core/src/application/wallet.rs index d4f5f55..013da7a 100644 --- a/examples/app-core/src/application/wallet.rs +++ b/examples/app-core/src/application/wallet.rs @@ -48,7 +48,7 @@ impl Default for WalletConfig { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct WalletApp { config: WalletConfig, balances: HashMap, @@ -112,6 +112,39 @@ impl WalletApp { Erc20Deposit::decode(&input.payload).map(Some) } + + fn state_json(&self) -> String { + let mut balances: Vec<_> = self + .balances + .iter() + .filter(|(_, balance)| **balance != U256::ZERO) + .collect(); + balances.sort_by_key(|(address, _)| address.as_slice()); + + let mut nonces: Vec<_> = self + .nonces + .iter() + .filter(|(_, nonce)| **nonce != 0) + .collect(); + nonces.sort_by_key(|(address, _)| address.as_slice()); + + let balance_entries = balances + .into_iter() + .map(|(address, balance)| format!("\"{}\":\"{balance}\"", json_address(address))) + .collect::>() + .join(","); + let nonce_entries = nonces + .into_iter() + .map(|(address, nonce)| format!("\"{}\":{nonce}", json_address(address))) + .collect::>() + .join(","); + + format!("{{\"balances\":{{{balance_entries}}},\"nonces\":{{{nonce_entries}}}}}") + } +} + +fn json_address(address: &Address) -> String { + format!("0x{}", alloy_primitives::hex::encode(address.as_slice())) } impl Default for WalletApp { 
@@ -253,6 +286,10 @@ impl Application for WalletApp { fn executed_input_count(&self) -> u64 { self.executed_input_count } + + fn export_state(&self) -> Result { + Ok(self.state_json()) + } } #[cfg(test)] @@ -645,4 +682,30 @@ mod tests { // Fee goes to address zero — effectively burned. assert_eq!(app.current_user_balance(Address::ZERO), gas_cost); } + + #[test] + fn export_state_is_deterministic_and_omits_defaults() { + let mut app = WalletApp::new(WalletConfig::default()); + let high = address!("0xffffffffffffffffffffffffffffffffffffffff"); + let low = address!("0x1111111111111111111111111111111111111111"); + app.balances.insert(high, U256::from(20_u64)); + app.balances.insert(low, U256::from(10_u64)); + app.balances.insert(Address::ZERO, U256::ZERO); + app.nonces.insert(high, 2); + app.nonces.insert(low, 1); + app.nonces.insert(Address::ZERO, 0); + + assert_eq!( + app.export_state().expect("export state"), + concat!( + "{\"balances\":{", + "\"0x1111111111111111111111111111111111111111\":\"10\",", + "\"0xffffffffffffffffffffffffffffffffffffffff\":\"20\"", + "},\"nonces\":{", + "\"0x1111111111111111111111111111111111111111\":1,", + "\"0xffffffffffffffffffffffffffffffffffffffff\":2", + "}}" + ) + ); + } } diff --git a/examples/canonical-app/justfile b/examples/canonical-app/justfile index f3eb915..5c5ecb6 100644 --- a/examples/canonical-app/justfile +++ b/examples/canonical-app/justfile @@ -35,13 +35,13 @@ build-dapp: build-dapp-devnet build-dapp-devnet: mkdir -p {{out_dir}} - SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-devnet --target riscv64gc-unknown-linux-musl --release + SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CARGO_TARGET_DIR=../../target CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-devnet --target 
riscv64gc-unknown-linux-musl --release cp ../../target/riscv64gc-unknown-linux-musl/release/canonical-app-devnet {{dapp_binary_devnet}} cp {{dapp_binary_devnet}} {{dapp_binary}} build-dapp-sepolia: mkdir -p {{out_dir}} - SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-sepolia --target riscv64gc-unknown-linux-musl --release + SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CARGO_TARGET_DIR=../../target CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-sepolia --target riscv64gc-unknown-linux-musl --release cp ../../target/riscv64gc-unknown-linux-musl/release/canonical-app-sepolia {{dapp_binary_sepolia}} cp {{dapp_binary_sepolia}} {{dapp_binary}} diff --git a/examples/canonical-app/src/scheduler/core.rs b/examples/canonical-app/src/scheduler/core.rs index 90b49bd..96a837d 100644 --- a/examples/canonical-app/src/scheduler/core.rs +++ b/examples/canonical-app/src/scheduler/core.rs @@ -87,6 +87,15 @@ impl PartialEq for ProcessOutcome { } } +/// Inspect query accepted by the scheduler's state export endpoint. 
+pub const STATE_INSPECT_QUERY: &[u8] = b"state"; + +#[derive(Debug, PartialEq, Eq)] +pub(super) enum InspectError { + UnsupportedQuery, + Application(String), +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) enum BatchRejectReason { DecodeFailed, @@ -130,6 +139,17 @@ impl Scheduler { self.next_expected_batch_nonce } + pub(super) fn inspect_state(&self, query: &[u8]) -> Result, InspectError> { + if !query.is_empty() && query != STATE_INSPECT_QUERY { + return Err(InspectError::UnsupportedQuery); + } + + self.app + .export_state() + .map(|state| state.into_bytes()) + .map_err(|err| InspectError::Application(err.to_string())) + } + pub(super) fn process_input(&mut self, input: SchedulerInput) -> ProcessResult { // Execute overdue directs before any input to keep backstop semantics explicit. let mut outputs = Vec::new(); @@ -460,6 +480,10 @@ mod tests { self.executed.push(RecordedTx::Direct(marker)); Ok(Vec::new()) } + + fn export_state(&self) -> Result { + Ok(format!("events:{}", self.executed.len())) + } } const SEQUENCER: Address = address!("0x1111111111111111111111111111111111111111"); @@ -983,6 +1007,60 @@ mod tests { assert_eq!(scheduler.app.events(), [RecordedTx::UserOp(9)]); } + #[test] + fn inspect_exports_application_state_for_state_query() { + let mut scheduler = Scheduler::new( + RecordingApp::default(), + SchedulerConfig { + sequencer_address: SEQUENCER, + max_wait_blocks: 100, + }, + ); + assert_eq!( + scheduler.process_input(direct_input(1, 7)), + ProcessOutcome::DirectEnqueued + ); + // Inspect reflects executed app state, not the direct-input queue. 
+ let state = scheduler + .inspect_state(STATE_INSPECT_QUERY) + .expect("inspect state"); + assert_eq!(state, b"events:0"); + + let batch = Batch { + nonce: 0, + frames: vec![Frame { + user_ops: vec![], + safe_block: 1, + fee_price: 0, + }], + }; + assert_eq!( + scheduler.process_input(batch_input(2, batch)), + ProcessOutcome::BatchExecuted + ); + + let state = scheduler + .inspect_state(STATE_INSPECT_QUERY) + .expect("inspect state after drain"); + assert_eq!(state, b"events:1"); + } + + #[test] + fn inspect_rejects_unsupported_query() { + let scheduler = Scheduler::new( + RecordingApp::default(), + SchedulerConfig { + sequencer_address: SEQUENCER, + max_wait_blocks: 100, + }, + ); + + assert_eq!( + scheduler.inspect_state(b"balances"), + Err(InspectError::UnsupportedQuery) + ); + } + #[test] fn wrong_batch_nonce_is_rejected_without_consuming_nonce() { let mut scheduler = Scheduler::new( diff --git a/examples/canonical-app/src/scheduler/mod.rs b/examples/canonical-app/src/scheduler/mod.rs index 8cbc12d..dd2c59b 100644 --- a/examples/canonical-app/src/scheduler/mod.rs +++ b/examples/canonical-app/src/scheduler/mod.rs @@ -3,7 +3,9 @@ mod core; -pub use core::{DEVNET_SEQUENCER_ADDRESS, SEPOLIA_SEQUENCER_ADDRESS, SchedulerConfig}; +pub use core::{ + DEVNET_SEQUENCER_ADDRESS, SEPOLIA_SEQUENCER_ADDRESS, STATE_INSPECT_QUERY, SchedulerConfig, +}; use sequencer_core::application::AppOutput; use sequencer_core::application::Application; @@ -46,9 +48,12 @@ pub fn run_scheduler_forever( }); } } - Ok(RollupRequest::Inspect { .. 
}) => { + Ok(RollupRequest::Inspect { payload }) => { + let report = scheduler + .inspect_state(&payload) + .unwrap_or_else(|err| panic!("scheduler inspect failed: {err:?}")); rollup - .emit_report(b"scheduler inspect endpoint not implemented") + .emit_report(&report) .unwrap_or_else(|err| panic!("scheduler failed to emit inspect report: {err}")); } Err(err) => panic!("scheduler failed while reading next input: {err}"), @@ -157,6 +162,42 @@ mod tests { } } + #[test] + fn run_scheduler_emits_exported_state_for_state_inspect() { + let inspect = RollupRequest::Inspect { + payload: STATE_INSPECT_QUERY.to_vec(), + }; + let terminal_err = Err(RollupError::CmtCallFailed { + operation: "next_input", + code: -22, + }); + let (rollup, reports) = MockRollup::with_inputs(vec![Ok(inspect), terminal_err]); + let expected = WalletApp::new(WalletConfig::default()) + .export_state() + .expect("export state") + .into_bytes(); + + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + run_scheduler_forever( + rollup, + WalletApp::new(WalletConfig::default()), + SchedulerConfig::default(), + ) + })); + + assert!( + result.is_err(), + "scheduler loop should panic on rollup error" + ); + let reports = reports.lock().expect("poisoned reports mutex"); + assert!( + reports + .iter() + .any(|report| report.as_slice() == expected.as_slice()), + "missing state inspect report, got: {reports:?}" + ); + } + #[test] fn run_scheduler_emits_report_for_invalid_batch_before_rollup_error() { let sequencer = SchedulerConfig::default().sequencer_address; diff --git a/justfile b/justfile index 61dfda5..13c2756 100644 --- a/justfile +++ b/justfile @@ -12,6 +12,26 @@ check-all-targets: test: cargo test --workspace +test-watchdog: + lua watchdog/tests/run.lua + +test-watchdog-e2e: + lua watchdog/tests/e2e.lua + +# POST sample alarms to WATCHDOG_WEBHOOK_URL (staging smoke). 
+test-watchdog-webhook-drill: watchdog-lua-deps + @test -n "${WATCHDOG_WEBHOOK_URL:-}" || { echo "set WATCHDOG_WEBHOOK_URL"; exit 1; } + WATCHDOG_LUA_DEPS={{justfile_directory()}}/.deps/lua lua watchdog/tests/drill_webhook.lua + WATCHDOG_LUA_DEPS={{justfile_directory()}}/.deps/lua lua watchdog/tests/drill_divergence.lua + +# Build lua-cjson into .deps/lua (for compare harness / drills without system packages). +watchdog-lua-deps: + @bash scripts/watchdog-lua-deps.sh + +test-watchdog-compare-harness: setup watchdog-lua-deps ensure-machine-image + cargo build -p sequencer --bin sequencer-devnet -p rollups-e2e + RUN_WATCHDOG_E2E=1 cargo run -p rollups-e2e -- watchdog_genesis_compare_test --exact --nocapture + # Run sequencer tests sequentially so partition static config (init) is not shared across parallel tests. test-sequencer: cargo test -p sequencer --lib -- --test-threads=1 diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..4f22047 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.91.1" +components = ["rustfmt", "clippy"] diff --git a/scripts/watchdog-lua-deps.sh b/scripts/watchdog-lua-deps.sh new file mode 100755 index 0000000..c7af0f5 --- /dev/null +++ b/scripts/watchdog-lua-deps.sh @@ -0,0 +1,86 @@ +#!/usr/bin/env bash +# Provide lua-cjson for watchdog tests: use system package or compile with gcc (no make). +set -euo pipefail + +root="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +out_dir="${root}/.deps/lua" +out_so="${out_dir}/cjson.so" + +mkdir -p "${out_dir}" + +cjson_loadable() { + # Prefer in-process cpath (works across Lua 5.x); LUA_CPATH_* env names vary by version. + lua -e "package.cpath='${out_dir}/?.so;'..package.cpath; require('cjson')" >/dev/null 2>&1 +} + +if [[ -f "${out_so}" ]] && cjson_loadable; then + exit 0 +fi + +# Prefer distro-packaged module (Debian/Ubuntu, etc.). 
+for candidate in \ + /usr/lib/x86_64-linux-gnu/lua/5.4/cjson.so \ + /usr/lib/x86_64-linux-gnu/lua/5.3/cjson.so \ + /usr/lib/lua/5.4/cjson.so \ + /usr/lib/lua/5.3/cjson.so; do + if [[ -f "${candidate}" ]]; then + cp "${candidate}" "${out_so}" + exit 0 + fi +done + +if cjson_loadable; then + exit 0 +fi + +lua_inc="" +for dir in /usr/include/lua5.4 /usr/include/lua5.3 /usr/include/lua; do + if [[ -f "${dir}/lua.h" ]]; then + lua_inc="${dir}" + break + fi +done + +if [[ -z "${lua_inc}" ]]; then + echo "watchdog-lua-deps: install lua-cjson (e.g. apt install lua-cjson) or Lua headers (lua5.4-dev)" >&2 + exit 1 +fi + +if ! command -v gcc >/dev/null 2>&1; then + echo "watchdog-lua-deps: install gcc or the lua-cjson system package" >&2 + exit 1 +fi + +tmp="$(mktemp -d)" +trap 'rm -rf "${tmp}"' EXIT + +src="${tmp}/lua-cjson" +if command -v curl >/dev/null 2>&1; then + curl -fsSL "https://github.com/openresty/lua-cjson/archive/refs/heads/master.tar.gz" \ + | tar -xz -C "${tmp}" +elif command -v wget >/dev/null 2>&1; then + wget -qO- "https://github.com/openresty/lua-cjson/archive/refs/heads/master.tar.gz" \ + | tar -xz -C "${tmp}" +else + echo "watchdog-lua-deps: need curl or wget to fetch lua-cjson sources" >&2 + exit 1 +fi + +shopt -s nullglob +dirs=("${tmp}"/lua-cjson-*) +if [[ ${#dirs[@]} -ne 1 ]]; then + echo "watchdog-lua-deps: unexpected lua-cjson extract layout" >&2 + exit 1 +fi +src="${dirs[0]}" + +cflags=(-O3 -Wall -pedantic -DNDEBUG -I"${lua_inc}" -fPIC) +gcc "${cflags[@]}" -c -o "${tmp}/lua_cjson.o" "${src}/lua_cjson.c" +gcc "${cflags[@]}" -c -o "${tmp}/strbuf.o" "${src}/strbuf.c" +gcc "${cflags[@]}" -c -o "${tmp}/fpconv.o" "${src}/fpconv.c" +gcc -shared -o "${out_so}" "${tmp}/lua_cjson.o" "${tmp}/strbuf.o" "${tmp}/fpconv.o" + +if ! 
cjson_loadable; then + echo "watchdog-lua-deps: built cjson.so but lua cannot load it (Lua version mismatch?)" >&2 + exit 1 +fi diff --git a/sequencer-core/src/application/mod.rs b/sequencer-core/src/application/mod.rs index 671cfc0..f941c30 100644 --- a/sequencer-core/src/application/mod.rs +++ b/sequencer-core/src/application/mod.rs @@ -131,4 +131,10 @@ pub trait Application: Send { fn executed_input_count(&self) -> u64 { 0 } + + fn export_state(&self) -> Result { + Err(AppError::Internal { + reason: "application state export is not implemented".to_string(), + }) + } } diff --git a/sequencer/src/egress/api/get_state.rs b/sequencer/src/egress/api/get_state.rs new file mode 100644 index 0000000..aa0b422 --- /dev/null +++ b/sequencer/src/egress/api/get_state.rs @@ -0,0 +1,111 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +use axum::Json; +use axum::extract::State; +use serde::Serialize; + +use crate::egress::api::GetStateState; +use crate::egress::app_state::StateSnapshotError; +use crate::http::ApiError; + +#[derive(Debug, Serialize)] +pub(crate) struct GetStateResponse { + safe_block: u64, + state: String, +} + +pub(crate) async fn get_state( + State(state): State, +) -> Result, ApiError> { + state.reject_if_shutting_down()?; + let snapshotter = state.snapshotter.clone(); + let snapshot = tokio::task::spawn_blocking(move || snapshotter.snapshot()) + .await + .map_err(|err| ApiError::internal_error(format!("state snapshot task failed: {err}")))? + .map_err(map_snapshot_error)?; + + Ok(Json(GetStateResponse { + safe_block: snapshot.safe_block, + state: snapshot.state, + })) +} + +fn map_snapshot_error(err: StateSnapshotError) -> ApiError { + match err { + StateSnapshotError::SafeHeadUnavailable => ApiError::unavailable(err.to_string()), + StateSnapshotError::Storage { .. } + | StateSnapshotError::OpenStorage { .. } + | StateSnapshotError::Application { .. 
} => ApiError::internal_error(err.to_string()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::sync::Arc; + + use crate::egress::app_state::{StateSnapshot, StateSnapshotProvider}; + use crate::runtime::shutdown::ShutdownSignal; + + struct FixedSnapshotter; + + impl StateSnapshotProvider for FixedSnapshotter { + fn snapshot(&self) -> Result { + Ok(StateSnapshot { + safe_block: 42, + state: "{\"ok\":true}".to_string(), + }) + } + } + + struct UnavailableSnapshotter; + + impl StateSnapshotProvider for UnavailableSnapshotter { + fn snapshot(&self) -> Result { + Err(StateSnapshotError::SafeHeadUnavailable) + } + } + + #[tokio::test] + async fn get_state_returns_safe_block_and_raw_state() { + let state = GetStateState { + shutdown: ShutdownSignal::default(), + snapshotter: Arc::new(FixedSnapshotter), + }; + + let response = get_state(State(state)).await.expect("get state"); + + assert_eq!(response.safe_block, 42); + assert_eq!(response.state, "{\"ok\":true}"); + } + + #[tokio::test] + async fn get_state_rejects_shutdown() { + let shutdown = ShutdownSignal::default(); + shutdown.request_shutdown(); + let state = GetStateState { + shutdown, + snapshotter: Arc::new(FixedSnapshotter), + }; + + let err = get_state(State(state)).await.expect_err("shutdown"); + + assert_eq!(err.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE); + } + + #[tokio::test] + async fn get_state_reports_unavailable_when_safe_head_missing() { + let state = GetStateState { + shutdown: ShutdownSignal::default(), + snapshotter: Arc::new(UnavailableSnapshotter), + }; + + let err = get_state(State(state)) + .await + .expect_err("missing safe head"); + + assert_eq!(err.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE); + } +} diff --git a/sequencer/src/egress/api/mod.rs b/sequencer/src/egress/api/mod.rs index 5d112ea..33f9614 100644 --- a/sequencer/src/egress/api/mod.rs +++ b/sequencer/src/egress/api/mod.rs @@ -4,6 +4,7 @@ //! 
Egress HTTP API routes: WebSocket subscribe + k8s-style health probes. //! Additional read endpoints will land here. +mod get_state; mod health; mod state; mod subscribe; @@ -14,23 +15,28 @@ use axum::Router; use axum::routing::get; pub(crate) use health::HealthState; -pub(crate) use state::SubscribeState; +pub(crate) use state::{GetStateState, SubscribeState}; /// Build the egress router. Each subrouter has its own state; the merge is /// transparent to axum's routing. pub(crate) fn router( subscribe_state: Arc, + get_state_state: GetStateState, health_state: Arc, ) -> Router { let subscribe_router = Router::new() .route("/ws/subscribe", get(subscribe::subscribe_l2_txs)) .with_state(subscribe_state); + let state_router = Router::new() + .route("/get_state", get(get_state::get_state)) + .with_state(get_state_state); + let health_router = Router::new() .route("/livez", get(health::livez)) .route("/readyz", get(health::readyz)) .route("/healthz", get(health::healthz)) .with_state(health_state); - subscribe_router.merge(health_router) + subscribe_router.merge(state_router).merge(health_router) } diff --git a/sequencer/src/egress/api/state.rs b/sequencer/src/egress/api/state.rs index de15396..66ab498 100644 --- a/sequencer/src/egress/api/state.rs +++ b/sequencer/src/egress/api/state.rs @@ -1,13 +1,13 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) -//! Egress-side axum state — feeds the WS subscribe handler today; will grow as -//! more egress routes are added. +//! Egress-side axum state for WS subscribe, `GET /get_state`, and health probes. 
use std::sync::Arc; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use crate::egress::app_state::StateSnapshotProvider; use crate::egress::l2_tx_feed::L2TxFeed; use crate::http::ApiError; use crate::runtime::shutdown::ShutdownSignal; @@ -20,6 +20,22 @@ pub(crate) struct SubscribeState { pub tx_feed: L2TxFeed, } +#[derive(Clone)] +pub(crate) struct GetStateState { + pub shutdown: ShutdownSignal, + pub snapshotter: Arc, +} + +impl GetStateState { + pub(crate) fn reject_if_shutting_down(&self) -> Result<(), ApiError> { + if self.shutdown.is_shutdown_requested() { + Err(ApiError::unavailable("sequencer shutting down")) + } else { + Ok(()) + } + } +} + impl SubscribeState { pub(crate) fn new( shutdown: ShutdownSignal, diff --git a/sequencer/src/egress/app_state.rs b/sequencer/src/egress/app_state.rs new file mode 100644 index 0000000..16719ca --- /dev/null +++ b/sequencer/src/egress/app_state.rs @@ -0,0 +1,292 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +//! Safe app-state reconstruction for read-only egress endpoints. 
+ +use alloy_primitives::Address; +use thiserror::Error; + +use crate::storage::Storage; +use sequencer_core::application::{AppError, Application}; +use sequencer_core::l2_tx::SequencedL2Tx; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StateSnapshot { + pub safe_block: u64, + pub state: String, +} + +#[derive(Debug, Error)] +pub enum StateSnapshotError { + #[error("safe L1 head has not been observed yet")] + SafeHeadUnavailable, + #[error("storage error: {source}")] + Storage { + #[from] + source: rusqlite::Error, + }, + #[error("storage open error: {source}")] + OpenStorage { + #[from] + source: crate::storage::StorageOpenError, + }, + #[error("application error: {source}")] + Application { + #[from] + source: AppError, + }, +} + +#[derive(Clone)] +pub struct StateSnapshotter { + db_path: String, + genesis_app: A, + batch_submitter_address: Address, +} + +pub trait StateSnapshotProvider: Send + Sync { + fn snapshot(&self) -> Result; +} + +impl StateSnapshotter +where + A: Application + Clone, +{ + pub fn new(db_path: String, genesis_app: A, batch_submitter_address: Address) -> Self { + Self { + db_path, + genesis_app, + batch_submitter_address, + } + } + + fn build_snapshot(&self) -> Result { + let mut storage = Storage::open_read_only(self.db_path.as_str())?; + let safe_block = storage + .current_safe_block()? 
+ .ok_or(StateSnapshotError::SafeHeadUnavailable)?; + let txs = storage.safe_accepted_l2_txs(self.batch_submitter_address)?; + + let mut app = self.genesis_app.clone(); + replay_txs(&mut app, txs)?; + let state = app.export_state()?; + + Ok(StateSnapshot { safe_block, state }) + } +} + +impl StateSnapshotProvider for StateSnapshotter +where + A: Application + Clone + Sync, +{ + fn snapshot(&self) -> Result { + self.build_snapshot() + } +} + +fn replay_txs(app: &mut A, txs: Vec) -> Result<(), AppError> +where + A: Application, +{ + for tx in txs { + match tx { + SequencedL2Tx::UserOp(user_op) => { + app.execute_valid_user_op(&user_op)?; + } + SequencedL2Tx::Direct(input) => { + app.execute_direct_input(&input)?; + } + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::time::SystemTime; + + use crate::ingress::inclusion_lane::PendingUserOp; + use crate::storage::test_helpers::{default_protocol_timing, temp_db}; + use crate::storage::{SafeInputRange, Storage, StoredSafeInput}; + use alloy_primitives::Signature; + use sequencer_core::application::{AppOutputs, InvalidReason}; + use sequencer_core::batch::Batch; + use sequencer_core::l2_tx::{DirectInput, ValidUserOp}; + use sequencer_core::user_op::{SignedUserOp, UserOp}; + use tokio::sync::oneshot; + + #[derive(Clone, Default)] + struct RecordingApp { + events: Vec, + } + + impl Application for RecordingApp { + const MAX_METHOD_PAYLOAD_BYTES: usize = 1024; + + fn current_user_nonce(&self, _sender: Address) -> u32 { + 0 + } + + fn current_user_balance(&self, _sender: Address) -> alloy_primitives::U256 { + alloy_primitives::U256::ZERO + } + + fn validate_user_op( + &self, + _sender: Address, + _user_op: &UserOp, + _current_fee: u16, + ) -> Result<(), InvalidReason> { + Ok(()) + } + + fn execute_valid_user_op(&mut self, user_op: &ValidUserOp) -> Result { + self.events.push(format!( + "user:{}:{}:{}", + user_op.sender, + user_op.fee, + alloy_primitives::hex::encode(&user_op.data) + )); + 
Ok(Vec::new()) + } + + fn execute_direct_input(&mut self, input: &DirectInput) -> Result { + self.events.push(format!( + "direct:{}:{}:{}", + input.sender, + input.block_number, + alloy_primitives::hex::encode(&input.payload) + )); + Ok(Vec::new()) + } + + fn export_state(&self) -> Result { + Ok(self.events.join("|")) + } + } + + fn empty_batch_payload(nonce: u64) -> Vec { + ssz::Encode::as_ssz_bytes(&Batch { + nonce, + frames: Vec::new(), + }) + } + + #[test] + fn snapshot_replays_only_scheduler_accepted_batch_prefix() { + let db = temp_db("state-snapshot-accepted-prefix"); + let mut storage = Storage::open(db.path.as_str()).expect("open storage"); + let batch_submitter = Address::repeat_byte(0xfe); + let direct_sender = Address::repeat_byte(0x11); + + storage + .append_safe_inputs( + 10, + &[ + StoredSafeInput { + sender: direct_sender, + payload: vec![0xaa], + block_number: 10, + }, + StoredSafeInput { + sender: batch_submitter, + payload: empty_batch_payload(0), + block_number: 10, + }, + ], + batch_submitter, + &default_protocol_timing(), + ) + .expect("append safe inputs"); + let mut head = storage + .initialize_open_state(10, SafeInputRange::new(0, 2)) + .expect("initialize open state"); + storage + .close_frame_and_batch(&mut head, 10) + .expect("close accepted batch"); + + let snapshotter = + StateSnapshotter::new(db.path.clone(), RecordingApp::default(), batch_submitter); + let snapshot = snapshotter.snapshot().expect("snapshot"); + + assert_eq!(snapshot.safe_block, 10); + assert_eq!(snapshot.state, format!("direct:{direct_sender}:10:aa")); + } + + #[test] + fn snapshot_excludes_open_tip_user_op() { + let db = temp_db("state-snapshot-excludes-tip"); + let mut storage = Storage::open(db.path.as_str()).expect("open storage"); + let batch_submitter = Address::repeat_byte(0xfe); + let sender = Address::repeat_byte(0x22); + + storage + .append_safe_inputs(10, &[], batch_submitter, &default_protocol_timing()) + .expect("append safe head"); + let mut head = 
storage + .initialize_open_state(10, SafeInputRange::new(0, 0)) + .expect("initialize open state"); + let (respond_to, _recv) = oneshot::channel(); + storage + .append_user_ops_chunk( + &mut head, + &[PendingUserOp { + signed: SignedUserOp { + sender, + signature: Signature::test_signature(), + user_op: UserOp { + nonce: 0, + max_fee: u16::MAX, + data: vec![0x01].into(), + }, + }, + respond_to, + received_at: SystemTime::now(), + }], + ) + .expect("append tip user op"); + + let snapshotter = + StateSnapshotter::new(db.path.clone(), RecordingApp::default(), batch_submitter); + let snapshot = snapshotter.snapshot().expect("snapshot"); + + assert_eq!(snapshot.safe_block, 10); + assert_eq!(snapshot.state, ""); + } + + #[test] + fn snapshot_excludes_unaccepted_soft_batch() { + let db = temp_db("state-snapshot-excludes-soft"); + let mut storage = Storage::open(db.path.as_str()).expect("open storage"); + let batch_submitter = Address::repeat_byte(0xfe); + let direct_sender = Address::repeat_byte(0x11); + + storage + .append_safe_inputs( + 10, + &[StoredSafeInput { + sender: direct_sender, + payload: vec![0xaa], + block_number: 10, + }], + batch_submitter, + &default_protocol_timing(), + ) + .expect("append safe input"); + let mut head = storage + .initialize_open_state(10, SafeInputRange::new(0, 1)) + .expect("initialize open state"); + storage + .close_frame_and_batch(&mut head, 10) + .expect("close unaccepted batch"); + + let snapshotter = + StateSnapshotter::new(db.path.clone(), RecordingApp::default(), batch_submitter); + let snapshot = snapshotter.snapshot().expect("snapshot"); + + assert_eq!(snapshot.safe_block, 10); + assert_eq!(snapshot.state, ""); + } +} diff --git a/sequencer/src/egress/mod.rs b/sequencer/src/egress/mod.rs index ac7b75a..3bcfb8e 100644 --- a/sequencer/src/egress/mod.rs +++ b/sequencer/src/egress/mod.rs @@ -6,4 +6,5 @@ //! split puts these on a separate port from ingress. 
pub mod api; +pub mod app_state; pub mod l2_tx_feed; diff --git a/sequencer/src/http.rs b/sequencer/src/http.rs index 3ee56f1..fc1bbe6 100644 --- a/sequencer/src/http.rs +++ b/sequencer/src/http.rs @@ -24,6 +24,7 @@ use tokio::sync::mpsc; use tower_http::trace::TraceLayer; use crate::egress::api::SubscribeState; +use crate::egress::app_state::StateSnapshotProvider; use crate::egress::l2_tx_feed::L2TxFeed; use crate::ingress::api::SubmitState; use crate::ingress::inclusion_lane::{PendingUserOp, SequencerError}; @@ -174,6 +175,7 @@ pub async fn start( max_user_op_data_bytes: usize, shutdown: ShutdownSignal, tx_feed: L2TxFeed, + state_snapshotter: Arc, config: ApiConfig, ) -> io::Result { let listener = tokio::net::TcpListener::bind(http_addr).await?; @@ -184,6 +186,7 @@ pub async fn start( max_user_op_data_bytes, shutdown, tx_feed, + state_snapshotter, config, )) } @@ -196,6 +199,7 @@ pub fn start_on_listener( max_user_op_data_bytes: usize, shutdown: ShutdownSignal, tx_feed: L2TxFeed, + state_snapshotter: Arc, config: ApiConfig, ) -> ApiServerTask { let health_state = Arc::new(crate::egress::api::HealthState { @@ -214,9 +218,17 @@ pub fn start_on_listener( config.ws_max_subscribers, config.ws_max_catchup_events, )); + let get_state_state = crate::egress::api::GetStateState { + shutdown: shutdown.clone(), + snapshotter: state_snapshotter, + }; let app: Router = crate::ingress::api::router(submit_state) - .merge(crate::egress::api::router(subscribe_state, health_state)) + .merge(crate::egress::api::router( + subscribe_state, + get_state_state, + health_state, + )) // Enforces a raw request-body cap before JSON deserialization, including whitespace. 
.layer(DefaultBodyLimit::max(config.max_body_bytes)) .layer(TraceLayer::new_for_http()); diff --git a/sequencer/src/runtime/mod.rs b/sequencer/src/runtime/mod.rs index 3412e85..11386f1 100644 --- a/sequencer/src/runtime/mod.rs +++ b/sequencer/src/runtime/mod.rs @@ -37,7 +37,7 @@ const INPUT_READER_POLL_INTERVAL: Duration = Duration::from_secs(2); pub async fn run(app: A, config: RunConfig) -> Result<(), RunError> where - A: Application + 'static, + A: Application + Clone + Sync + 'static, { // ── Bootstrap ──────────────────────────────────────────── std::fs::create_dir_all(&config.data_dir)?; diff --git a/sequencer/src/runtime/workers.rs b/sequencer/src/runtime/workers.rs index a1b0b1a..256f57d 100644 --- a/sequencer/src/runtime/workers.rs +++ b/sequencer/src/runtime/workers.rs @@ -83,7 +83,7 @@ pub(crate) struct Workers { impl Workers { /// Build the worker configs, spawn each worker, return the owning struct. /// Logs `listening` once the HTTP server is bound. - pub(crate) async fn spawn( + pub(crate) async fn spawn( cfg: WorkersConfig, ) -> Result { let WorkersConfig { @@ -102,6 +102,12 @@ impl Workers { let shutdown = ShutdownSignal::default(); + let state_snapshotter = Arc::new(crate::egress::app_state::StateSnapshotter::new( + db_path.clone(), + app.clone(), + l1_config.batch_submitter_address, + )); + // Inclusion lane: takes the app, returns the tx-sender the HTTP // ingress route will publish to. 
let storage = crate::storage::Storage::open(&db_path)?; @@ -154,6 +160,7 @@ impl Workers { A::MAX_METHOD_PAYLOAD_BYTES, shutdown.clone(), tx_feed, + state_snapshotter, ApiConfig::default(), ) .await?; diff --git a/sequencer/src/storage/egress.rs b/sequencer/src/storage/egress.rs index dbf98e7..907de62 100644 --- a/sequencer/src/storage/egress.rs +++ b/sequencer/src/storage/egress.rs @@ -16,6 +16,61 @@ use super::queries::decode_l2_tx_row; use sequencer_core::l2_tx::SequencedL2Tx; impl Storage { + /// Load L2 transactions for the scheduler-accepted safe batch prefix. + /// + /// This is the replay source for safe app-state export. Unlike + /// `valid_sequenced_l2_txs`, it excludes the soft-confirmed Tip and any + /// valid closed batches that have not yet been accepted by the L1 scheduler. + pub fn safe_accepted_l2_txs( + &mut self, + batch_submitter_address: Address, + ) -> Result> { + const SQL: &str = " + SELECT + CASE WHEN s.user_op_pos_in_frame IS NOT NULL THEN 0 ELSE 1 END AS kind, + CASE + WHEN s.user_op_pos_in_frame IS NOT NULL THEN u.sender + WHEN s.safe_input_index IS NOT NULL THEN d.sender + ELSE NULL + END AS sender, + CASE WHEN s.user_op_pos_in_frame IS NOT NULL THEN u.data ELSE NULL END AS data, + CASE WHEN s.user_op_pos_in_frame IS NOT NULL THEN f.fee ELSE NULL END AS fee, + CASE WHEN s.safe_input_index IS NOT NULL THEN d.payload ELSE NULL END AS payload, + CASE WHEN s.safe_input_index IS NOT NULL THEN d.block_number ELSE NULL END AS block_number + FROM safe_accepted_batches a + JOIN valid_closed_batches b + ON b.nonce = a.nonce + JOIN valid_sequenced_l2_txs s + ON s.batch_index = b.batch_index + AND NOT (s.safe_input_index IS NOT NULL + AND EXISTS (SELECT 1 FROM safe_inputs si + WHERE si.safe_input_index = s.safe_input_index + AND si.sender = ?1)) + LEFT JOIN user_ops u + ON u.batch_index = s.batch_index + AND u.frame_in_batch = s.frame_in_batch + AND u.pos_in_frame = s.user_op_pos_in_frame + LEFT JOIN frames f + ON f.batch_index = s.batch_index + 
AND f.frame_in_batch = s.frame_in_batch + LEFT JOIN safe_inputs d + ON d.safe_input_index = s.safe_input_index + ORDER BY a.nonce ASC, s.offset ASC + "; + let mut stmt = self.conn.prepare_cached(SQL)?; + let rows = stmt.query_map(params![batch_submitter_address.as_slice()], |row| { + Ok(decode_l2_tx_row( + row.get(0)?, + row.get(1)?, + row.get(2)?, + row.get(3)?, + row.get(4)?, + row.get(5)?, + )) + })?; + rows.collect::>>() + } + /// Load a page of ordered L2 transactions starting after the given offset. /// Returns `(db_offset, tx)` pairs. Callers should track `db_offset` of the /// last item as their cursor, not increment a counter. diff --git a/sequencer/tests/e2e_sequencer.rs b/sequencer/tests/e2e_sequencer.rs index 9b7bea7..56c624d 100644 --- a/sequencer/tests/e2e_sequencer.rs +++ b/sequencer/tests/e2e_sequencer.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use std::io::ErrorKind; +use std::sync::Arc; use std::time::Duration; use alloy_primitives::{Address, Signature, U256}; @@ -12,6 +13,7 @@ use app_core::application::{ use futures_util::StreamExt; use k256::ecdsa::SigningKey; use k256::ecdsa::signature::hazmat::PrehashSigner; +use sequencer::egress::app_state::StateSnapshotter; use sequencer::egress::l2_tx_feed::{L2TxFeed, L2TxFeedConfig}; use sequencer::http::{self, ApiConfig}; use sequencer::ingress::inclusion_lane::{ @@ -192,6 +194,49 @@ fn v1_regression_domain_fields_all_affect_recovery() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_state_returns_safe_only_projection_over_http() { + let db = temp_db("get-state-http"); + let domain = test_domain(); + let sender = Address::repeat_byte(0x11); + bootstrap_open_frame_with_deposits(db.path.as_str(), &[(sender, U256::from(10_u64))]); + + let Some(runtime) = start_api_only_server(db.path.as_str(), domain, 4 * 1024, 8).await else { + return; + }; + + let (status, body) = get_raw(runtime.addr, "/get_state").await; + shutdown_runtime(runtime).await; + 
+ assert_eq!(status, 200, "get_state response: {body}"); + let parsed: serde_json::Value = serde_json::from_str(&body).expect("parse get_state JSON"); + assert_eq!(parsed["safe_block"], 1); + assert_eq!( + parsed["state"], "{\"balances\":{},\"nonces\":{}}", + "soft-confirmed tip deposits must not appear in safe-only export" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_state_is_unavailable_before_safe_head_is_observed() { + let db = temp_db("get-state-no-safe-head"); + let domain = test_domain(); + let _storage = Storage::open(db.path.as_str()).expect("open storage"); + + let Some(runtime) = start_api_only_server(db.path.as_str(), domain, 4 * 1024, 8).await else { + return; + }; + + let (status, body) = get_raw(runtime.addr, "/get_state").await; + shutdown_runtime(runtime).await; + + assert_eq!(status, 503, "get_state without safe head: {body}"); + assert!( + body.contains("safe L1 head has not been observed yet"), + "expected safe-head unavailable message, got: {body}" + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn e2e_submit_tx_ack_and_broadcast() { let db = temp_db("full-e2e"); @@ -1118,6 +1163,11 @@ async fn start_full_server_with_max_body( MAX_METHOD_PAYLOAD_BYTES, shutdown.clone(), tx_feed, + Arc::new(StateSnapshotter::new( + db_path.to_string(), + WalletApp::new(WalletConfig::default()), + Address::from([0xff; 20]), + )), ApiConfig { max_body_bytes, ..ApiConfig::default() @@ -1168,6 +1218,11 @@ async fn start_api_only_server( MAX_METHOD_PAYLOAD_BYTES, shutdown.clone(), tx_feed, + Arc::new(StateSnapshotter::new( + db_path.to_string(), + WalletApp::new(WalletConfig::default()), + Address::from([0xff; 20]), + )), ApiConfig { max_body_bytes, ..ApiConfig::default() @@ -1388,6 +1443,26 @@ async fn post_raw_body_no_content_type(addr: std::net::SocketAddr, body: &str) - parse_http_response(response.as_slice()) } +async fn get_raw(addr: std::net::SocketAddr, path: &str) -> (u16, String) { + let 
host_port = addr.to_string(); + let mut stream = tokio::net::TcpStream::connect(host_port.as_str()) + .await + .expect("connect test http socket"); + let request = format!("GET {path} HTTP/1.1\r\nHost: {host_port}\r\nConnection: close\r\n\r\n"); + stream + .write_all(request.as_bytes()) + .await + .expect("write raw request"); + stream.flush().await.expect("flush raw request"); + + let mut response = Vec::new(); + stream + .read_to_end(&mut response) + .await + .expect("read raw response"); + parse_http_response(response.as_slice()) +} + async fn post_raw_json(addr: std::net::SocketAddr, body: &str) -> (u16, String) { let host_port = addr.to_string(); let mut stream = tokio::net::TcpStream::connect(host_port.as_str()) diff --git a/sequencer/tests/ws_broadcaster.rs b/sequencer/tests/ws_broadcaster.rs index 0aeaea3..8dbd180 100644 --- a/sequencer/tests/ws_broadcaster.rs +++ b/sequencer/tests/ws_broadcaster.rs @@ -2,12 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use std::io::ErrorKind; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use alloy_primitives::{Address, Signature}; use alloy_sol_types::Eip712Domain; -use app_core::application::MAX_METHOD_PAYLOAD_BYTES; +use app_core::application::{MAX_METHOD_PAYLOAD_BYTES, WalletApp, WalletConfig}; use futures_util::{SinkExt, StreamExt}; +use sequencer::egress::app_state::StateSnapshotter; use sequencer::egress::l2_tx_feed::{L2TxFeed, L2TxFeedConfig}; use sequencer::http::{self, ApiConfig, WS_CATCHUP_WINDOW_EXCEEDED_REASON}; use sequencer::ingress::inclusion_lane::{PendingUserOp, SequencerError}; @@ -453,6 +455,11 @@ async fn start_test_server_with_limits( MAX_METHOD_PAYLOAD_BYTES, shutdown.clone(), tx_feed, + Arc::new(StateSnapshotter::new( + db_path.to_string(), + WalletApp::new(WalletConfig::default()), + Address::from([0xff; 20]), + )), ApiConfig { ws_max_subscribers, ws_max_catchup_events, diff --git a/tests/e2e/Cargo.toml b/tests/e2e/Cargo.toml index b227077..98a9308 100644 --- 
a/tests/e2e/Cargo.toml +++ b/tests/e2e/Cargo.toml @@ -19,4 +19,6 @@ alloy-sol-types = "1.4.1" futures = "0.3" libtest-mimic = "0.6.1" ssz = { package = "ethereum_ssz", version = "0.10" } -tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "time", "net"] } +tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "time", "net", "process", "io-util"] } +serde_json = "1.0" +tempfile = "3.10" diff --git a/tests/e2e/src/lib.rs b/tests/e2e/src/lib.rs index 879df69..5b72017 100644 --- a/tests/e2e/src/lib.rs +++ b/tests/e2e/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) pub mod test_cases; +mod watchdog_compare; use std::future::Future; use std::pin::Pin; diff --git a/tests/e2e/src/main.rs b/tests/e2e/src/main.rs index 7b3c903..6b16c9f 100644 --- a/tests/e2e/src/main.rs +++ b/tests/e2e/src/main.rs @@ -3,7 +3,9 @@ use libtest_mimic::{Arguments, Trial}; use rollups_e2e::run_trial; -use rollups_harness::{ManagedSequencer, default_devnet_sequencer_config}; +use rollups_harness::{ + ManagedSequencer, default_devnet_sequencer_config, devnet_sequencer_config_no_faketime, +}; fn main() { let mut args = Arguments::from_args(); @@ -14,10 +16,13 @@ fn main() { .map(|(name, scenario)| { Trial::test(name, move || { let log_prefix = format!("rollups-e2e-{name}"); + let spawn_config = if name == "watchdog_genesis_compare_test" { + devnet_sequencer_config_no_faketime(log_prefix) + } else { + default_devnet_sequencer_config(log_prefix) + }; run_trial(name, || async move { - let mut runtime = - ManagedSequencer::spawn(default_devnet_sequencer_config(log_prefix)) - .await?; + let mut runtime = ManagedSequencer::spawn(spawn_config).await?; let scenario_result = scenario(&mut runtime).await; // Post-test schema invariants: assert the DB's structural // invariants only if the scenario succeeded — otherwise diff --git a/tests/e2e/src/test_cases.rs b/tests/e2e/src/test_cases.rs index af3a41b..549a5bb 100644 --- 
a/tests/e2e/src/test_cases.rs +++ b/tests/e2e/src/test_cases.rs @@ -305,6 +305,11 @@ pub fn test_cases() -> Vec<(&'static str, ScenarioFn)> { ) }, ), + ("watchdog_genesis_compare_test", |runtime| { + Box::pin(crate::watchdog_compare::run_watchdog_genesis_compare_test( + runtime, + )) + }), ] } diff --git a/tests/e2e/src/watchdog_compare.rs b/tests/e2e/src/watchdog_compare.rs new file mode 100644 index 0000000..a7e63ef --- /dev/null +++ b/tests/e2e/src/watchdog_compare.rs @@ -0,0 +1,194 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +//! Watchdog compare harness: Anvil + devnet sequencer + live CM inspect. + +use std::path::Path; +use std::process::Stdio; +use std::time::Duration; + +use rollups_harness::ManagedSequencer; +use rollups_harness::paths; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::process::Command; + +use crate::ScenarioResult; + +const MACHINE_IMAGE: &str = "examples/canonical-app/out/canonical-machine-image"; +const EMPTY_WALLET_STATE: &str = "{\"balances\":{},\"nonces\":{}}"; +const GENESIS_SAFE_BLOCK: &str = "0"; + +pub async fn run_watchdog_genesis_compare_test( + runtime: &mut ManagedSequencer, +) -> ScenarioResult<()> { + if std::env::var("RUN_WATCHDOG_E2E").ok().as_deref() != Some("1") { + eprintln!("skipping watchdog_genesis_compare_test: set RUN_WATCHDOG_E2E=1 to enable"); + return Ok(()); + } + + let workspace = paths::workspace_root(); + let machine_image = workspace.join(MACHINE_IMAGE); + if !machine_image.is_dir() { + return Err(format!( + "canonical machine image missing at {}; run: just canonical-build-machine-image", + machine_image.display() + ) + .into()); + } + + eprintln!("[watchdog-harness] step 1/5: wait for sequencer GET /get_state (safe head)"); + let get_state_url = format!("{}/get_state", runtime.endpoint()); + let (_status, body) = + wait_for_get_state(get_state_url.as_str(), Duration::from_secs(30)).await?; + let 
parsed: serde_json::Value = serde_json::from_str(&body) + .map_err(|err| format!("invalid /get_state JSON: {err}; body={body}"))?; + let sequencer_state = parsed["state"] + .as_str() + .ok_or("get_state response missing state field")?; + let safe_block = parsed["safe_block"] + .as_u64() + .ok_or("get_state response missing safe_block field")?; + eprintln!("[watchdog-harness] sequencer safe_block={safe_block} state={sequencer_state}"); + if sequencer_state != EMPTY_WALLET_STATE { + return Err(format!( + "expected genesis-empty wallet state {EMPTY_WALLET_STATE}, got {sequencer_state}" + ) + .into()); + } + + eprintln!("[watchdog-harness] step 2/5: prove CM inspect JSON on genesis image"); + let inspect_state = + prove_cm_inspect_genesis(workspace.as_path(), machine_image.as_path()).await?; + if inspect_state != EMPTY_WALLET_STATE { + return Err( + format!("CM inspect expected {EMPTY_WALLET_STATE}, got {inspect_state}").into(), + ); + } + + eprintln!("[watchdog-harness] step 3/5: prepare watchdog checkpoint dir"); + let checkpoint_dir = tempfile::tempdir() + .map_err(|err| format!("temp checkpoint dir: {err}"))? 
+ .keep(); + + eprintln!("[watchdog-harness] step 4/5: run Lua compare-mode pass"); + let lua_deps = workspace.join(".deps/lua"); + let compare_status = Command::new("lua") + .current_dir(workspace.as_path()) + .arg("watchdog/tests/run_compare_once.lua") + .env("WATCHDOG_LUA_DEPS", lua_deps.as_os_str()) + .env("WATCHDOG_MODE", "compare") + .env("WATCHDOG_SEQUENCER_URL", runtime.endpoint()) + .env("WATCHDOG_L1_RPC_URL", runtime.l1_endpoint()) + .env( + "WATCHDOG_INPUTBOX_ADDRESS", + runtime.input_box_address().to_string(), + ) + .env("WATCHDOG_APP_ADDRESS", runtime.app_address().to_string()) + .env("WATCHDOG_CHECKPOINT_DIR", checkpoint_dir.as_os_str()) + .env("WATCHDOG_CM_SNAPSHOT_DIR", machine_image.as_os_str()) + .env("WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK", GENESIS_SAFE_BLOCK) + .env("WATCHDOG_CM_EXECUTABLE", "cartesi-machine") + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .status() + .await + .map_err(|err| format!("failed to run watchdog compare lua: {err}"))?; + if !compare_status.success() { + return Err(format!("watchdog compare lua exited with {}", compare_status).into()); + } + + eprintln!("[watchdog-harness] step 5/5: compare pass completed successfully"); + Ok(()) +} + +async fn prove_cm_inspect_genesis( + workspace: &Path, + machine_image: &Path, +) -> ScenarioResult { + let work_dir = tempfile::tempdir().map_err(|err| format!("temp cm work dir: {err}"))?; + let query_path = work_dir.path().join("inspect-query.bin"); + let report_path = work_dir.path().join("inspect-report-0.bin"); + std::fs::write(query_path.as_path(), b"state") + .map_err(|err| format!("write inspect query: {err}"))?; + + let status = Command::new("cartesi-machine") + .current_dir(workspace) + .arg("--no-rollback") + .arg(format!("--load={},sharing:none", machine_image.display())) + .arg(format!( + "--cmio-inspect-state=query:{},report:{}", + query_path.display(), + report_path.display() + )) + .arg("--quiet") + .status() + .await + .map_err(|err| format!("cartesi-machine 
inspect failed to start: {err}"))?; + if !status.success() { + return Err(format!("cartesi-machine inspect exited with {status}").into()); + } + + let report = std::fs::read_to_string(report_path.as_path()) + .map_err(|err| format!("read inspect report: {err}"))?; + if report.contains("inspect endpoint not implemented") { + return Err( + "CM dapp is stale (inspect not implemented); rebuild with just canonical-build-machine-image" + .into(), + ); + } + Ok(report) +} + +async fn http_get(url: &str) -> std::io::Result<(u16, String)> { + let remainder = url.strip_prefix("http://").ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "only http:// supported in harness", + ) + })?; + let (host_port, path) = match remainder.split_once('/') { + Some((host_port, path)) => (host_port.to_string(), format!("/{path}")), + None => (remainder.to_string(), "/".to_string()), + }; + + let mut stream = TcpStream::connect(host_port.as_str()).await?; + let request = format!("GET {path} HTTP/1.1\r\nHost: {host_port}\r\nConnection: close\r\n\r\n"); + stream.write_all(request.as_bytes()).await?; + stream.flush().await?; + + let mut raw = Vec::new(); + stream.read_to_end(&mut raw).await?; + let text = String::from_utf8(raw).map_err(std::io::Error::other)?; + let (headers, body) = text + .split_once("\r\n\r\n") + .ok_or_else(|| std::io::Error::other("missing HTTP body"))?; + let status = headers + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .and_then(|code| code.parse().ok()) + .unwrap_or(500); + Ok((status, body.to_string())) +} + +async fn wait_for_get_state(url: &str, deadline: Duration) -> ScenarioResult<(u16, String)> { + let started = std::time::Instant::now(); + let mut last = String::new(); + while started.elapsed() < deadline { + match http_get(url).await { + Ok((200, body)) => return Ok((200, body)), + Ok((503, body)) => { + last = body; + } + Ok((status, body)) => { + return Err(format!("GET /get_state returned HTTP {status}: 
{body}").into()); + } + Err(err) => { + last = err.to_string(); + } + } + tokio::time::sleep(Duration::from_millis(200)).await; + } + Err(format!("timed out waiting for GET /get_state 200; last response: {last}").into()) +} diff --git a/tests/harness/src/lib.rs b/tests/harness/src/lib.rs index a528461..d7dcbc0 100644 --- a/tests/harness/src/lib.rs +++ b/tests/harness/src/lib.rs @@ -18,6 +18,7 @@ pub use rollups::{DEVNET_CHAIN_ID, DevnetRollupsStack}; pub use sequencer::{ BatchCounts, DEFAULT_DEVNET_SEQUENCER_BIN, DEFAULT_TEST_LOGS_DIR, ManagedSequencer, ManagedSequencerConfig, RespawnAttemptOutcome, RespawnPolicy, default_devnet_sequencer_config, + devnet_sequencer_config_no_faketime, }; pub use wallet::{ TestSigner, WalletL1Client, WalletL2Client, address_from_signing_key, sign_user_op_hex, diff --git a/tests/harness/src/paths.rs b/tests/harness/src/paths.rs index 5098069..e0de961 100644 --- a/tests/harness/src/paths.rs +++ b/tests/harness/src/paths.rs @@ -38,3 +38,30 @@ pub fn mock_erc20_artifact_path() -> PathBuf { pub fn devnet_machine_image_path() -> PathBuf { workspace_root().join(DEFAULT_DEVNET_MACHINE_IMAGE_PATH) } + +const DEVNET_SEQUENCER_BIN: &str = "sequencer-devnet"; + +/// Resolve the `sequencer-devnet` binary built for the current Cargo invocation. +/// +/// Prefers `CARGO_TARGET_DIR` (set by `cargo run` / `cargo test` in sandboxes and +/// custom target dirs) over the workspace `target/debug/` tree, which may be stale +/// when builds only run through Cargo with a redirected target directory. 
+pub fn resolve_devnet_sequencer_bin() -> PathBuf { + if let Ok(path) = std::env::var("CARGO_BIN_EXE_SEQUENCER_DEVNET") { + let path = PathBuf::from(path); + if path.exists() { + return path; + } + } + if let Ok(target) = std::env::var("CARGO_TARGET_DIR") { + let path = PathBuf::from(target) + .join("debug") + .join(DEVNET_SEQUENCER_BIN); + if path.exists() { + return path; + } + } + workspace_root() + .join("target/debug") + .join(DEVNET_SEQUENCER_BIN) +} diff --git a/tests/harness/src/sequencer.rs b/tests/harness/src/sequencer.rs index f1e5f09..7f402df 100644 --- a/tests/harness/src/sequencer.rs +++ b/tests/harness/src/sequencer.rs @@ -33,6 +33,9 @@ pub struct ManagedSequencerConfig { pub sequencer_bin: PathBuf, pub log_prefix: String, pub logs_dir: PathBuf, + /// When false, the child runs without libfaketime (for tests that never + /// manipulate wall clock). Requires libfaketime in PATH when true. + pub faketime: bool, } /// Snapshot of the `batches` table. Returned by @@ -95,13 +98,12 @@ pub struct ManagedSequencer { /// When `None`, defaults to `DEVNET_CHAIN_ID` (matches Anvil). Set to /// a non-matching value to test chain-id-mismatch failure modes chain_id_override: Option, - /// Path to the file libfaketime re-reads for its offset, on every time - /// call (combined with `FAKETIME_NO_CACHE=1`). Writing to this file - /// shifts the sequencer's view of `SystemTime::now()` / `Instant::now()` - /// immediately — no respawn needed. - faketime_rc_path: PathBuf, - /// Cached libfaketime dylib/so path (computed once on spawn). - libfaketime_path: PathBuf, + /// Cached libfaketime dylib/so path (computed once on spawn). `None` when + /// [`ManagedSequencerConfig::faketime`] is false. + libfaketime_path: Option, + /// Path to the file libfaketime re-reads for its offset. `None` when + /// faketime is disabled. + faketime_rc_path: Option, /// Internal cumulative forward-offset tracker for /// [`Self::advance_wall_and_mine`]. 
Not touched by /// [`Self::set_faketime_offset`]. @@ -110,16 +112,31 @@ pub struct ManagedSequencer { pub fn default_devnet_sequencer_config(log_prefix: impl Into) -> ManagedSequencerConfig { ManagedSequencerConfig { - sequencer_bin: PathBuf::from(DEFAULT_DEVNET_SEQUENCER_BIN), + sequencer_bin: paths::resolve_devnet_sequencer_bin(), log_prefix: log_prefix.into(), logs_dir: PathBuf::from(DEFAULT_TEST_LOGS_DIR), + faketime: true, + } +} + +/// Devnet config without libfaketime (watchdog compare and other wall-clock-neutral tests). +pub fn devnet_sequencer_config_no_faketime( + log_prefix: impl Into, +) -> ManagedSequencerConfig { + ManagedSequencerConfig { + faketime: false, + ..default_devnet_sequencer_config(log_prefix) } } impl ManagedSequencer { pub async fn spawn(config: ManagedSequencerConfig) -> HarnessResult { let logs_dir = paths::resolve_from_workspace(&config.logs_dir); - let sequencer_bin = paths::resolve_from_workspace(&config.sequencer_bin); + let sequencer_bin = if config.sequencer_bin.is_absolute() { + config.sequencer_bin.clone() + } else { + paths::resolve_from_workspace(&config.sequencer_bin) + }; let log_prefix = config.log_prefix; let rollups = DevnetRollupsStack::spawn(log_prefix.as_str(), logs_dir.as_path()).await?; @@ -128,14 +145,15 @@ impl ManagedSequencer { .map_err(|err| io_other(format!("failed to create temp data dir: {err}")))?; let data_dir_path = data_dir.path().to_path_buf(); - // Set up faketime: locate libfaketime + create the rc file. Initial - // content `+0` means no offset; tests can overwrite with a new offset - // at any time and the running sequencer will see it on its next - // `SystemTime::now()` / `Instant::now()` call (FAKETIME_NO_CACHE=1). 
- let libfaketime_path = find_libfaketime()?; - let faketime_rc_path = data_dir_path.join("faketime.rc"); - fs::write(faketime_rc_path.as_path(), "+0\n") - .map_err(|err| io_other(format!("create faketime rc file: {err}")))?; + let (libfaketime_path, faketime_rc_path) = if config.faketime { + let libfaketime_path = find_libfaketime()?; + let faketime_rc_path = data_dir_path.join("faketime.rc"); + fs::write(faketime_rc_path.as_path(), "+0\n") + .map_err(|err| io_other(format!("create faketime rc file: {err}")))?; + (Some(libfaketime_path), Some(faketime_rc_path)) + } else { + (None, None) + }; let SpawnedSequencerProcess { child, @@ -149,8 +167,8 @@ impl ManagedSequencer { &rollups, None, None, - libfaketime_path.as_path(), - faketime_rc_path.as_path(), + libfaketime_path.as_deref(), + faketime_rc_path.as_deref(), ) .await?; @@ -210,8 +228,12 @@ impl ManagedSequencer { /// Replaces any cumulative advance tracked by /// [`Self::advance_wall_and_mine`], and resets its counter. pub fn set_faketime_offset(&mut self, offset: Option) -> HarnessResult<()> { + let rc = self + .faketime_rc_path + .as_ref() + .ok_or_else(|| io_other("faketime is disabled for this ManagedSequencer"))?; let s = offset.as_deref().unwrap_or("+0"); - fs::write(self.faketime_rc_path.as_path(), format!("{s}\n")) + fs::write(rc.as_path(), format!("{s}\n")) .map_err(|err| io_other(format!("write faketime rc file: {err}")))?; self.cumulative_offset_secs = 0; Ok(()) @@ -462,6 +484,10 @@ impl ManagedSequencer { self.rollups.app_address() } + pub fn input_box_address(&self) -> Address { + self.rollups.input_box_address() + } + pub fn erc20_portal_address(&self) -> Address { self.rollups.erc20_portal_address() } @@ -522,11 +548,12 @@ impl ManagedSequencer { let blocks = secs / SECONDS_PER_BLOCK; self.mine_l1_blocks(blocks).await?; self.cumulative_offset_secs = self.cumulative_offset_secs.saturating_add(secs); - fs::write( - self.faketime_rc_path.as_path(), - format!("+{}s\n", self.cumulative_offset_secs), 
- ) - .map_err(|err| io_other(format!("write faketime rc file: {err}")))?; + let rc = self + .faketime_rc_path + .as_ref() + .ok_or_else(|| io_other("faketime is disabled for this ManagedSequencer"))?; + fs::write(rc.as_path(), format!("+{}s\n", self.cumulative_offset_secs)) + .map_err(|err| io_other(format!("write faketime rc file: {err}")))?; Ok(()) } @@ -682,8 +709,8 @@ impl ManagedSequencer { &self.rollups, self.l1_endpoint_override.as_deref(), self.chain_id_override, - self.libfaketime_path.as_path(), - self.faketime_rc_path.as_path(), + self.libfaketime_path.as_deref(), + self.faketime_rc_path.as_deref(), ) .await?; self.child = child; @@ -772,8 +799,8 @@ async fn spawn_sequencer_process( rollups: &DevnetRollupsStack, l1_endpoint_override: Option<&str>, chain_id_override: Option, - libfaketime_path: &Path, - faketime_rc_path: &Path, + libfaketime_path: Option<&Path>, + faketime_rc_path: Option<&Path>, ) -> HarnessResult { let (endpoint, http_addr) = build_local_endpoint()?; let log_path = timestamped_log_path(logs_dir, log_prefix); @@ -796,7 +823,9 @@ async fn spawn_sequencer_process( // `Instant::now()` call thanks to FAKETIME_NO_CACHE=1, so tests can // shift the clock dynamically during a run. 
let mut cmd = Command::new(path_as_str(sequencer_bin)?); - apply_faketime_env(&mut cmd, libfaketime_path, faketime_rc_path)?; + if let (Some(lib), Some(rc)) = (libfaketime_path, faketime_rc_path) { + apply_faketime_env(&mut cmd, lib, rc)?; + } let chain_id = chain_id_override.unwrap_or(DEVNET_CHAIN_ID); let mut child = cmd diff --git a/watchdog/abi.lua b/watchdog/abi.lua new file mode 100644 index 0000000..0c423bd --- /dev/null +++ b/watchdog/abi.lua @@ -0,0 +1,138 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local abi = {} + +local WORD_HEX_LEN = 64 + +local function strip_0x(value) + assert(type(value) == "string", "hex value must be a string") + if value:sub(1, 2) == "0x" or value:sub(1, 2) == "0X" then + return value:sub(3) + end + return value +end + +local function assert_hex(value) + if value:match("^[0-9a-fA-F]*$") == nil then + error("invalid hex string") + end +end + +local function word_at(hex, index) + local start = (index * WORD_HEX_LEN) + 1 + local word = hex:sub(start, start + WORD_HEX_LEN - 1) + if #word ~= WORD_HEX_LEN then + error("ABI word out of bounds") + end + return word +end + +local function uint_word_to_number(word) + local value = 0 + for i = 1, #word do + local nibble = tonumber(word:sub(i, i), 16) + value = (value * 16) + nibble + if value > 9007199254740991 then + error("uint value too large for precise Lua number") + end + end + return value +end + +local function uint_word_to_hex(word) + local stripped = word:gsub("^0+", "") + if stripped == "" then + return "0x0" + end + return "0x" .. stripped:lower() +end + +local function address_from_word(word) + if word:sub(1, 24) ~= string.rep("0", 24) then + error("address word has non-zero high bytes") + end + return "0x" .. 
word:sub(25):lower() +end + +function abi.bytes_from_hex(hex) + hex = strip_0x(hex) + assert_hex(hex) + if (#hex % 2) ~= 0 then + error("hex string must have even length") + end + + return (hex:gsub("..", function(byte) + return string.char(tonumber(byte, 16)) + end)) +end + +function abi.hex_from_bytes(bytes) + return (bytes:gsub(".", function(char) + return string.format("%02x", char:byte()) + end)) +end + +function abi.decode_single_dynamic_bytes(encoded) + local hex = strip_0x(encoded) + assert_hex(hex) + local offset = uint_word_to_number(word_at(hex, 0)) + if (offset % 32) ~= 0 then + error("dynamic bytes offset is not word-aligned") + end + + local offset_words = offset // 32 + local len = uint_word_to_number(word_at(hex, offset_words)) + local data_hex_start = ((offset_words + 1) * WORD_HEX_LEN) + 1 + local data_hex = hex:sub(data_hex_start, data_hex_start + (len * 2) - 1) + if #data_hex ~= len * 2 then + error("dynamic bytes payload out of bounds") + end + return abi.bytes_from_hex(data_hex) +end + +function abi.decode_evm_advance_call(encoded) + local hex = strip_0x(encoded) + assert_hex(hex) + + -- EvmAdvanceCall is calldata, so accept and skip the 4-byte selector. 
+    if (#hex % WORD_HEX_LEN) == 8 then
+        hex = hex:sub(9)
+    end
+
+    local payload_offset = uint_word_to_number(word_at(hex, 7))
+    if (payload_offset % 32) ~= 0 then
+        error("payload offset is not word-aligned")
+    end
+
+    local payload_offset_words = payload_offset // 32
+    local payload_len = uint_word_to_number(word_at(hex, payload_offset_words))
+    local payload_hex_start = ((payload_offset_words + 1) * WORD_HEX_LEN) + 1
+    local payload_hex = hex:sub(payload_hex_start, payload_hex_start + (payload_len * 2) - 1)
+    if #payload_hex ~= payload_len * 2 then
+        error("payload out of bounds")
+    end
+
+    return {
+        chain_id_hex = uint_word_to_hex(word_at(hex, 0)),
+        app_contract = address_from_word(word_at(hex, 1)),
+        msg_sender = address_from_word(word_at(hex, 2)),
+        block_number = uint_word_to_number(word_at(hex, 3)),
+        block_timestamp_hex = uint_word_to_hex(word_at(hex, 4)),
+        prev_randao_hex = uint_word_to_hex(word_at(hex, 5)),
+        index_hex = uint_word_to_hex(word_at(hex, 6)),
+        payload = abi.bytes_from_hex(payload_hex),
+    }
+end
+
+--- Decode the `data` field of an InputAdded log.
+-- `log.data` is decoded as a single ABI dynamic `bytes` value; those bytes are
+-- then decoded as an EvmAdvance call. The undecoded bytes are attached to the
+-- result as `raw_input`.
+-- Raises an error when `log` is not a table or `log.data` is not a string.
+function abi.decode_input_added_log(log)
+    if type(log) ~= "table" or type(log.data) ~= "string" then
+        error("log.data is required")
+    end
+    local input = abi.decode_single_dynamic_bytes(log.data)
+    local decoded = abi.decode_evm_advance_call(abi.hex_from_bytes(input))
+    decoded.raw_input = input
+    return decoded
+end
+
+return abi
diff --git a/watchdog/alarm.lua b/watchdog/alarm.lua
new file mode 100644
index 0000000..085eff6
--- /dev/null
+++ b/watchdog/alarm.lua
@@ -0,0 +1,43 @@
+-- (c) Cartesi and individual authors (see AUTHORS)
+-- SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+local alarm = {}
+
+-- Characters that have a dedicated two-character JSON escape.
+local JSON_ESCAPES = {
+    ['"'] = '\\"',
+    ["\\"] = "\\\\",
+    ["\b"] = "\\b",
+    ["\f"] = "\\f",
+    ["\n"] = "\\n",
+    ["\r"] = "\\r",
+    ["\t"] = "\\t",
+}
+
+-- Encode any value as a JSON string literal (via tostring). Quotes,
+-- backslashes and every control character are escaped; control characters
+-- without a short form are written as \u00XX.
+local function encode_json_string(value)
+    local escaped = tostring(value):gsub('[%c"\\]', function(char)
+        return JSON_ESCAPES[char] or string.format("\\u%04x", char:byte())
+    end)
+    return '"' .. escaped .. '"'
+end
+
+-- Encode a flat table as a JSON object.
+-- Numbers and booleans keep their JSON type; every other value is
+-- stringified. Keys are escaped and emitted in sorted order so the body is
+-- deterministic.
+local function encode_json_object(fields)
+    local keys = {}
+    local values = {}
+    for key, value in pairs(fields) do
+        local name = tostring(key)
+        -- Keys such as 1 and "1" stringify identically; list the name once.
+        if values[name] == nil then
+            keys[#keys + 1] = name
+        end
+        values[name] = value
+    end
+    table.sort(keys)
+
+    local parts = {}
+    for _, name in ipairs(keys) do
+        local value = values[name]
+        local encoded_value
+        if type(value) == "number" then
+            local finite = value == value and value ~= math.huge and value ~= -math.huge
+            if finite then
+                encoded_value = tostring(value)
+            else
+                -- JSON has no NaN/Infinity literal; keep the information as a string.
+                encoded_value = encode_json_string(value)
+            end
+        elseif type(value) == "boolean" then
+            encoded_value = value and "true" or "false"
+        else
+            encoded_value = encode_json_string(value)
+        end
+        parts[#parts + 1] = encode_json_string(name) .. ":" .. encoded_value
+    end
+    return "{" .. table.concat(parts, ",") .. "}"
+end
+
+--- POST `payload` (a flat table) as a JSON object to `webhook_url`.
+-- @param http adapter object exposing `http:post(url, body, headers)`
+-- @param webhook_url destination URL; nil or "" is reported as "not configured"
+-- @param payload flat table of alarm fields
+-- @return true on a 2xx response; otherwise nil plus an error message
+function alarm.send_webhook(http, webhook_url, payload)
+    if not webhook_url or webhook_url == "" then
+        return nil, "WATCHDOG_WEBHOOK_URL is not configured"
+    end
+    local body = encode_json_object(payload)
+    local response, err = http:post(webhook_url, body, {
+        ["content-type"] = "application/json",
+    })
+    if not response then
+        return nil, err
+    end
+    if response.status < 200 or response.status >= 300 then
+        return nil, "alarm webhook HTTP " .. tostring(response.status)
+    end
+    return true
+end
+
+return alarm
diff --git a/watchdog/checkpoint.lua b/watchdog/checkpoint.lua
new file mode 100644
index 0000000..c43574d
--- /dev/null
+++ b/watchdog/checkpoint.lua
@@ -0,0 +1,153 @@
+-- (c) Cartesi and individual authors (see AUTHORS)
+-- SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+local checkpoint = {}
+
+-- Join path segments with "/" and collapse runs of slashes.
+-- The gsub result is bound to a local so that only the path is returned
+-- (gsub also returns the substitution count).
+local function join(...)
+    local joined = table.concat({ ... }, "/"):gsub("//+", "/")
+    return joined
+end
+
+-- Read a whole file in binary mode. Returns the contents, or nil plus an
+-- error message when the file cannot be opened or read.
+local function read_all(path)
+    local file, err = io.open(path, "rb")
+    if not file then
+        return nil, err
+    end
+    local data, read_err = file:read("*a")
+    file:close()
+    if not data then
+        return nil, read_err or ("failed to read " .. path)
+    end
+    return data
+end
+
+-- Write `data` to `path` in binary mode, replacing any existing file.
+-- Write and close failures are reported instead of being ignored, so a
+-- truncated file is never reported as success.
+local function write_all(path, data)
+    local file, err = io.open(path, "wb")
+    if not file then
+        return nil, err
+    end
+    local written, write_err = file:write(data)
+    local closed, close_err = file:close()
+    if not written then
+        return nil, write_err
+    end
+    if not closed then
+        return nil, close_err
+    end
+    return true
+end
+
+-- Quote a value for use as a single POSIX shell word.
+local function shell_quote(value)
+    value = tostring(value)
+    return "'" .. value:gsub("'", "'\\''") .. "'"
+end
+
+-- Characters that have a dedicated two-character JSON escape.
+local JSON_ESCAPES = {
+    ['"'] = '\\"',
+    ["\\"] = "\\\\",
+    ["\b"] = "\\b",
+    ["\f"] = "\\f",
+    ["\n"] = "\\n",
+    ["\r"] = "\\r",
+    ["\t"] = "\\t",
+}
+
+-- Escape a value for embedding between double quotes in JSON. Returns
+-- exactly one value; control characters without a short escape become \u00XX.
+local function json_escape(value)
+    local escaped = tostring(value):gsub('[%c"\\]', function(char)
+        return JSON_ESCAPES[char] or string.format("\\u%04x", char:byte())
+    end)
+    return escaped
+end
+
+-- Serialize the manifest fields this module knows about. `created_at`
+-- defaults to the current UTC time; `cm_image_hash` is optional.
+local function manifest_json(manifest)
+    local fields = {
+        string.format('"safe_block":%d', manifest.safe_block),
+        string.format('"created_at":"%s"', json_escape(manifest.created_at or os.date("!%Y-%m-%dT%H:%M:%SZ"))),
+    }
+    if manifest.cm_image_hash then
+        table.insert(fields, string.format('"cm_image_hash":"%s"', json_escape(manifest.cm_image_hash)))
+    end
+    return "{" .. table.concat(fields, ",") .. "}\n"
+end
+
+-- Serialize the `current.json` pointer to a checkpoint directory.
+local function pointer_json(relative_path)
+    return string.format('{"checkpoint":"%s"}\n', json_escape(relative_path))
+end
+
+-- Extract the checkpoint path from pointer JSON with a pattern match.
+-- JSON escapes are not decoded here.
+local function parse_pointer(data)
+    return data:match('"checkpoint"%s*:%s*"([^"]+)"')
+end
+
+--- Extract `safe_block` from manifest JSON text.
+-- @return the block number, or nil plus an error message when absent
+function checkpoint.safe_block_from_manifest(manifest_data)
+    local value = tostring(manifest_data or ""):match('"safe_block"%s*:%s*(%d+)')
+    if not value then
+        return nil, "manifest missing safe_block"
+    end
+    return tonumber(value)
+end
+
+--- Load the checkpoint referenced by `<dir>/current.json`.
+-- @return a table with `path`, `manifest_json`, `safe_block` and
+--   `snapshot_dir`, or nil plus an error message
+function checkpoint.load(dir)
+    local pointer_data, err = read_all(join(dir, "current.json"))
+    if not pointer_data then
+        return nil, err
+    end
+    local relative_path = parse_pointer(pointer_data)
+    if not relative_path then
+        return nil, "invalid checkpoint pointer"
+    end
+    local checkpoint_dir = join(dir, relative_path)
+    local manifest, manifest_err = read_all(join(checkpoint_dir, "manifest.json"))
+    if not manifest then
+        return nil, manifest_err
+    end
+    local safe_block, safe_block_err = checkpoint.safe_block_from_manifest(manifest)
+    if not safe_block then
+        return nil, safe_block_err
+    end
+    return {
+        path = checkpoint_dir,
+        manifest_json = manifest,
+        safe_block = safe_block,
+        snapshot_dir = join(checkpoint_dir, "snapshot"),
+    }
+end
+
+--- Create `<dir>/checkpoints/<zero-padded safe_block>` and return its paths.
+-- The snapshot directory itself is not created here.
+-- @return a table with `path`, `snapshot_dir` and `relative_path`, or nil
+--   plus an error message when mkdir fails
+function checkpoint.prepare(dir, safe_block)
+    assert(type(safe_block) == "number", "safe_block must be a number")
+
+    local name = string.format("%020d", safe_block)
+    local relative_path = join("checkpoints", name)
+    local full_path = join(dir, relative_path)
+    local snapshot_dir = join(full_path, "snapshot")
+
+    -- os.execute returns true on success in Lua 5.2+ and 0 in Lua 5.1.
+    local ok = os.execute("mkdir -p " .. shell_quote(full_path))
+    if ok ~= true and ok ~= 0 then
+        return nil, "mkdir failed: " .. full_path
+    end
+
+    return {
+        path = full_path,
+        snapshot_dir = snapshot_dir,
+        relative_path = relative_path,
+    }
+end
+
+--- Write the manifest for a prepared checkpoint and make it current.
+-- The pointer is written to `current.json.tmp` and renamed over
+-- `current.json`, so readers see either the old or the new pointer.
+-- Note: `manifest.safe_block` is set on the table passed in.
+-- @return a table with `path` and `snapshot_dir`, or nil plus an error message
+function checkpoint.commit_prepared(dir, prepared, safe_block, manifest)
+    assert(type(prepared) == "table", "prepared checkpoint is required")
+    assert(type(safe_block) == "number", "safe_block must be a number")
+    manifest = manifest or {}
+    manifest.safe_block = safe_block
+
+    local ok, err = write_all(join(prepared.path, "manifest.json"), manifest_json(manifest))
+    if not ok then
+        return nil, err
+    end
+
+    local tmp_pointer = join(dir, "current.json.tmp")
+    ok, err = write_all(tmp_pointer, pointer_json(prepared.relative_path))
+    if not ok then
+        return nil, err
+    end
+    local current_pointer = join(dir, "current.json")
+    ok, err = os.rename(tmp_pointer, current_pointer)
+    if not ok then
+        -- Do not leave a stray temporary pointer behind on failure.
+        os.remove(tmp_pointer)
+        return nil, err
+    end
+
+    return {
+        path = prepared.path,
+        snapshot_dir = prepared.snapshot_dir,
+    }
+end
+
+--- Prepare a checkpoint, let `snapshot_writer(snapshot_dir)` fill it, then
+-- commit it. The pointer is only updated when the writer returns a truthy
+-- value.
+-- @return the result of `checkpoint.commit_prepared`, or nil plus an error
+function checkpoint.write(dir, safe_block, snapshot_writer, manifest)
+    assert(type(snapshot_writer) == "function", "snapshot_writer must be a function")
+    local prepared, prepare_err = checkpoint.prepare(dir, safe_block)
+    if not prepared then
+        return nil, prepare_err
+    end
+    local ok, err = snapshot_writer(prepared.snapshot_dir)
+    if not ok then
+        return nil, err
+    end
+    return checkpoint.commit_prepared(dir, prepared, safe_block, manifest)
+end
+
+return checkpoint
diff --git a/watchdog/compare.lua b/watchdog/compare.lua
new file mode 100644
index 0000000..f1c0f52
--- /dev/null
+++ b/watchdog/compare.lua
@@ -0,0 +1,36 @@
+-- (c) Cartesi and individual authors (see AUTHORS)
+-- SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+local compare = {}
+
+function compare.first_mismatch_offset(a, b)
+    assert(type(a) == "string", "left value must be a string")
+
assert(type(b) == "string", "right value must be a string")
+
+    local limit = math.min(#a, #b)
+    for i = 1, limit do
+        if a:byte(i) ~= b:byte(i) then
+            return i
+        end
+    end
+
+    if #a ~= #b then
+        return limit + 1
+    end
+
+    return nil
+end
+
+--- Byte-exact equality check.
+-- @return true when equal; otherwise false plus the 1-based offset of the
+--   first mismatch
+function compare.raw_equal(expected, actual)
+    local mismatch = compare.first_mismatch_offset(expected, actual)
+    return mismatch == nil, mismatch
+end
+
+--- Validate the `state` value of a sequencer state response.
+-- @return true when `state` is a string; otherwise nil plus an error message
+function compare.assert_state_response(state)
+    if type(state) ~= "string" then
+        return nil, "state must be a string"
+    end
+    return true
+end
+
+return compare
diff --git a/watchdog/config.lua b/watchdog/config.lua
new file mode 100644
index 0000000..1f07dc1
--- /dev/null
+++ b/watchdog/config.lua
@@ -0,0 +1,90 @@
+-- (c) Cartesi and individual authors (see AUTHORS)
+-- SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+local config = {}
+
+-- Return env[name]; raise when it is unset or empty.
+local function required(name, env)
+    local value = env[name]
+    if value == nil or value == "" then
+        error(name .. " is required")
+    end
+    return value
+end
+
+-- Return env[name], or `default` when it is unset or empty. This applies the
+-- same "empty means unset" rule as `required` and `optional_number`.
+local function optional(name, default, env)
+    local value = env[name]
+    if value == nil or value == "" then
+        return default
+    end
+    return value
+end
+
+-- Return env[name] parsed as a number, or `default` when it is unset or
+-- empty. Raises when the value is present but not numeric.
+local function optional_number(name, default, env)
+    local value = env[name]
+    if value == nil or value == "" then
+        return default
+    end
+    local parsed = tonumber(value)
+    if not parsed then
+        error(name .. " must be a number")
+    end
+    return parsed
+end
+
+-- Number that becomes mandatory once `value_name` is set: returns nil when
+-- `value_name` is unset or empty, otherwise requires and parses `number_name`.
+local function optional_required_number(value_name, number_name, env)
+    if env[value_name] == nil or env[value_name] == "" then
+        return nil
+    end
+    local value = env[number_name]
+    if value == nil or value == "" then
+        error(number_name .. " is required when " .. value_name .. " is set")
+    end
+    return optional_number(number_name, nil, env)
+end
+
+-- Split a comma-separated list. Each entry is trimmed and empty entries are
+-- dropped, so "-32005, -32600" yields {"-32005", "-32600"}.
+local function split_csv(value)
+    local out = {}
+    for part in tostring(value or ""):gmatch("[^,]+") do
+        local trimmed = part:match("^%s*(.-)%s*$")
+        if trimmed ~= "" then
+            out[#out + 1] = trimmed
+        end
+    end
+    return out
+end
+
+--- Build the watchdog configuration.
+-- @param env a table of variables, or a getenv-style function; defaults to
+--   `os.getenv`
+-- @return the configuration table
+-- Raises when a required variable is missing, a numeric variable is not a
+-- number, or WATCHDOG_MODE is neither "advance" nor "compare".
+function config.load(env)
+    env = env or os.getenv
+    if type(env) == "function" then
+        -- Wrap the getter so the helpers above can index it like a table.
+        local getenv = env
+        env = setmetatable({}, {
+            __index = function(_, key)
+                return getenv(key)
+            end,
+        })
+    end
+
+    local mode = optional("WATCHDOG_MODE", "advance", env)
+    if mode ~= "advance" and mode ~= "compare" then
+        error("WATCHDOG_MODE must be 'advance' or 'compare'")
+    end
+
+    -- The sequencer URL is only mandatory in compare mode.
+    local sequencer_url
+    if mode == "compare" then
+        sequencer_url = required("WATCHDOG_SEQUENCER_URL", env)
+    else
+        sequencer_url = optional("WATCHDOG_SEQUENCER_URL", nil, env)
+    end
+
+    return {
+        mode = mode,
+        sequencer_url = sequencer_url,
+        l1_rpc_url = required("WATCHDOG_L1_RPC_URL", env),
+        input_box_address = required("WATCHDOG_INPUTBOX_ADDRESS", env),
+        app_address = required("WATCHDOG_APP_ADDRESS", env),
+        input_added_topic = optional("WATCHDOG_INPUT_ADDED_TOPIC", nil, env),
+        checkpoint_dir = required("WATCHDOG_CHECKPOINT_DIR", env),
+        cm_snapshot_dir = optional("WATCHDOG_CM_SNAPSHOT_DIR", nil, env),
+        cm_snapshot_safe_block = optional_required_number(
+            "WATCHDOG_CM_SNAPSHOT_DIR",
+            "WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK",
+            env
+        ),
+        cm_work_dir = optional("WATCHDOG_CM_WORK_DIR", "/tmp", env),
+        cm_executable = optional("WATCHDOG_CM_EXECUTABLE", "cartesi-machine", env),
+        target_safe_block = optional_number("WATCHDOG_TARGET_SAFE_BLOCK", nil, env),
+        poll_interval_sec = optional_number("WATCHDOG_POLL_INTERVAL_SEC", 30, env),
+        retry_attempts = optional_number("WATCHDOG_RETRY_ATTEMPTS", 3, env),
+        retry_delay_sec = optional_number("WATCHDOG_RETRY_DELAY_SEC", 5, env),
+        safe_confirmations = optional_number("WATCHDOG_SAFE_CONFIRMATIONS", 12, env),
+        webhook_url = optional("WATCHDOG_WEBHOOK_URL", nil, env),
+        once = env.WATCHDOG_ONCE == "1",
+        long_block_range_error_codes = split_csv(
+            optional("WATCHDOG_LONG_BLOCK_RANGE_ERROR_CODES", "-32005,-32600,-32602,-32616", env)
+        ),
+    }
+end
+
+return config
diff --git a/watchdog/http.lua b/watchdog/http.lua
new file mode
100644 index 0000000..b821a03 --- /dev/null +++ b/watchdog/http.lua @@ -0,0 +1,175 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local http = {} + +local function shell_quote(value) + return "'" .. string.gsub(value, "'", "'\\''") .. "'" +end + +--- HTTP client backed by the `curl` binary (for dev shells without lua-curl). +function http.new_cli() + local client = {} + + local function run_curl(method, url, body, headers) + local cmd = { + "curl", + "-sS", + "-w", + "%{http_code}", + "-X", + method, + } + if body then + table.insert(cmd, "--data-binary") + table.insert(cmd, shell_quote(body)) + end + table.insert(cmd, shell_quote(url)) + for key, value in pairs(headers or {}) do + table.insert(cmd, "-H") + table.insert(cmd, shell_quote(key .. ": " .. value)) + end + + local shell_cmd = table.concat(cmd, " ") .. " 2>&1" + local handle = io.popen(shell_cmd, "r") + if not handle then + return nil, "failed to start curl" + end + local output = handle:read("*a") + local ok_exit, exit_type, exit_code = handle:close() + if not output or output == "" then + return nil, "empty curl response" + end + local status_line = output:match("(%d%d%d)$") + if not status_line then + return nil, string.format( + "curl response missing status trailer (exit=%s/%s): %s", + tostring(exit_type), + tostring(exit_code), + output + ) + end + local body_text = output:sub(1, #output - #status_line) + local status = tonumber(status_line) + if not ok_exit and status == 0 then + return nil, string.format("curl failed: %s", output) + end + return { + status = status, + body = body_text, + headers = {}, + } + end + + function client.post(_self, url, body, headers) + return run_curl("POST", url, body, headers) + end + + function client.get(_self, url, headers) + return run_curl("GET", url, nil, headers) + end + + return client +end + +function http.new_auto() + local ok = pcall(function() + require("cURL") + end) + if ok then + return 
http.new_curl() + end + ok = pcall(function() + require("lcurl") + end) + if ok then + return http.new_curl() + end + return http.new_cli() +end + +function http.new_curl() + local ok, curl = pcall(require, "cURL") + if not ok then + ok, curl = pcall(require, "lcurl") + end + if not ok then + error("lua-curl binding not found; install lua-curl/lcurl or inject an http adapter") + end + + local client = {} + + function client.post(_self, url, body, headers) + local chunks = {} + local header_list = {} + for key, value in pairs(headers or {}) do + table.insert(header_list, key .. ": " .. value) + end + + local easy = curl.easy({ + url = url, + post = true, + postfields = body, + httpheader = header_list, + timeout = 30, + writefunction = function(chunk) + table.insert(chunks, chunk) + return #chunk + end, + }) + + local ok_perform, err = pcall(function() + easy:perform() + end) + if not ok_perform then + easy:close() + return nil, tostring(err) + end + + local status = easy:getinfo_response_code() + easy:close() + return { + status = status, + body = table.concat(chunks), + headers = {}, + } + end + + function client.get(_self, url, headers) + local chunks = {} + local header_list = {} + for key, value in pairs(headers or {}) do + table.insert(header_list, key .. ": " .. 
value) + end + + local easy = curl.easy({ + url = url, + httpheader = header_list, + timeout = 30, + writefunction = function(chunk) + table.insert(chunks, chunk) + return #chunk + end, + }) + + local ok_perform, err = pcall(function() + easy:perform() + end) + if not ok_perform then + easy:close() + return nil, tostring(err) + end + + local status = easy:getinfo_response_code() + easy:close() + return { + status = status, + body = table.concat(chunks), + headers = {}, + } + end + + return client +end + +return http diff --git a/watchdog/jsonrpc.lua b/watchdog/jsonrpc.lua new file mode 100644 index 0000000..e2989de --- /dev/null +++ b/watchdog/jsonrpc.lua @@ -0,0 +1,107 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local jsonrpc = {} + +local function quantity(value) + assert(type(value) == "number" and value >= 0, "quantity must be a non-negative number") + return string.format("0x%x", value) +end + +local function strip_0x(value) + return tostring(value):gsub("^0[xX]", "") +end + +local function topic_address(address) + assert(type(address) == "string" and address ~= "", "topic address is required") + local raw = strip_0x(address):lower() + assert(#raw == 40 and raw:match("^[0-9a-f]+$") ~= nil, "topic address must be 20-byte hex") + return "0x" .. string.rep("0", 24) .. 
raw +end + +function jsonrpc.new(http, json, url) + assert(type(http) == "table" and type(http.post) == "function", "http.post is required") + assert(type(json) == "table" and type(json.encode) == "function", "json.encode is required") + assert(type(json.decode) == "function", "json.decode is required") + assert(type(url) == "string" and url ~= "", "url is required") + + local client = { + http = http, + json = json, + url = url, + next_id = 1, + } + + function client:call(method, params) + local id = self.next_id + self.next_id = self.next_id + 1 + local body = self.json.encode({ + jsonrpc = "2.0", + id = id, + method = method, + params = params or {}, + }) + + local response, http_err = self.http:post(self.url, body, { + ["content-type"] = "application/json", + }) + if not response then + return nil, http_err + end + if response.status < 200 or response.status >= 300 then + return nil, "HTTP " .. tostring(response.status) + end + + local decoded + local ok = pcall(function() + decoded = self.json.decode(response.body) + end) + if not ok then + return nil, "invalid JSON-RPC response JSON" + end + if type(decoded) ~= "table" then + return nil, "JSON-RPC response must be an object" + end + if decoded.id ~= id then + return nil, "JSON-RPC response id mismatch" + end + if decoded.error ~= nil then + local code = decoded.error.code or "unknown" + local message = decoded.error.message or "JSON-RPC error" + return nil, tostring(code) .. ": " .. 
tostring(message) + end + return decoded.result + end + + function client:get_logs(filter) + assert(type(filter.input_added_topic) == "string", "input_added_topic is required") + local topics = { filter.input_added_topic } + if filter.app_address then + topics[2] = topic_address(filter.app_address) + end + + return self:call("eth_getLogs", { + { + address = filter.address, + fromBlock = quantity(filter.from_block), + toBlock = quantity(filter.to_block), + topics = topics, + }, + }) + end + + function client:get_block_number_by_tag(tag) + local block, err = self:call("eth_getBlockByNumber", { tag, false }) + if not block then + return nil, err + end + if type(block.number) ~= "string" then + return nil, "block response missing number" + end + return tonumber(block.number:gsub("^0[xX]", ""), 16) + end + + return client +end + +return jsonrpc diff --git a/watchdog/l1.lua b/watchdog/l1.lua new file mode 100644 index 0000000..5db728a --- /dev/null +++ b/watchdog/l1.lua @@ -0,0 +1,137 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local abi = require("watchdog.abi") + +local l1 = {} + +l1.INPUT_ADDED_TOPIC = "0xc05d337121a6e8605c6ec0b72aa29c4210ffe6e5b9cefdd6a7058188a8f66f98" + +l1.DEFAULT_LONG_BLOCK_RANGE_ERROR_CODES = { + "-32005", + "-32600", + "-32602", + "-32616", +} + +local function contains_any(message, codes) + message = tostring(message or "") + for _, code in ipairs(codes or {}) do + if message:find(code, 1, true) then + return true + end + end + return false +end + +local function hex_quantity_to_number(value, field) + if type(value) == "number" then + return value + end + if type(value) ~= "string" or value:sub(1, 2) ~= "0x" then + error((field or "quantity") .. 
" must be an Ethereum hex quantity") + end + return tonumber(value:sub(3), 16) +end + +local function log_order_key(log) + return { + hex_quantity_to_number(log.blockNumber, "blockNumber"), + hex_quantity_to_number(log.transactionIndex or "0x0", "transactionIndex"), + hex_quantity_to_number(log.logIndex or "0x0", "logIndex"), + } +end + +function l1.sort_logs(logs) + table.sort(logs, function(a, b) + local ak = log_order_key(a) + local bk = log_order_key(b) + if ak[1] ~= bk[1] then + return ak[1] < bk[1] + end + if ak[2] ~= bk[2] then + return ak[2] < bk[2] + end + return ak[3] < bk[3] + end) + return logs +end + +function l1.fetch_logs_partitioned(rpc, params) + assert(type(rpc) == "table" and type(rpc.get_logs) == "function", "rpc.get_logs is required") + assert(type(params) == "table", "params are required") + + local start_block = assert(params.start_block, "start_block is required") + local end_block = assert(params.end_block, "end_block is required") + local codes = params.long_block_range_error_codes or l1.DEFAULT_LONG_BLOCK_RANGE_ERROR_CODES + local input_added_topic = params.input_added_topic or l1.INPUT_ADDED_TOPIC + + local function go(from_block, to_block) + local logs, err = rpc:get_logs({ + address = params.input_box_address, + app_address = params.app_address, + from_block = from_block, + to_block = to_block, + input_added_topic = input_added_topic, + }) + if logs then + return logs + end + + if from_block < to_block and contains_any(err, codes) then + local mid = from_block + ((to_block - from_block) // 2) + local left, left_err = go(from_block, mid) + if not left then + return nil, left_err + end + local right, right_err = go(mid + 1, to_block) + if not right then + return nil, right_err + end + for _, log in ipairs(right) do + table.insert(left, log) + end + return left + end + + return nil, err + end + + if end_block < start_block then + return {} + end + + local logs, err = go(start_block, end_block) + if not logs then + return nil, err + end + 
return l1.sort_logs(logs) +end + +function l1.decode_and_validate_log(log) + local decoded = abi.decode_input_added_log(log) + local block_number = hex_quantity_to_number(log.blockNumber, "blockNumber") + if decoded.block_number ~= block_number then + error(string.format( + "InputAdded block number mismatch: log=%d payload=%d", + block_number, + decoded.block_number + )) + end + return decoded +end + +function l1.fetch_inputs(rpc, params) + local logs, err = l1.fetch_logs_partitioned(rpc, params) + if not logs then + return nil, err + end + + local inputs = {} + for _, log in ipairs(logs) do + table.insert(inputs, l1.decode_and_validate_log(log)) + end + return inputs +end + +return l1 diff --git a/watchdog/machine.lua b/watchdog/machine.lua new file mode 100644 index 0000000..62bdc87 --- /dev/null +++ b/watchdog/machine.lua @@ -0,0 +1,43 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local machine = {} + +function machine.new(binding) + assert(type(binding) == "table", "Cartesi Machine binding is required") + assert(type(binding.load_snapshot) == "function", "binding.load_snapshot is required") + assert(type(binding.feed_input) == "function", "binding.feed_input is required") + assert(type(binding.inspect_state) == "function", "binding.inspect_state is required") + assert(type(binding.save_snapshot) == "function", "binding.save_snapshot is required") + + local driver = { binding = binding } + + function driver:load(path) + return self.binding.load_snapshot(path) + end + + function driver:feed_inputs(instance, inputs) + for _, input in ipairs(inputs) do + -- Do not classify payload tags here. The scheduler inside the CM owns + -- direct-input vs batch-submission semantics. 
+ local ok, err = self.binding.feed_input(instance, input) + if not ok then + return nil, err + end + end + return true + end + + function driver:inspect_state(instance) + return self.binding.inspect_state(instance) + end + + function driver:save(instance, path) + assert(type(self.binding.save_snapshot) == "function", "binding.save_snapshot is required") + return self.binding.save_snapshot(instance, path) + end + + return driver +end + +return machine diff --git a/watchdog/machine_cli.lua b/watchdog/machine_cli.lua new file mode 100644 index 0000000..ac73a36 --- /dev/null +++ b/watchdog/machine_cli.lua @@ -0,0 +1,176 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local machine = require("watchdog.machine") + +local machine_cli = {} + +local STATE_INSPECT_QUERY = "state" + +local function shell_quote(value) + value = tostring(value) + return "'" .. value:gsub("'", "'\\''") .. "'" +end + +local function mkdir_p(path) + local ok = os.execute("mkdir -p " .. shell_quote(path)) + if ok ~= true and ok ~= 0 then + return nil, "mkdir failed: " .. path + end + return true +end + +local function write_all(path, data) + local file, err = io.open(path, "wb") + if not file then + return nil, err + end + file:write(data) + file:close() + return true +end + +local function read_all(path) + local file, err = io.open(path, "rb") + if not file then + return nil, err + end + local data = file:read("*a") + file:close() + return data +end + +local function run_command(command) + local ok = os.execute(command) + if ok == true or ok == 0 then + return true + end + return nil, "command failed: " .. 
--- Create a machine driver that shells out to the `cartesi-machine` CLI.
--
-- An "instance" here is a plain table, not a running machine. It records the
-- snapshot directory to load from, a per-instance scratch directory, and how
-- many raw input files have been queued on disk. `inspect_state` and
-- `save_snapshot` each run the CLI once, loading the source snapshot and
-- pointing it at the queued input files.
--
-- @param opts table|nil  optional settings:
--   `executable` - CLI name or path (default "cartesi-machine");
--   `work_dir`   - parent directory for scratch directories (default "/tmp").
-- @return the driver produced by `machine.new(binding)`.
function machine_cli.new(opts)
  opts = opts or {}
  local executable = opts.executable or "cartesi-machine"
  local work_dir = opts.work_dir or "/tmp"

  local binding = {}

  -- Builds the CLI option that points at the queued input files
  -- `input-0.bin` .. `input-(input_count - 1).bin`.
  -- NOTE(review): the `%i` placeholder is passed through literally and is
  -- presumably expanded by the CLI per input index - confirm against the
  -- cartesi-machine documentation.
  local function advance_option(instance)
    return "--cmio-advance-state=input:"
      .. shell_quote(instance.input_dir .. "/input-%i.bin")
      .. ",input_index_begin:0,input_index_end:"
      .. tostring(instance.input_count)
  end

  --- Prepare a fresh scratch directory for replaying on top of `snapshot_dir`.
  -- Nothing is executed yet; the snapshot is only loaded when the CLI runs.
  -- @return instance table, or `nil, err` if a directory cannot be created.
  function binding.load_snapshot(snapshot_dir)
    assert(type(snapshot_dir) == "string" and snapshot_dir ~= "", "snapshot_dir is required")
    local instance = {
      source_snapshot_dir = snapshot_dir,
      work_dir = new_work_dir(work_dir),
      input_dir = nil,
      input_count = 0,
    }
    local ok, err = mkdir_p(instance.work_dir)
    if not ok then
      return nil, err
    end
    instance.input_dir = instance.work_dir .. "/inputs"
    ok, err = mkdir_p(instance.input_dir)
    if not ok then
      return nil, err
    end
    return instance
  end

  --- Queue one input by writing `input.raw_input` to the next numbered file.
  -- @return `true`, or `nil, err` when the input is malformed or the write fails.
  function binding.feed_input(instance, input)
    assert(type(instance) == "table", "machine instance is required")
    assert(type(input) == "table", "input is required")
    local raw_input = input.raw_input
    if type(raw_input) ~= "string" then
      return nil, "input.raw_input is required"
    end

    local path = string.format("%s/input-%d.bin", instance.input_dir, instance.input_count)
    local ok, err = write_all(path, raw_input)
    if not ok then
      return nil, err
    end
    -- Only count the input once its file is on disk.
    instance.input_count = instance.input_count + 1
    return true
  end

  --- Run the CLI with the queued inputs followed by a state inspect query.
  -- @return the raw bytes of `inspect-report-0.bin`, or `nil, err`.
  function binding.inspect_state(instance)
    assert(type(instance) == "table", "machine instance is required")

    local query_path = instance.work_dir .. "/inspect-query.bin"
    local report_path = instance.work_dir .. "/inspect-report-0.bin"

    local ok, err = write_all(query_path, STATE_INSPECT_QUERY)
    if not ok then
      return nil, err
    end

    -- Remove any report left by an earlier inspect of this instance. Without
    -- this, a CLI run that exits successfully but writes no report would be
    -- answered with stale bytes. The result is ignored on purpose: the file
    -- normally does not exist yet.
    os.remove(report_path)

    local command_parts = {
      shell_quote(executable),
      "--no-rollback",
      "--load=" .. shell_quote(instance.source_snapshot_dir) .. ",sharing:none",
    }
    if instance.input_count > 0 then
      command_parts[#command_parts + 1] = advance_option(instance)
    end
    command_parts[#command_parts + 1] = "--cmio-inspect-state=query:"
      .. shell_quote(query_path)
      .. ",report:"
      .. shell_quote(instance.work_dir .. "/inspect-report-%o.bin")
    command_parts[#command_parts + 1] = "--quiet"

    ok, err = run_command(table.concat(command_parts, " "))
    if not ok then
      return nil, err
    end

    local report, read_err = read_all(report_path)
    if not report then
      return nil, "failed to read inspect report: " .. tostring(read_err)
    end
    return report
  end

  --- Run the CLI with the queued inputs and store the result in `snapshot_dir`.
  -- The advance option is passed even when no inputs are queued (an empty
  -- 0..0 range), which is what this function has always done.
  -- @return `true`, or `nil, err` when the command fails.
  function binding.save_snapshot(instance, snapshot_dir)
    assert(type(instance) == "table", "machine instance is required")
    assert(type(snapshot_dir) == "string" and snapshot_dir ~= "", "snapshot_dir is required")

    local command = table.concat({
      shell_quote(executable),
      "--no-rollback",
      "--load=" .. shell_quote(instance.source_snapshot_dir) .. ",sharing:none",
      advance_option(instance),
      "--store=" .. shell_quote(snapshot_dir),
      "--quiet",
    }, " ")

    return run_command(command)
  end

  return machine.new(binding)
end
-- Render a failure value for the log. Runner failures may be plain strings or
-- tables (for example an alarm payload with `kind`, `previous_safe_block`, ...);
-- `tostring` on a table would only print its address, so tables are flattened
-- to sorted `key=value` pairs.
local function describe_error(err)
  if type(err) ~= "table" then
    return tostring(err)
  end
  local fields = {}
  for key, value in pairs(err) do
    fields[#fields + 1] = tostring(key) .. "=" .. tostring(value)
  end
  if #fields == 0 then
    return tostring(err)
  end
  -- `pairs` order is unspecified; sort so the log line is stable.
  table.sort(fields)
  return table.concat(fields, " ")
end

--- Entry point: load configuration, wire dependencies, and run the watchdog.
--
-- Each pass goes through `retry.with_retries`. If every attempt of a pass
-- fails, the failure is written to stderr and the process exits with status 1.
-- With `cfg.once` set the function returns after the first successful pass;
-- otherwise it sleeps `cfg.poll_interval_sec` seconds and repeats forever.
local function main()
  local cfg = config.load()
  local http = http_mod.new_auto()
  local json = load_json()
  local deps = {
    http = http,
    rpc = jsonrpc.new(http, json, cfg.l1_rpc_url),
    machine = load_machine(cfg),
  }
  if cfg.sequencer_url then
    deps.sequencer = sequencer_mod.new(http, json, cfg.sequencer_url)
  end

  repeat
    local result, err = retry.with_retries(function()
      -- `run_once` reports failure either by raising or by returning
      -- `nil, err`. After `pcall` the latter arrives as `true, nil, err`, so
      -- the third value must be forwarded or the error is lost.
      local ok, value, run_err = pcall(run_once, cfg, deps)
      if not ok then
        return nil, value
      end
      return value, run_err
    end, {
      attempts = cfg.retry_attempts,
      delay_sec = cfg.retry_delay_sec,
    })
    if result == nil then
      io.stderr:write("watchdog run failed: " .. describe_error(err) .. "\n")
      os.exit(1)
    end
    if cfg.once then
      return
    end
    os.execute("sleep " .. tostring(cfg.poll_interval_sec))
  until false
end
--- Call `fn` until it returns a truthy value or the attempt budget runs out.
--
-- `fn` receives the 1-based attempt number. A truthy first return value ends
-- the loop and is returned (any further return values of `fn` are dropped).
-- A falsy first value counts as a failure and its second value is remembered
-- as the error. Between failed attempts - never after the last one - the
-- sleep function is called with the configured delay.
--
-- @param fn    function(attempt) -> result [, err]
-- @param opts  table|nil with optional fields:
--   `attempts`  - attempt budget, clamped to at least 1 (default 1);
--   `delay_sec` - value handed to the sleep function (default 0);
--   `sleep`     - function(seconds); defaults to running the `sleep`
--                 shell command when `seconds > 0`.
-- @return the truthy result, or `nil, last_err` when every attempt failed.
function retry.with_retries(fn, opts)
  local options = opts or {}
  local budget = math.max(1, options.attempts or 1)
  local pause = options.delay_sec or 0

  local sleep = options.sleep
  if not sleep then
    sleep = function(seconds)
      if seconds > 0 then
        os.execute("sleep " .. tostring(seconds))
      end
    end
  end

  local last_err
  for attempt = 1, budget do
    local result, err = fn(attempt)
    if result then
      return result
    end
    last_err = err
    local more_attempts_left = attempt < budget
    if more_attempts_left then
      sleep(pause)
    end
  end
  return nil, last_err
end
-- Send an alarm payload and return the `nil, payload` failure pair used by
-- the runner. A failed delivery is not swallowed: when the alarm sink reports
-- an error, it is recorded on the payload as `alarm_error` and logged as a
-- step, so the caller can see that nobody was notified. Sinks that return
-- nothing at all (no error value) are treated as successful.
local function raise_alarm(cfg, deps, payload)
  local delivered, alarm_err = send_alarm(cfg, deps, payload)
  if not delivered and alarm_err ~= nil then
    payload.alarm_error = tostring(alarm_err)
    step(deps, "alarm delivery failed: " .. tostring(alarm_err))
  end
  return nil, payload
end

--- Run one compare pass: sequencer state versus replayed machine state.
--
-- Steps, in order: load the checkpoint (or bootstrap snapshot), fetch the
-- sequencer's `get_state`, check that its safe block has not gone backwards,
-- fetch L1 inputs for the blocks after the checkpoint, feed them into a
-- machine loaded from the checkpoint snapshot, inspect the machine state,
-- compare the raw bytes, and, if the safe block advanced, write a new
-- checkpoint.
--
-- @param cfg   configuration table (`checkpoint_dir`, `cm_image_hash`, plus
--              whatever `load_checkpoint`, `fetch_inputs` and `send_alarm` read).
-- @param deps  dependency table; `sequencer` and `machine` are required,
--              `checkpoint`, `log_step` and the alarm/fetch hooks are optional.
-- @return on success a table `{ ok, previous_safe_block, safe_block, input_count }`;
--         on failure `nil, err`, where `err` is either an error value from a
--         dependency or an alarm payload table (`kind = "safe_block_regressed"`
--         or `"state_mismatch"`, with `alarm_error` set if delivery failed).
function runner.run_once(cfg, deps)
  deps = deps or {}
  local checkpoint_mod = deps.checkpoint or checkpoint
  local sequencer = require_dep(deps, "sequencer")
  local machine = require_dep(deps, "machine")

  step(deps, "load checkpoint or bootstrap CM snapshot")
  local loaded = load_checkpoint(cfg, checkpoint_mod)
  step(deps, "fetch sequencer GET /get_state")
  local sequencer_state, state_err = sequencer:get_state()
  if not sequencer_state then
    return nil, state_err
  end

  local safe_block_prev = loaded.safe_block or 0
  local safe_block_next = sequencer_state.safe_block
  step(deps, string.format(
    "check safe_block monotonicity (prev=%s next=%s)",
    tostring(safe_block_prev),
    tostring(safe_block_next)
  ))
  if safe_block_next < safe_block_prev then
    return raise_alarm(cfg, deps, {
      kind = "safe_block_regressed",
      previous_safe_block = safe_block_prev,
      sequencer_safe_block = safe_block_next,
    })
  end

  step(deps, string.format(
    "fetch L1 InputAdded logs for blocks %s..%s",
    tostring(safe_block_prev + 1),
    tostring(safe_block_next)
  ))
  local inputs, input_err = fetch_inputs(cfg, deps, safe_block_prev + 1, safe_block_next)
  if not inputs then
    return nil, input_err
  end

  step(deps, "load CM snapshot directory")
  local instance, load_err = machine:load(loaded.snapshot_dir)
  if not instance then
    return nil, load_err
  end

  step(deps, string.format("feed %d decoded inputs into CM", #inputs))
  local fed, feed_err = machine:feed_inputs(instance, inputs)
  if not fed then
    return nil, feed_err
  end

  step(deps, "run CM inspect (state query)")
  local cm_state, inspect_err = machine:inspect_state(instance)
  if not cm_state then
    return nil, inspect_err
  end

  step(deps, "compare sequencer state bytes against CM inspect report")
  local equal, mismatch_offset = compare.raw_equal(sequencer_state.state, cm_state)
  if not equal then
    return raise_alarm(cfg, deps, {
      kind = "state_mismatch",
      previous_safe_block = safe_block_prev,
      sequencer_safe_block = safe_block_next,
      mismatch_offset = mismatch_offset,
    })
  end

  -- Only persist when the safe block actually moved; an unchanged safe block
  -- means the existing checkpoint is still current.
  if safe_block_next > safe_block_prev then
    step(deps, "persist new manifest-backed checkpoint")
    local written, write_err = checkpoint_mod.write(cfg.checkpoint_dir, safe_block_next, function(snapshot_dir)
      return machine:save(instance, snapshot_dir)
    end, {
      created_at = os.date("!%Y-%m-%dT%H:%M:%SZ"),
      cm_image_hash = cfg.cm_image_hash,
    })
    if not written then
      return nil, write_err
    end
  else
    step(deps, "safe_block unchanged; skip checkpoint write")
  end

  step(deps, "compare pass complete")
  return {
    ok = true,
    previous_safe_block = safe_block_prev,
    safe_block = safe_block_next,
    input_count = #inputs,
  }
end
--- Create a client for the sequencer's HTTP API.
--
-- @param http      table with a `get` method, called as `http:get(url)`.
-- @param json      table with a `decode` function, called as `json.decode(body)`.
-- @param base_url  non-empty base URL; trailing slashes are stripped.
-- @return client table exposing `client:get_state()`.
function sequencer.new(http, json, base_url)
  assert(type(http) == "table" and type(http.get) == "function", "http.get is required")
  assert(type(json) == "table" and type(json.decode) == "function", "json.decode is required")
  assert(type(base_url) == "string" and base_url ~= "", "base_url is required")

  local client = {
    http = http,
    json = json,
    -- Parentheses keep only the substituted string, not gsub's match count.
    base_url = (base_url:gsub("/+$", "")),
  }

  --- Fetch and validate `GET <base_url>/get_state`.
  --
  -- @return the decoded response table (with `safe_block` and `state`), or
  --         `nil, err` on transport failure, non-2xx status, undecodable
  --         JSON, an invalid `safe_block`, or a `state` rejected by
  --         `compare.assert_state_response`.
  function client:get_state()
    local response, err = self.http:get(self.base_url .. "/get_state")
    if not response then
      return nil, err
    end

    -- Guard the type first so a missing or odd status becomes an error
    -- return instead of a raised comparison error.
    local status = response.status
    if type(status) ~= "number" or status < 200 or status >= 300 then
      return nil, "HTTP " .. tostring(status)
    end

    local ok_decode, decoded = pcall(self.json.decode, response.body)
    if not ok_decode or type(decoded) ~= "table" then
      return nil, "invalid get_state response JSON"
    end

    local safe_block = decoded.safe_block
    if type(safe_block) ~= "number" then
      return nil, "safe_block must be a number"
    end
    -- The caller uses safe_block as a block-range bound, so fractional,
    -- negative, NaN and infinite values are rejected here (`% 1` yields NaN
    -- for the latter two, which never equals 0).
    if safe_block < 0 or safe_block % 1 ~= 0 then
      return nil, "safe_block must be a non-negative integer"
    end

    local ok, validation_err = compare.assert_state_response(decoded.state)
    if not ok then
      return nil, validation_err
    end
    return decoded
  end

  return client
end
package.cpath +end + +local alarm = require("watchdog.alarm") +local compare = require("watchdog.compare") +local http_mod = require("watchdog.http") +local log = dofile("watchdog/tests/e2e_log.lua") + +local webhook_url = os.getenv("WATCHDOG_WEBHOOK_URL") +if not webhook_url or webhook_url == "" then + io.stderr:write("WATCHDOG_WEBHOOK_URL is required\n") + os.exit(1) +end + +local sequencer_state = '{"balances":{"0x01":1},"nonces":{}}' +local cm_state = '{"balances":{},"nonces":{}}' +local equal, mismatch_offset = compare.raw_equal(sequencer_state, cm_state) +if equal then + log.fail("divergence-setup", "expected mismatch for drill") + os.exit(1) +end + +local payload = { + kind = "state_mismatch", + previous_safe_block = 0, + sequencer_safe_block = 0, + mismatch_offset = mismatch_offset, + run_id = "drill-forced-divergence", +} + +log.banner("divergence-webhook-drill") +log.step("divergence-webhook-drill", 1, 1, "POST state_mismatch payload (synthetic bytes)") + +local http = http_mod.new_auto() +local ok, err = alarm.send_webhook(http, webhook_url, payload) +if not ok then + log.fail("divergence-webhook-drill", err) + os.exit(1) +end + +log.pass( + "divergence-webhook-drill", + string.format("HTTP 2xx; mismatch_offset=%s", tostring(mismatch_offset)) +) +os.exit(0) diff --git a/watchdog/tests/drill_webhook.lua b/watchdog/tests/drill_webhook.lua new file mode 100644 index 0000000..b26a290 --- /dev/null +++ b/watchdog/tests/drill_webhook.lua @@ -0,0 +1,64 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) +-- +-- Staging/local drill: fire sample watchdog alarm payloads at WATCHDOG_WEBHOOK_URL. +-- Usage: +-- WATCHDOG_WEBHOOK_URL=https://example.com/hook lua watchdog/tests/drill_webhook.lua + +package.path = "./?.lua;./?/init.lua;" .. package.path + +local deps_lua = os.getenv("WATCHDOG_LUA_DEPS") +if deps_lua and deps_lua ~= "" then + package.cpath = deps_lua .. "/?.so;" .. 
package.cpath +end + +local alarm = require("watchdog.alarm") +local http_mod = require("watchdog.http") +local log = dofile("watchdog/tests/e2e_log.lua") + +local webhook_url = os.getenv("WATCHDOG_WEBHOOK_URL") +if not webhook_url or webhook_url == "" then + io.stderr:write("WATCHDOG_WEBHOOK_URL is required\n") + os.exit(1) +end + +local drills = { + { + name = "state_mismatch", + payload = { + kind = "state_mismatch", + previous_safe_block = 10, + sequencer_safe_block = 12, + mismatch_offset = 4, + run_id = "drill-state-mismatch", + }, + }, + { + name = "safe_block_regressed", + payload = { + kind = "safe_block_regressed", + previous_safe_block = 20, + sequencer_safe_block = 19, + run_id = "drill-safe-block-regressed", + }, + }, +} + +local http = http_mod.new_auto() +local failures = 0 + +log.banner("webhook-delivery-drills") +for index, drill in ipairs(drills) do + log.step("webhook-delivery-drills", index, #drills, "POST " .. drill.name .. " payload") + local ok, err = alarm.send_webhook(http, webhook_url, drill.payload) + if not ok then + failures = failures + 1 + log.fail(drill.name, err) + else + log.pass(drill.name, "HTTP 2xx from " .. webhook_url) + end +end + +if failures > 0 then + os.exit(1) +end diff --git a/watchdog/tests/e2e.lua b/watchdog/tests/e2e.lua new file mode 100644 index 0000000..7a799c1 --- /dev/null +++ b/watchdog/tests/e2e.lua @@ -0,0 +1,302 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) +-- +-- Real watchdog end-to-end checks against cartesi-machine (and optionally a +-- live sequencer). Run from repo root: +-- lua watchdog/tests/e2e.lua +-- or: +-- just test-watchdog-e2e + +package.path = "./?.lua;./?/init.lua;" .. 
--- Report whether `path` exists, by renaming it onto itself.
--
-- A successful self-rename means the path exists. Error code 13 is also
-- treated as "exists" (presumably EACCES, i.e. present but not accessible -
-- confirm for the target platform). Nothing in this check distinguishes a
-- directory from a regular file, despite the name.
--
-- @param path string
-- @return `true` when the path is considered present, otherwise `false, err`.
local function path_is_dir(path)
  local renamed, message, errno = os.rename(path, path)
  if renamed or errno == 13 then
    return true
  end
  return false, message
end
--- Run one named scenario and record its outcome.
--
-- The scenario function runs under `pcall`. A raised error counts as a
-- failure, a return value of "skip" counts as a skip (the scenario is
-- expected to have logged the reason itself), and anything else is logged
-- as a pass.
--
-- @param name  scenario name used in the banner and result lines.
-- @param fn    zero-argument scenario function.
local function run_scenario(name, fn)
  log.banner(name)
  local succeeded, outcome = pcall(fn)
  if not succeeded then
    failures = failures + 1
    log.fail(name, outcome)
  elseif outcome == "skip" then
    skips = skips + 1
  else
    log.pass(name)
  end
end
tostring(GENESIS_SAFE_BLOCK)) + end, +}) + +table.insert(scenarios, { + name = "advance-empty-range", + fn = function() + local scenario = "advance-empty-range" + if require_cartesi_machine(scenario) == "skip" then + return "skip" + end + if require_machine_image(scenario) == "skip" then + return "skip" + end + + local checkpoint_dir = temp_dir("watchdog-e2e-checkpoint") + local target_safe_block = 1 + log.step(scenario, 1, 5, "prepare temp checkpoint dir: " .. checkpoint_dir) + log.step(scenario, 2, 5, "bootstrap from genesis snapshot at safe_block=" .. GENESIS_SAFE_BLOCK) + log.step(scenario, 3, 5, "run advance mode with empty L1 input range 1.." .. target_safe_block) + local result, err = runner.advance_checkpoint_once(advance_cfg(checkpoint_dir, target_safe_block), { + machine = machine_cli.new({ + executable = "cartesi-machine", + work_dir = temp_dir("watchdog-e2e-advance-work"), + }), + log_step = make_step_logger(scenario .. "/runner", 8), + fetch_inputs = function(from_block, to_block) + assert_true(from_block == 1, "expected from_block=1") + assert_true(to_block == target_safe_block, "expected to_block=" .. target_safe_block) + return {} + end, + }) + assert_true(result, "advance failed: " .. tostring(err)) + assert_true(result.safe_block == target_safe_block, "unexpected safe_block") + assert_true(result.input_count == 0, "expected zero inputs") + + log.step(scenario, 4, 5, "verify manifest-backed checkpoint was written") + local current = checkpoint.load(checkpoint_dir) + assert_true(current, "checkpoint current.json missing after advance") + assert_true(current.safe_block == target_safe_block, "checkpoint safe_block mismatch") + + log.step(scenario, 5, 5, "verify snapshot directory exists under checkpoint") + assert_true(path_is_dir(current.snapshot_dir), "checkpoint snapshot dir missing") + log.info("wrote checkpoint safe_block=" .. 
tostring(current.safe_block)) + end, +}) + +table.insert(scenarios, { + name = "cm-inspect-state-query", + fn = function() + local scenario = "cm-inspect-state-query" + if require_cartesi_machine(scenario) == "skip" then + return "skip" + end + if require_machine_image(scenario) == "skip" then + return "skip" + end + + log.step(scenario, 1, 4, "create machine_cli adapter") + local work_dir = temp_dir("watchdog-e2e-inspect") + local machine = machine_cli.new({ + executable = "cartesi-machine", + work_dir = work_dir, + }) + + log.step(scenario, 2, 4, "load genesis snapshot from " .. MACHINE_IMAGE) + local instance = assert(machine:load(MACHINE_IMAGE), "load snapshot failed") + + log.step(scenario, 3, 4, "run --cmio-inspect-state with query=state (no new inputs)") + local report, inspect_err = machine:inspect_state(instance) + assert_true(report, "inspect failed: " .. tostring(inspect_err)) + + log.step(scenario, 4, 4, "validate inspect report looks like wallet JSON state") + if report:find("inspect endpoint not implemented", 1, true) then + return skip( + scenario, + "machine image dapp is stale; rebuild with: just canonical-build-machine-image" + ) + end + assert_true(report:sub(1, 1) == "{", "inspect report is not JSON object bytes: " .. report) + assert_true(report:find('"balances"', 1, true) ~= nil, "inspect report missing balances field") + log.info("inspect report bytes=" .. 
tostring(#report)) + end, +}) + +table.insert(scenarios, { + name = "compare-runner-with-sequencer", + fn = function() + local scenario = "compare-runner-with-sequencer" + local sequencer_url = os.getenv("WATCHDOG_E2E_SEQUENCER_URL") + if not sequencer_url or sequencer_url == "" then + return skip( + scenario, + "set WATCHDOG_E2E_SEQUENCER_URL to a live sequencer base URL to run this scenario" + ) + end + if require_cartesi_machine(scenario) == "skip" then + return "skip" + end + if require_machine_image(scenario) == "skip" then + return "skip" + end + + local http_mod = require("watchdog.http") + local jsonrpc = require("watchdog.jsonrpc") + local sequencer_mod = require("watchdog.sequencer") + local ok_json, cjson = pcall(require, "cjson") + if not ok_json then + return skip(scenario, "lua-cjson required for live sequencer compare") + end + + local checkpoint_dir = temp_dir("watchdog-e2e-compare") + log.step(scenario, 1, 2, "prepare compare-mode deps (sequencer=" .. sequencer_url .. ")") + log.step(scenario, 2, 2, "run compare runner against live sequencer + CM") + + local http = http_mod.new_auto() + local cfg = { + mode = "compare", + sequencer_url = sequencer_url, + checkpoint_dir = checkpoint_dir, + cm_snapshot_dir = MACHINE_IMAGE, + cm_snapshot_safe_block = GENESIS_SAFE_BLOCK, + cm_executable = "cartesi-machine", + cm_work_dir = temp_dir("watchdog-e2e-compare-work"), + l1_rpc_url = os.getenv("WATCHDOG_E2E_L1_RPC_URL") or "http://127.0.0.1:8545", + input_box_address = os.getenv("WATCHDOG_E2E_INPUTBOX_ADDRESS") + or "0x0000000000000000000000000000000000000000", + app_address = os.getenv("WATCHDOG_E2E_APP_ADDRESS") + or "0x1111111111111111111111111111111111111111", + long_block_range_error_codes = { "-32005" }, + } + + local step_no = 0 + local result, err = runner.run_once(cfg, { + http = http, + rpc = jsonrpc.new(http, cjson, cfg.l1_rpc_url), + sequencer = sequencer_mod.new(http, cjson, sequencer_url), + machine = machine_cli.new({ + executable = 
-- Console logger for the watchdog end-to-end scripts. Every message goes to
-- standard output with a "[watchdog-e2e]" prefix and is flushed immediately.
local e2e_log = {}

local RULE = "[watchdog-e2e] ═══════════════════════════════════════════════════════\n"

-- Write one chunk of text to stdout and flush it.
local function emit(text)
  io.write(text)
  io.flush()
end

--- Print a three-line banner announcing a scenario.
function e2e_log.banner(title)
  emit(table.concat({
    "\n",
    RULE,
    "[watchdog-e2e] SCENARIO: ",
    tostring(title),
    "\n",
    RULE,
  }))
end

--- Print a numbered progress line: "step NN/NN: message".
-- `index` and `total` are formatted with `%02d`.
function e2e_log.step(scenario, index, total, message)
  local line = ("[watchdog-e2e] [%s] step %02d/%02d: %s\n"):format(
    tostring(scenario),
    index,
    total,
    tostring(message)
  )
  emit(line)
end

--- Print a free-form informational line.
function e2e_log.info(message)
  emit("[watchdog-e2e] INFO: " .. tostring(message) .. "\n")
end

--- Print a SKIP line with the reason the scenario did not run.
function e2e_log.skip(scenario, reason)
  emit(("[watchdog-e2e] [%s] SKIP: %s\n"):format(tostring(scenario), tostring(reason)))
end

--- Print a PASS line; a truthy `detail` is appended after a colon.
function e2e_log.pass(scenario, detail)
  local suffix = ""
  if detail then
    suffix = ": " .. tostring(detail)
  end
  emit(("[watchdog-e2e] [%s] PASS%s\n"):format(tostring(scenario), suffix))
end

--- Print a FAIL line with the stringified error.
function e2e_log.fail(scenario, err)
  emit(("[watchdog-e2e] [%s] FAIL: %s\n"):format(tostring(scenario), tostring(err)))
end

return e2e_log
"0000000000000000000000000000000000000000000000000000000000000000", + }, + expected = { + app_contract = "0x1111111111111111111111111111111111111111", + msg_sender = "0x2222222222222222222222222222222222222222", + block_number = 99, + payload_hex = "00aabbcc", + }, +} diff --git a/watchdog/tests/run.lua b/watchdog/tests/run.lua new file mode 100644 index 0000000..3df26aa --- /dev/null +++ b/watchdog/tests/run.lua @@ -0,0 +1,557 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package.path = "./?.lua;./?/init.lua;" .. package.path + +local abi = require("watchdog.abi") +local alarm = require("watchdog.alarm") +local checkpoint = require("watchdog.checkpoint") +local compare = require("watchdog.compare") +local config = require("watchdog.config") +local jsonrpc = require("watchdog.jsonrpc") +local l1 = require("watchdog.l1") +local machine_cli = require("watchdog.machine_cli") +local retry = require("watchdog.retry") +local runner = require("watchdog.runner") +local sequencer = require("watchdog.sequencer") + +local tests = {} + +local function test(name, fn) + table.insert(tests, { name = name, fn = fn }) +end + +local function assert_eq(actual, expected) + if actual ~= expected then + error(string.format("expected %q, got %q", tostring(expected), tostring(actual)), 2) + end +end + +test("raw compare fails byte-different JSON", function() + local ok, offset = compare.raw_equal('{"a":1}', '{ "a": 1 }') + assert_eq(ok, false) + assert(offset ~= nil, "expected mismatch offset") +end) + +test("decodes InputAdded log EvmAdvance envelope", function() + local fixture = dofile("watchdog/tests/fixtures/input_added_evm_advance.lua") + local decoded = abi.decode_input_added_log(fixture.log) + assert_eq(decoded.app_contract, fixture.expected.app_contract) + assert_eq(decoded.msg_sender, fixture.expected.msg_sender) + assert_eq(decoded.block_number, fixture.expected.block_number) + 
assert_eq(abi.hex_from_bytes(decoded.payload), fixture.expected.payload_hex) + assert(decoded.raw_input ~= nil and #decoded.raw_input > 0, "fixture keeps raw input bytes") +end) + +test("sorts logs in L1 order", function() + local logs = { + { blockNumber = "0x2", transactionIndex = "0x0", logIndex = "0x5" }, + { blockNumber = "0x1", transactionIndex = "0x9", logIndex = "0x0" }, + { blockNumber = "0x2", transactionIndex = "0x0", logIndex = "0x1" }, + } + l1.sort_logs(logs) + assert_eq(logs[1].blockNumber, "0x1") + assert_eq(logs[2].logIndex, "0x1") + assert_eq(logs[3].logIndex, "0x5") +end) + +test("partitions long block range errors", function() + local calls = {} + local rpc = {} + function rpc.get_logs(_self, filter) + table.insert(calls, { filter.from_block, filter.to_block, filter.input_added_topic }) + if filter.from_block == 1 and filter.to_block == 4 then + return nil, "RPC error -32005: query returned more than allowed" + end + return {} + end + + local logs, err = l1.fetch_logs_partitioned(rpc, { + start_block = 1, + end_block = 4, + input_box_address = "0xinputbox", + app_address = "0x1111111111111111111111111111111111111111", + }) + + assert(logs, err) + assert_eq(#calls, 3) + assert_eq(calls[2][1], 1) + assert_eq(calls[2][2], 2) + assert_eq(calls[3][1], 3) + assert_eq(calls[3][2], 4) + assert_eq(calls[3][3], l1.INPUT_ADDED_TOPIC) +end) + +test("jsonrpc get_logs builds InputAdded app filter", function() + local captured = nil + local json = {} + function json.encode(value) + captured = value + return "encoded" + end + function json.decode(_body) + return { jsonrpc = "2.0", id = 1, result = {} } + end + + local http = {} + function http.post(_self, url, body, headers) + assert_eq(url, "http://rpc") + assert_eq(body, "encoded") + assert_eq(headers["content-type"], "application/json") + return { status = 200, body = "{}" } + end + + local client = jsonrpc.new(http, json, "http://rpc") + local logs, err = client:get_logs({ + address = 
"0x9999999999999999999999999999999999999999", + app_address = "0x1111111111111111111111111111111111111111", + from_block = 10, + to_block = 12, + input_added_topic = l1.INPUT_ADDED_TOPIC, + }) + + assert(logs, err) + assert(type(captured) == "table", "json request captured") + local request = captured + assert_eq(request.method, "eth_getLogs") + local filter = request.params[1] + assert_eq(filter.fromBlock, "0xa") + assert_eq(filter.toBlock, "0xc") + assert_eq(filter.address, "0x9999999999999999999999999999999999999999") + assert_eq(filter.topics[1], l1.INPUT_ADDED_TOPIC) + assert_eq( + filter.topics[2], + "0x0000000000000000000000001111111111111111111111111111111111111111" + ) +end) + +test("config loads snapshot directory safe block and optional topic", function() + local env = { + WATCHDOG_L1_RPC_URL = "http://rpc", + WATCHDOG_INPUTBOX_ADDRESS = "0x9999999999999999999999999999999999999999", + WATCHDOG_APP_ADDRESS = "0x1111111111111111111111111111111111111111", + WATCHDOG_INPUT_ADDED_TOPIC = "0xtopic", + WATCHDOG_CHECKPOINT_DIR = "/tmp/checkpoints", + WATCHDOG_CM_SNAPSHOT_DIR = "/tmp/snapshot", + WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK = "42", + } + + local cfg = config.load(env) + + assert_eq(cfg.input_added_topic, "0xtopic") + assert_eq(cfg.cm_snapshot_dir, "/tmp/snapshot") + assert_eq(cfg.cm_snapshot_safe_block, 42) + assert_eq(cfg.mode, "advance") +end) + +test("config rejects unknown mode", function() + local ok, err = pcall(function() + config.load({ + WATCHDOG_MODE = "bad", + WATCHDOG_L1_RPC_URL = "http://rpc", + WATCHDOG_INPUTBOX_ADDRESS = "0x9999999999999999999999999999999999999999", + WATCHDOG_APP_ADDRESS = "0x1111111111111111111111111111111111111111", + WATCHDOG_CHECKPOINT_DIR = "/tmp/checkpoints", + }) + end) + assert_eq(ok, false) + assert(tostring(err):find("WATCHDOG_MODE", 1, true) ~= nil, "mode error is explicit") +end) + +test("checkpoint writes manifest-backed current pointer", function() + local dir = os.tmpname() + os.remove(dir) + 
os.execute(string.format('mkdir -p "%s"', dir)) + + local written, err = checkpoint.write(dir, 12, function(snapshot_dir) + os.execute(string.format('mkdir -p "%s"', snapshot_dir)) + local file = io.open(snapshot_dir .. "/marker", "wb") + assert(file ~= nil, "marker file opened") + file:write("snapshot") + file:close() + return true + end, { + created_at = "2026-04-28T00:00:00Z", + }) + assert(written, err) + + local loaded, load_err = checkpoint.load(dir) + assert(loaded, load_err) + assert_eq(loaded.snapshot_dir, dir .. "/checkpoints/00000000000000000012/snapshot") + assert(loaded.manifest_json:find('"safe_block":12', 1, true) ~= nil, "manifest has safe block") +end) + +test("checkpoint rejects manifest without safe block", function() + local safe_block, err = checkpoint.safe_block_from_manifest("{}") + assert_eq(safe_block, nil) + assert_eq(err, "manifest missing safe_block") +end) + +local function fake_cfg() + return { + checkpoint_dir = "/tmp/watchdog-test", + cm_snapshot_dir = "/tmp/genesis-snapshot", + cm_snapshot_safe_block = 0, + input_box_address = "0xinputbox", + app_address = "0x1111111111111111111111111111111111111111", + input_added_topic = "0xtopic", + long_block_range_error_codes = l1.DEFAULT_LONG_BLOCK_RANGE_ERROR_CODES, + } +end + +local function fake_machine(inspect_state) + local machine = { + loaded_path = nil, + fed_inputs = nil, + } + function machine:load(path) + self.loaded_path = path + return { path = path } + end + function machine:feed_inputs(_instance, inputs) + self.fed_inputs = inputs + return true + end + function machine.inspect_state(_self, _instance) + return inspect_state + end + function machine:save(_instance, snapshot_dir) + self.saved_snapshot_dir = snapshot_dir + return true + end + return machine +end + +test("runner happy path replays inputs and writes checkpoint", function() + local checkpoint_writes = {} + local checkpoint_mod = { + load = function(_dir) + return { + snapshot_dir = "/tmp/checkpoints/0001/snapshot", + 
safe_block = 10, + } + end, + write = function(dir, safe_block, snapshot_writer, manifest) + local ok, err = snapshot_writer("/tmp/new-snapshot") + assert(ok, err) + table.insert(checkpoint_writes, { + dir = dir, + safe_block = safe_block, + manifest = manifest, + }) + return true + end, + } + local machine = fake_machine('{"ok":true}') + local result, err = runner.run_once(fake_cfg(), { + checkpoint = checkpoint_mod, + sequencer = { + get_state = function() + return { + safe_block = 12, + state = '{"ok":true}', + } + end, + }, + fetch_inputs = function(from_block, to_block) + assert_eq(from_block, 11) + assert_eq(to_block, 12) + return { { payload = "a" }, { payload = "b" } } + end, + machine = machine, + }) + + assert(result, err) + assert_eq(result.safe_block, 12) + assert_eq(result.input_count, 2) + assert_eq(machine.loaded_path, "/tmp/checkpoints/0001/snapshot") + assert_eq(machine.saved_snapshot_dir, "/tmp/new-snapshot") + assert_eq(#machine.fed_inputs, 2) + assert_eq(#checkpoint_writes, 1) + assert_eq(checkpoint_writes[1].safe_block, 12) +end) + +test("runner alarms on raw state mismatch", function() + local alarms = {} + local result, err = runner.run_once(fake_cfg(), { + checkpoint = { + load = function(_dir) + return { snapshot_dir = "/tmp/snapshot", safe_block = 1 } + end, + }, + sequencer = { + get_state = function() + return { + safe_block = 1, + state = '{"a":1}', + } + end, + }, + fetch_inputs = function() + return {} + end, + machine = fake_machine('{ "a": 1 }'), + alarm = function(payload) + table.insert(alarms, payload) + return true + end, + }) + + assert_eq(result, nil) + assert(type(err) == "table", "expected mismatch payload") + assert_eq(err.kind, "state_mismatch") + assert_eq(#alarms, 1) + assert_eq(alarms[1].kind, "state_mismatch") +end) + +test("runner alarms on sequencer safe block regression", function() + local alarms = {} + local result, err = runner.run_once(fake_cfg(), { + checkpoint = { + load = function(_dir) + return { 
snapshot_dir = "/tmp/snapshot", safe_block = 5 } + end, + }, + sequencer = { + get_state = function() + return { + safe_block = 4, + state = "{}", + } + end, + }, + machine = fake_machine("{}"), + alarm = function(payload) + table.insert(alarms, payload) + return true + end, + }) + + assert_eq(result, nil) + assert(type(err) == "table", "expected regression payload") + assert_eq(err.kind, "safe_block_regressed") + assert_eq(#alarms, 1) +end) + +test("sequencer client validates generic state response", function() + local http = {} + function http.get(_self, url) + assert_eq(url, "http://sequencer/get_state") + return { + status = 200, + body = "body", + } + end + local json = {} + function json.decode(_body) + return { + safe_block = 7, + state = "raw-state", + } + end + + local client = sequencer.new(http, json, "http://sequencer/") + local state, err = client:get_state() + assert(state, err) + assert_eq(state.safe_block, 7) + assert_eq(state.state, "raw-state") +end) + +test("sequencer client rejects invalid JSON", function() + local http = {} + function http.get(_self, _url) + return { + status = 200, + body = "not-json", + } + end + local json = {} + function json.decode(_body) + error("decode failed") + end + + local client = sequencer.new(http, json, "http://sequencer") + local state, err = client:get_state() + assert_eq(state, nil) + assert_eq(err, "invalid get_state response JSON") +end) + +test("advance runner fetches inputs and saves checkpoint without sequencer", function() + local checkpoint_writes = {} + local machine = fake_machine("unused") + local result, err = runner.advance_checkpoint_once(fake_cfg(), { + checkpoint = { + load = function(_dir) + return { snapshot_dir = "/tmp/snapshot", safe_block = 7 } + end, + write = function(dir, safe_block, snapshot_writer, manifest) + local ok, write_err = snapshot_writer("/tmp/advanced-snapshot") + assert(ok, write_err) + table.insert(checkpoint_writes, { dir = dir, safe_block = safe_block, manifest = 
manifest }) + return true + end, + }, + safe_block = function() + return 9 + end, + fetch_inputs = function(from_block, to_block) + assert_eq(from_block, 8) + assert_eq(to_block, 9) + return { { raw_input = "one" }, { raw_input = "two" } } + end, + machine = machine, + }) + + assert(result, err) + assert_eq(result.safe_block, 9) + assert_eq(result.input_count, 2) + assert_eq(machine.saved_snapshot_dir, "/tmp/advanced-snapshot") + assert_eq(#checkpoint_writes, 1) +end) + +test("machine cli adapter writes raw input files", function() + local base = os.tmpname() + os.remove(base) + os.execute(string.format('mkdir -p "%s"', base)) + local driver = machine_cli.new({ work_dir = base, executable = "cartesi-machine" }) + local instance = assert(driver:load("/tmp/source-snapshot")) + assert(driver:feed_inputs(instance, { + { raw_input = "abc" }, + { raw_input = "def" }, + })) + + local file = io.open(instance.input_dir .. "/input-0.bin", "rb") + assert(file ~= nil, "first input file exists") + assert_eq(file:read("*a"), "abc") + file:close() + file = io.open(instance.input_dir .. "/input-1.bin", "rb") + assert(file ~= nil, "second input file exists") + assert_eq(file:read("*a"), "def") + file:close() +end) + +test("machine cli adapter inspect reads report file", function() + local base = os.tmpname() + os.remove(base) + os.execute(string.format('mkdir -p "%s"', base)) + local script_path = base .. 
"/fake-cartesi-machine.sh" + local driver = machine_cli.new({ work_dir = base, executable = script_path }) + local instance = assert(driver:load("/tmp/source-snapshot")) + + local script = io.open(script_path, "wb") + assert(script ~= nil, "fake cartesi-machine script opened") + script:write(string.format([[ +#!/bin/sh +mkdir -p "%s" +printf '%%s' '{"ok":true}' > "%s/inspect-report-0.bin" +exit 0 +]], instance.work_dir, instance.work_dir)) + script:close() + os.execute(string.format('chmod +x "%s"', script_path)) + + local state, err = driver:inspect_state(instance) + + assert_eq(err, nil) + assert_eq(state, '{"ok":true}') +end) + +test("machine cli adapter leaves snapshot directory creation to cartesi-machine", function() + local base = os.tmpname() + os.remove(base) + os.execute(string.format('mkdir -p "%s"', base)) + local driver = machine_cli.new({ work_dir = base, executable = "true" }) + local instance = assert(driver:load("/tmp/source-snapshot")) + local snapshot_dir = base .. "/snapshot" + + assert(driver:save(instance, snapshot_dir)) + local exists = os.rename(snapshot_dir, snapshot_dir) + assert(not exists, "adapter must not pre-create --store target") +end) + +test("retry succeeds after transient failures", function() + local attempts = 0 + local sleeps = 0 + local result, err = retry.with_retries(function() + attempts = attempts + 1 + if attempts < 3 then + return nil, "transient" + end + return "ok" + end, { + attempts = 3, + delay_sec = 1, + sleep = function(seconds) + assert_eq(seconds, 1) + sleeps = sleeps + 1 + end, + }) + + assert_eq(result, "ok") + assert_eq(err, nil) + assert_eq(attempts, 3) + assert_eq(sleeps, 2) +end) + +test("retry returns final error after exhaustion", function() + local attempts = 0 + local result, err = retry.with_retries(function() + attempts = attempts + 1 + return nil, "failed-" .. 
tostring(attempts) + end, { + attempts = 2, + delay_sec = 0, + sleep = function() end, + }) + + assert_eq(result, nil) + assert_eq(err, "failed-2") + assert_eq(attempts, 2) +end) + +test("alarm webhook posts JSON payload", function() + local sent = {} + local http = {} + function http.post(_self, url, body, headers) + sent.url = url + sent.body = body + sent.headers = headers + return { status = 204, body = "" } + end + + local ok, err = alarm.send_webhook(http, "http://alarm", { + kind = "state_mismatch", + safe_block = 12, + }) + + assert(ok, err) + assert_eq(sent.url, "http://alarm") + assert_eq(sent.headers["content-type"], "application/json") + assert(sent.body:find('"kind":"state_mismatch"', 1, true) ~= nil, "body includes kind") + assert(sent.body:find('"safe_block":12', 1, true) ~= nil, "body includes safe block") +end) + +test("alarm webhook reports non-success status", function() + local http = {} + function http.post(_self, _url, _body, _headers) + return { status = 500, body = "" } + end + + local ok, err = alarm.send_webhook(http, "http://alarm", { kind = "test" }) + assert_eq(ok, nil) + assert_eq(err, "alarm webhook HTTP 500") +end) + +local failures = 0 +for _, t in ipairs(tests) do + local ok, err = pcall(t.fn) + if ok then + io.write("ok - " .. t.name .. "\n") + else + failures = failures + 1 + io.write("not ok - " .. t.name .. ": " .. tostring(err) .. "\n") + end +end + +if failures > 0 then + os.exit(1) +end diff --git a/watchdog/tests/run_compare_once.lua b/watchdog/tests/run_compare_once.lua new file mode 100644 index 0000000..d3ee518 --- /dev/null +++ b/watchdog/tests/run_compare_once.lua @@ -0,0 +1,68 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) +-- +-- Single compare-mode pass for harness-driven E2E. Expects WATCHDOG_* env vars +-- (see docs/watchdog/staging-drills.md). Exits 0 on match, 1 on failure. + +package.path = "./?.lua;./?/init.lua;" .. 
package.path + +local deps_lua = os.getenv("WATCHDOG_LUA_DEPS") +if deps_lua and deps_lua ~= "" then + package.cpath = deps_lua .. "/?.so;" .. package.cpath +end + +local config = require("watchdog.config") +local http_mod = require("watchdog.http") +local jsonrpc = require("watchdog.jsonrpc") +local machine_cli = require("watchdog.machine_cli") +local runner = require("watchdog.runner") +local sequencer_mod = require("watchdog.sequencer") + +local ok_json, cjson = pcall(require, "cjson") +if not ok_json then + io.stderr:write("lua-cjson is required\n") + os.exit(1) +end + +local function getenv_table() + return setmetatable({}, { + __index = function(_, key) + return os.getenv(key) + end, + }) +end + +local cfg = config.load(getenv_table()) +if cfg.mode ~= "compare" then + io.stderr:write("WATCHDOG_MODE must be compare\n") + os.exit(1) +end + +local http = http_mod.new_auto() +local deps = { + http = http, + rpc = jsonrpc.new(http, cjson, cfg.l1_rpc_url), + sequencer = sequencer_mod.new(http, cjson, cfg.sequencer_url), + machine = machine_cli.new({ + executable = cfg.cm_executable, + work_dir = cfg.cm_work_dir, + }), +} + +local result, err = runner.run_once(cfg, deps) +if not result then + io.stderr:write("watchdog compare failed: " .. tostring(err) .. "\n") + if type(err) == "table" then + for key, value in pairs(err) do + io.stderr:write(string.format(" %s: %s\n", tostring(key), tostring(value))) + end + end + os.exit(1) +end + +print(string.format( + "watchdog compare ok: safe_block=%s input_count=%s", + tostring(result.safe_block), + tostring(result.input_count) +)) +os.exit(0)