Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions tests/specs/streaming-join-anvil.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Streaming join test using Anvil
#
# This test validates that streaming queries with JOINs correctly deliver
# incremental results as new blocks are mined on Anvil.

- anvil: {}

# Mine initial blocks (anvil starts with block 0)
- name: mine_initial
mine: 3

# Dump to make data available (must happen before registering stream)
- name: dump_initial
dataset: _/anvil_rpc@0.0.0
end: 3

# Register the streaming join query (self-join on blocks via parent_hash)
- name: register_streaming_join
stream: |
SELECT child.block_num, parent.block_num as parent_num
FROM anvil_rpc.blocks child
JOIN anvil_rpc.blocks parent ON child.parent_hash = parent.hash
SETTINGS stream = true

# Take initial join results - blocks 1,2,3 each joined with their parent
- name: take_initial_join
stream: register_streaming_join
take: 3
results: |
[
{"block_num": 1, "parent_num": 0},
{"block_num": 2, "parent_num": 1},
{"block_num": 3, "parent_num": 2}
]

# Mine more blocks
- name: mine_more
mine: 2

# Dump new blocks
- name: dump_more
dataset: _/anvil_rpc@0.0.0
end: 5

# Take incremental join results - blocks 4,5 joined with their parents
- name: take_incremental_join
stream: register_streaming_join
take: 2
results: |
[
{"block_num": 4, "parent_num": 3},
{"block_num": 5, "parent_num": 4}
]
51 changes: 51 additions & 0 deletions tests/specs/streaming-join-cross-table.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Streaming cross-table join test using Anvil
#
# This test validates that streaming queries with JOINs across different
# tables (blocks and transactions) correctly deliver incremental results.
#
# Uses INNER JOIN between blocks and transactions - returns rows only when
# blocks have transactions. With empty anvil blocks, we need to verify
# the join mechanism works even if no rows are returned initially.

- anvil: {}

# Mine initial blocks
- name: mine_initial
mine: 3

# Dump to make data available
- name: dump_initial
dataset: _/anvil_rpc@0.0.0
end: 3

# Register streaming cross-table join query (blocks INNER JOIN transactions)
# This joins blocks with their transactions on block_num
- name: register_cross_table_join
stream: |
SELECT b.block_num, t.tx_hash, t.tx_index
FROM anvil_rpc.blocks b
INNER JOIN anvil_rpc.transactions t ON b.block_num = t.block_num
SETTINGS stream = true

# Take initial join results - empty blocks have no transactions, so 0 rows
- name: take_initial_join
stream: register_cross_table_join
take: 0
results: |
[]

# Mine more blocks
- name: mine_more
mine: 2

# Dump new blocks
- name: dump_more
dataset: _/anvil_rpc@0.0.0
end: 5

# Take incremental join results - still no transactions in empty blocks
- name: take_incremental_join
stream: register_cross_table_join
take: 0
results: |
[]
68 changes: 68 additions & 0 deletions tests/specs/streaming-join-with-reorg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Streaming join test with blockchain reorganization
#
# This test validates that streaming queries with JOINs correctly handle
# blockchain reorganizations, delivering updated results after reorg.
#
# Flow:
# 1. Mine initial blocks and register streaming join
# 2. Take initial join results
# 3. Trigger reorg (removes some blocks)
# 4. Mine new blocks on the new fork
# 5. Verify join results reflect the reorganized chain

- anvil: {}

# Mine initial blocks (anvil starts with block 0)
- name: mine_initial
mine: 5

# Dump to make data available
- name: dump_initial
dataset: _/anvil_rpc@0.0.0
end: 5

# Register the streaming join query (self-join on blocks via parent_hash)
- name: register_streaming_join
stream: |
SELECT child.block_num, parent.block_num as parent_num
FROM anvil_rpc.blocks child
JOIN anvil_rpc.blocks parent ON child.parent_hash = parent.hash
SETTINGS stream = true

# Take initial join results - blocks 1-5 each joined with their parent
- name: take_initial_join
stream: register_streaming_join
take: 5
results: |
[
{"block_num": 1, "parent_num": 0},
{"block_num": 2, "parent_num": 1},
{"block_num": 3, "parent_num": 2},
{"block_num": 4, "parent_num": 3},
{"block_num": 5, "parent_num": 4}
]

# Trigger reorg - removes last 2 blocks (4 and 5)
- name: trigger_reorg
reorg: 2

# Mine new blocks on the reorganized chain
- name: mine_after_reorg
mine: 3

# Dump the reorganized chain state
- name: dump_after_reorg
dataset: _/anvil_rpc@0.0.0
end: 6

# Take incremental join results after reorg
# Should see blocks 4, 5, 6 with their new parent relationships
- name: take_after_reorg
stream: register_streaming_join
take: 3
results: |
[
{"block_num": 4, "parent_num": 3},
{"block_num": 5, "parent_num": 4},
{"block_num": 6, "parent_num": 5}
]
17 changes: 16 additions & 1 deletion tests/src/steps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ use common::utils::error_with_causes;
use fs_err as fs;

// Submodules of the step implementations
mod anvil;
mod clean_dump_location;
mod dump;
mod mine;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If mine and reorg are anvil-dependent steps, I would suggest renaming them anvil_mine and anvil_reorg.

mod query;
mod register;
mod reorg;
mod restore;
mod stream;
mod stream_take;
Expand All @@ -27,6 +30,12 @@ use crate::testlib::{ctx::TestCtx, fixtures::FlightClient};
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
pub enum TestStep {
/// Initialize Anvil blockchain fixture.
Anvil(anvil::Step),
/// Mine blocks on Anvil.
Mine(mine::Step),
/// Trigger blockchain reorganization on Anvil.
Reorg(reorg::Step),
/// Dump dataset data to storage.
Dump(dump::Step),
/// Register a stream with the client.
Expand All @@ -47,9 +56,12 @@ impl TestStep {
/// Gets the name of the test step.
///
/// Returns the step name for logging and identification purposes.
/// Note that CleanDumpLocation steps use their location as the name.
/// Note that CleanDumpLocation and Anvil steps use their field values as the name.
pub fn name(&self) -> &str {
match self {
TestStep::Anvil(_) => "anvil",
TestStep::Mine(step) => &step.name,
TestStep::Reorg(step) => &step.name,
TestStep::Dump(step) => &step.name,
TestStep::StreamTake(step) => &step.name,
TestStep::Query(step) => &step.name,
Expand All @@ -66,6 +78,9 @@ impl TestStep {
/// with comprehensive logging and error handling.
pub async fn run(&self, ctx: &TestCtx, client: &mut FlightClient) -> Result<(), TestStepError> {
let result = match self {
TestStep::Anvil(step) => step.run(ctx).await,
TestStep::Mine(step) => step.run(ctx).await,
TestStep::Reorg(step) => step.run(ctx).await,
TestStep::Dump(step) => step.run(ctx).await,
TestStep::StreamTake(step) => step.run(client).await,
TestStep::Query(step) => step.run(client).await,
Expand Down
46 changes: 46 additions & 0 deletions tests/src/steps/anvil.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//! Test step for initializing Anvil blockchain fixture.

use common::BoxError;

use crate::testlib::ctx::TestCtx;

/// Test step that validates Anvil is available for the test.
///
/// This step serves as a marker that the test requires Anvil and validates
/// that the test context was configured with Anvil support. It should appear
/// before any `mine` or `reorg` steps in the YAML spec.
///
/// Note: The actual Anvil instance must be configured via `TestCtxBuilder::with_anvil_ipc()`
/// or `with_anvil_http()` in the test harness. This step validates that configuration.
#[derive(Debug, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Step {
/// Anvil configuration options.
pub anvil: AnvilConfig,
}

/// Configuration options for the Anvil step.
#[derive(Debug, Default, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct AnvilConfig {
/// Connection mode: "ipc" (default) or "http".
#[serde(default)]
pub mode: Option<String>,
}

impl Step {
/// Validates that Anvil is available in the test context.
///
/// This method checks that the test context was configured with Anvil support.
/// If Anvil is not available, it returns an error with guidance on how to fix it.
pub async fn run(&self, ctx: &TestCtx) -> Result<(), BoxError> {
tracing::debug!("Validating Anvil fixture is available");

// This will panic if Anvil is not configured, which is the expected behavior
// to fail fast with a clear error message
let _anvil = ctx.anvil();

tracing::info!("Anvil fixture validated successfully");
Ok(())
}
}
34 changes: 34 additions & 0 deletions tests/src/steps/mine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Test step for mining blocks on Anvil.

use common::BoxError;

use crate::testlib::ctx::TestCtx;

/// Test step that mines blocks on the Anvil blockchain.
///
/// This step instructs Anvil to mine a specified number of blocks, advancing
/// the blockchain state. This is useful for generating test data and simulating
/// blockchain progression.
#[derive(Debug, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Step {
/// The name of this test step.
pub name: String,
/// The number of blocks to mine.
pub mine: u64,
}

impl Step {
/// Mines the specified number of blocks on Anvil.
///
/// Uses the Anvil fixture from the test context to mine blocks.
/// Requires that the test context was configured with Anvil support.
pub async fn run(&self, ctx: &TestCtx) -> Result<(), BoxError> {
tracing::debug!("Mining {} blocks", self.mine);

ctx.anvil().mine(self.mine).await?;

tracing::info!("Successfully mined {} blocks", self.mine);
Ok(())
}
}
34 changes: 34 additions & 0 deletions tests/src/steps/reorg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Test step for triggering blockchain reorganizations on Anvil.

use common::BoxError;

use crate::testlib::ctx::TestCtx;

/// Test step that triggers a blockchain reorganization on Anvil.
///
/// This step simulates a chain reorganization by replacing the last N blocks
/// with alternative blocks. This is useful for testing reorg handling in
/// streaming queries and data synchronization.
#[derive(Debug, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Step {
/// The name of this test step.
pub name: String,
/// The depth of the reorganization (number of blocks to replace).
pub reorg: u64,
}

impl Step {
/// Triggers a blockchain reorganization with the specified depth.
///
/// Uses the Anvil fixture from the test context to trigger a reorg.
/// Requires that the test context was configured with Anvil support.
pub async fn run(&self, ctx: &TestCtx) -> Result<(), BoxError> {
tracing::debug!("Triggering reorg with depth {}", self.reorg);

ctx.anvil().reorg(self.reorg).await?;

tracing::info!("Successfully triggered reorg with depth {}", self.reorg);
Ok(())
}
}
Loading