diff --git a/tests/specs/streaming-join-anvil.yaml b/tests/specs/streaming-join-anvil.yaml
new file mode 100644
index 000000000..276dc3c52
--- /dev/null
+++ b/tests/specs/streaming-join-anvil.yaml
@@ -0,0 +1,53 @@
+# Streaming join test using Anvil
+#
+# This test validates that streaming queries with JOINs correctly deliver
+# incremental results as new blocks are mined on Anvil.
+
+- anvil: {}
+
+# Mine initial blocks (anvil starts with block 0)
+- name: mine_initial
+  anvil_mine: 3
+
+# Dump to make data available (must happen before registering stream)
+- name: dump_initial
+  dataset: _/anvil_rpc@0.0.0
+  end: 3
+
+# Register the streaming join query (self-join on blocks via parent_hash)
+- name: register_streaming_join
+  stream: |
+    SELECT child.block_num, parent.block_num as parent_num
+    FROM anvil_rpc.blocks child
+    JOIN anvil_rpc.blocks parent ON child.parent_hash = parent.hash
+    SETTINGS stream = true
+
+# Take initial join results - blocks 1,2,3 each joined with their parent
+- name: take_initial_join
+  stream: register_streaming_join
+  take: 3
+  results: |
+    [
+      {"block_num": 1, "parent_num": 0},
+      {"block_num": 2, "parent_num": 1},
+      {"block_num": 3, "parent_num": 2}
+    ]
+
+# Mine more blocks
+- name: mine_more
+  anvil_mine: 2
+
+# Dump new blocks
+- name: dump_more
+  dataset: _/anvil_rpc@0.0.0
+  end: 5
+
+# Take incremental join results - blocks 4,5 joined with their parents
+- name: take_incremental_join
+  stream: register_streaming_join
+  take: 2
+  results: |
+    [
+      {"block_num": 4, "parent_num": 3},
+      {"block_num": 5, "parent_num": 4}
+    ]
diff --git a/tests/specs/streaming-join-cross-table.yaml b/tests/specs/streaming-join-cross-table.yaml
new file mode 100644
index 000000000..6bb241809
--- /dev/null
+++ b/tests/specs/streaming-join-cross-table.yaml
@@ -0,0 +1,51 @@
+# Streaming cross-table join test using Anvil
+#
+# This test validates that streaming queries with JOINs across different
+# tables (blocks and transactions) correctly deliver incremental results.
+#
+# Uses INNER JOIN between blocks and transactions - returns rows only when
+# blocks have transactions. With empty anvil blocks, we need to verify
+# the join mechanism works even if no rows are returned initially.
+
+- anvil: {}
+
+# Mine initial blocks
+- name: mine_initial
+  anvil_mine: 3
+
+# Dump to make data available
+- name: dump_initial
+  dataset: _/anvil_rpc@0.0.0
+  end: 3
+
+# Register streaming cross-table join query (blocks INNER JOIN transactions)
+# This joins blocks with their transactions on block_num
+- name: register_cross_table_join
+  stream: |
+    SELECT b.block_num, t.tx_hash, t.tx_index
+    FROM anvil_rpc.blocks b
+    INNER JOIN anvil_rpc.transactions t ON b.block_num = t.block_num
+    SETTINGS stream = true
+
+# Take initial join results - empty blocks have no transactions, so 0 rows
+- name: take_initial_join
+  stream: register_cross_table_join
+  take: 0
+  results: |
+    []
+
+# Mine more blocks
+- name: mine_more
+  anvil_mine: 2
+
+# Dump new blocks
+- name: dump_more
+  dataset: _/anvil_rpc@0.0.0
+  end: 5
+
+# Take incremental join results - still no transactions in empty blocks
+- name: take_incremental_join
+  stream: register_cross_table_join
+  take: 0
+  results: |
+    []
diff --git a/tests/specs/streaming-join-with-reorg.yaml b/tests/specs/streaming-join-with-reorg.yaml
new file mode 100644
index 000000000..2d7be6438
--- /dev/null
+++ b/tests/specs/streaming-join-with-reorg.yaml
@@ -0,0 +1,68 @@
+# Streaming join test with blockchain reorganization
+#
+# This test validates that streaming queries with JOINs correctly handle
+# blockchain reorganizations, delivering updated results after reorg.
+#
+# Flow:
+# 1. Mine initial blocks and register streaming join
+# 2. Take initial join results
+# 3. Trigger reorg (removes some blocks)
+# 4. Mine new blocks on the new fork
+# 5. Verify join results reflect the reorganized chain
+
+- anvil: {}
+
+# Mine initial blocks (anvil starts with block 0)
+- name: mine_initial
+  anvil_mine: 5
+
+# Dump to make data available
+- name: dump_initial
+  dataset: _/anvil_rpc@0.0.0
+  end: 5
+
+# Register the streaming join query (self-join on blocks via parent_hash)
+- name: register_streaming_join
+  stream: |
+    SELECT child.block_num, parent.block_num as parent_num
+    FROM anvil_rpc.blocks child
+    JOIN anvil_rpc.blocks parent ON child.parent_hash = parent.hash
+    SETTINGS stream = true
+
+# Take initial join results - blocks 1-5 each joined with their parent
+- name: take_initial_join
+  stream: register_streaming_join
+  take: 5
+  results: |
+    [
+      {"block_num": 1, "parent_num": 0},
+      {"block_num": 2, "parent_num": 1},
+      {"block_num": 3, "parent_num": 2},
+      {"block_num": 4, "parent_num": 3},
+      {"block_num": 5, "parent_num": 4}
+    ]
+
+# Trigger reorg - removes last 2 blocks (4 and 5)
+- name: trigger_reorg
+  anvil_reorg: 2
+
+# Mine new blocks on the reorganized chain
+- name: mine_after_reorg
+  anvil_mine: 3
+
+# Dump the reorganized chain state
+- name: dump_after_reorg
+  dataset: _/anvil_rpc@0.0.0
+  end: 6
+
+# Take incremental join results after reorg
+# Should see blocks 4, 5, 6 with their new parent relationships
+- name: take_after_reorg
+  stream: register_streaming_join
+  take: 3
+  results: |
+    [
+      {"block_num": 4, "parent_num": 3},
+      {"block_num": 5, "parent_num": 4},
+      {"block_num": 6, "parent_num": 5}
+    ]
diff --git a/tests/src/steps.rs b/tests/src/steps.rs
index b8dbfda3f..417d803b2 100644
--- a/tests/src/steps.rs
+++ b/tests/src/steps.rs
@@ -10,6 +10,9 @@ use common::utils::error_with_causes;
 use fs_err as fs;
 
 // Submodules of the step implementations
+mod anvil;
+mod anvil_mine;
+mod anvil_reorg;
 mod clean_dump_location;
 mod dump;
 mod query;
@@ -27,6 +30,12 @@ use crate::testlib::{ctx::TestCtx, fixtures::FlightClient};
 #[derive(Debug, serde::Deserialize)]
 #[serde(untagged)]
 pub enum TestStep {
+    /// Initialize Anvil blockchain fixture.
+    Anvil(anvil::Step),
+    /// Mine blocks on Anvil.
+    AnvilMine(anvil_mine::Step),
+    /// Trigger blockchain reorganization on Anvil.
+    AnvilReorg(anvil_reorg::Step),
     /// Dump dataset data to storage.
     Dump(dump::Step),
     /// Register a stream with the client.
@@ -47,9 +56,12 @@ impl TestStep {
     /// Gets the name of the test step.
     ///
     /// Returns the step name for logging and identification purposes.
-    /// Note that CleanDumpLocation steps use their location as the name.
+    /// Note that CleanDumpLocation steps use their location and Anvil steps the fixed name "anvil".
     pub fn name(&self) -> &str {
         match self {
+            TestStep::Anvil(_) => "anvil",
+            TestStep::AnvilMine(step) => &step.name,
+            TestStep::AnvilReorg(step) => &step.name,
             TestStep::Dump(step) => &step.name,
             TestStep::StreamTake(step) => &step.name,
             TestStep::Query(step) => &step.name,
@@ -66,6 +78,9 @@ impl TestStep {
     /// with comprehensive logging and error handling.
     pub async fn run(&self, ctx: &TestCtx, client: &mut FlightClient) -> Result<(), TestStepError> {
         let result = match self {
+            TestStep::Anvil(step) => step.run(ctx).await,
+            TestStep::AnvilMine(step) => step.run(ctx).await,
+            TestStep::AnvilReorg(step) => step.run(ctx).await,
             TestStep::Dump(step) => step.run(ctx).await,
             TestStep::StreamTake(step) => step.run(client).await,
             TestStep::Query(step) => step.run(client).await,
diff --git a/tests/src/steps/anvil.rs b/tests/src/steps/anvil.rs
new file mode 100644
index 000000000..1dd232270
--- /dev/null
+++ b/tests/src/steps/anvil.rs
@@ -0,0 +1,46 @@
+//! Test step for initializing Anvil blockchain fixture.
+
+use common::BoxError;
+
+use crate::testlib::ctx::TestCtx;
+
+/// Test step that validates Anvil is available for the test.
+///
+/// This step serves as a marker that the test requires Anvil and validates
+/// that the test context was configured with Anvil support. It should appear
+/// before any `mine` or `reorg` steps in the YAML spec.
+///
+/// Note: The actual Anvil instance must be configured via `TestCtxBuilder::with_anvil_ipc()`
+/// or `with_anvil_http()` in the test harness. This step validates that configuration.
+#[derive(Debug, serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct Step {
+    /// Anvil configuration options.
+    pub anvil: AnvilConfig,
+}
+
+/// Configuration options for the Anvil step.
+#[derive(Debug, Default, serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct AnvilConfig {
+    /// Connection mode: "ipc" (default) or "http". Parsed only; `Step::run` does not read it.
+    #[serde(default)]
+    pub mode: Option<String>,
+}
+
+impl Step {
+    /// Validates that Anvil is available in the test context.
+    ///
+    /// This method checks that the test context was configured with Anvil support.
+    /// If Anvil is not available, `ctx.anvil()` is expected to panic; this method never returns `Err`.
+    pub async fn run(&self, ctx: &TestCtx) -> Result<(), BoxError> {
+        tracing::debug!("Validating Anvil fixture is available");
+
+        // This will panic if Anvil is not configured, which is the expected behavior
+        // to fail fast with a clear error message
+        let _anvil = ctx.anvil();
+
+        tracing::info!("Anvil fixture validated successfully");
+        Ok(())
+    }
+}
diff --git a/tests/src/steps/anvil_mine.rs b/tests/src/steps/anvil_mine.rs
new file mode 100644
index 000000000..30d409648
--- /dev/null
+++ b/tests/src/steps/anvil_mine.rs
@@ -0,0 +1,34 @@
+//! Test step for mining blocks on Anvil.
+
+use common::BoxError;
+
+use crate::testlib::ctx::TestCtx;
+
+/// Test step that mines blocks on the Anvil blockchain.
+///
+/// This step instructs Anvil to mine a specified number of blocks, advancing
+/// the blockchain state. This is useful for generating test data and simulating
+/// blockchain progression.
+#[derive(Debug, serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct Step {
+    /// The name of this test step.
+    pub name: String,
+    /// The number of blocks to mine.
+    pub anvil_mine: u64,
+}
+
+impl Step {
+    /// Mines the specified number of blocks on Anvil.
+    ///
+    /// Uses the Anvil fixture from the test context to mine blocks.
+    /// Requires that the test context was configured with Anvil support.
+    pub async fn run(&self, ctx: &TestCtx) -> Result<(), BoxError> {
+        tracing::debug!("Mining {} blocks", self.anvil_mine);
+
+        ctx.anvil().mine(self.anvil_mine).await?;
+
+        tracing::info!("Successfully mined {} blocks", self.anvil_mine);
+        Ok(())
+    }
+}
diff --git a/tests/src/steps/anvil_reorg.rs b/tests/src/steps/anvil_reorg.rs
new file mode 100644
index 000000000..d34846bd6
--- /dev/null
+++ b/tests/src/steps/anvil_reorg.rs
@@ -0,0 +1,37 @@
+//! Test step for triggering blockchain reorganizations on Anvil.
+
+use common::BoxError;
+
+use crate::testlib::ctx::TestCtx;
+
+/// Test step that triggers a blockchain reorganization on Anvil.
+///
+/// This step simulates a chain reorganization by replacing the last N blocks
+/// with alternative blocks. This is useful for testing reorg handling in
+/// streaming queries and data synchronization.
+#[derive(Debug, serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct Step {
+    /// The name of this test step.
+    pub name: String,
+    /// The depth of the reorganization (number of blocks to replace).
+    pub anvil_reorg: u64,
+}
+
+impl Step {
+    /// Triggers a blockchain reorganization with the specified depth.
+    ///
+    /// Uses the Anvil fixture from the test context to trigger a reorg.
+    /// Requires that the test context was configured with Anvil support.
+    pub async fn run(&self, ctx: &TestCtx) -> Result<(), BoxError> {
+        tracing::debug!("Triggering reorg with depth {}", self.anvil_reorg);
+
+        ctx.anvil().reorg(self.anvil_reorg).await?;
+
+        tracing::info!(
+            "Successfully triggered reorg with depth {}",
+            self.anvil_reorg
+        );
+        Ok(())
+    }
+}
diff --git a/tests/src/tests/it_streaming_join.rs b/tests/src/tests/it_streaming_join.rs
new file mode 100644
index 000000000..d64425d58
--- /dev/null
+++ b/tests/src/tests/it_streaming_join.rs
@@ -0,0 +1,71 @@
+//! Integration tests for streaming joins with Anvil.
+//!
+//! These tests validate that streaming queries with JOINs correctly deliver
+//! incremental results as new blocks are mined on Anvil.
+
+use monitoring::logging;
+
+use crate::{steps::run_spec, testlib::ctx::TestCtxBuilder};
+
+#[tokio::test(flavor = "multi_thread")]
+async fn streaming_join_self() {
+    logging::init();
+
+    let test_ctx = TestCtxBuilder::new("streaming_join_self")
+        .with_anvil_ipc()
+        .with_dataset_manifest("anvil_rpc")
+        .build()
+        .await
+        .expect("Failed to create test environment");
+
+    let mut client = test_ctx
+        .new_flight_client()
+        .await
+        .expect("Failed to connect FlightClient");
+
+    run_spec("streaming-join-anvil", &test_ctx, &mut client, None)
+        .await
+        .expect("Failed to run streaming join spec");
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn streaming_join_cross_table() {
+    logging::init();
+
+    let test_ctx = TestCtxBuilder::new("streaming_join_cross_table")
+        .with_anvil_ipc()
+        .with_dataset_manifest("anvil_rpc")
+        .build()
+        .await
+        .expect("Failed to create test environment");
+
+    let mut client = test_ctx
+        .new_flight_client()
+        .await
+        .expect("Failed to connect FlightClient");
+
+    run_spec("streaming-join-cross-table", &test_ctx, &mut client, None)
+        .await
+        .expect("Failed to run streaming cross-table join spec");
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn streaming_join_with_reorg() {
+    logging::init();
+
+    let test_ctx = TestCtxBuilder::new("streaming_join_with_reorg")
+        .with_anvil_ipc()
+        .with_dataset_manifest("anvil_rpc")
+        .build()
+        .await
+        .expect("Failed to create test environment");
+
+    let mut client = test_ctx
+        .new_flight_client()
+        .await
+        .expect("Failed to connect FlightClient");
+
+    run_spec("streaming-join-with-reorg", &test_ctx, &mut client, None)
+        .await
+        .expect("Failed to run streaming join with reorg spec");
+}
diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs
index 105e8acc0..2c3cd7130 100644
--- a/tests/src/tests/mod.rs
+++ b/tests/src/tests/mod.rs
@@ -16,4 +16,5 @@ mod it_reorg;
 mod it_sql;
 mod it_sql_dataset_batch_size;
 mod it_streaming;
+mod it_streaming_join;
 mod it_typescript;