fordN commented Oct 14, 2025

This PR adds parallel streaming support, enabling fast historical data backfills with an automatic transition to live streaming.

What's New

Parallel Execution for Historical Data

  • Partition large block ranges across multiple workers using ThreadPoolExecutor
  • 4-8x speedup for historical loads (scales with worker count)
  • Block-based partitioning with automatic or manual partition sizing
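
As a rough illustration of the block-based partitioning described above, the sketch below splits an inclusive block range into contiguous, non-overlapping partitions and hands each one to a `ThreadPoolExecutor` worker. The names `BlockRange`, `partition_blocks`, `load_partition`, and `run_parallel` are hypothetical and are not taken from this PR; the even-split strategy is an assumption, and the real partitioner may size partitions differently.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass(frozen=True)
class BlockRange:
    """Inclusive block range [start, end] (hypothetical helper type)."""
    start: int
    end: int


def partition_blocks(min_block: int, max_block: int, num_partitions: int) -> list[BlockRange]:
    """Split [min_block, max_block] into contiguous, non-overlapping ranges."""
    if max_block < min_block:
        raise ValueError("max_block must be >= min_block")
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    total = max_block - min_block + 1
    # Never create more partitions than there are blocks.
    num_partitions = min(num_partitions, total)
    size, extra = divmod(total, num_partitions)
    ranges = []
    start = min_block
    for i in range(num_partitions):
        # The first `extra` partitions absorb one leftover block each.
        length = size + (1 if i < extra else 0)
        ranges.append(BlockRange(start, start + length - 1))
        start += length
    return ranges


def load_partition(block_range: BlockRange) -> int:
    """Stand-in for a real per-partition load; returns the blocks covered."""
    return block_range.end - block_range.start + 1


def run_parallel(min_block: int, max_block: int, num_workers: int) -> list[int]:
    """Run one load per partition on a thread pool, preserving partition order."""
    ranges = partition_blocks(min_block, max_block, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(load_partition, ranges))
```

Threads suit this workload if each worker spends most of its time waiting on network and database I/O, which is presumably why a thread pool rather than a process pool is used here.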

Hybrid Mode (Parallel Catchup → Live Streaming)

  • Auto-detect current max block and load historical data in parallel
  • Seamlessly transition to single-stream continuous mode for live blocks
  • Configurable reorg buffer (default: 200 blocks) for safe transition overlap
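
One plausible reading of the transition is sketched below: the parallel phase stops `reorg_buffer` blocks short of the detected chain tip, and the continuous stream takes over from the next block, so the most recent (and most reorg-prone) blocks are only ever handled by the live stream. This is an assumption about the mechanics, not a description of the PR's implementation; `HybridPlan` and `plan_hybrid` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HybridPlan:
    """Hypothetical description of where catch-up ends and live streaming begins."""
    catchup_start: int
    catchup_end: int  # inclusive; last block loaded by the parallel phase
    live_start: int   # first block handled by the continuous stream


def plan_hybrid(min_block: int, detected_max_block: int, reorg_buffer: int = 200) -> HybridPlan:
    """Plan a parallel catch-up that stops `reorg_buffer` blocks behind the tip."""
    if reorg_buffer < 0:
        raise ValueError("reorg_buffer must be >= 0")
    if detected_max_block < min_block:
        raise ValueError("detected_max_block must be >= min_block")
    # Clamp so the catch-up range never ends before it starts.
    catchup_end = max(min_block, detected_max_block - reorg_buffer)
    return HybridPlan(
        catchup_start=min_block,
        catchup_end=catchup_end,
        live_start=catchup_end + 1,
    )
```

With the default buffer and a detected tip at block 1,000,000, this plan would load blocks 0 through 999,800 in parallel and start the live stream at block 999,801.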

Usage

Basic parallel historical load:

parallel_config = ParallelConfig(
    num_workers=4,
    table_name='eth_firehose.blocks',
    min_block=0,
    max_block=1_000_000,
    block_column='block_num'
)

results = client.sql(query).load(
    connection='postgres',
    destination='blocks',
    stream=True,
    parallel_config=parallel_config
)

Hybrid mode (parallel → continuous):

parallel_config = ParallelConfig(
    num_workers=4,
    table_name='eth_firehose.blocks',
    min_block=0,
    max_block=None,  # Auto-detect and transition to streaming
    reorg_buffer=200  # Configurable overlap for reorg safety
)

Key Features

  • Configurable partition sizes and reorg buffer
  • Comprehensive logging and statistics tracking
  • Rich metadata in LoadResult for monitoring
  • Thread-safe statistics aggregation
  • Graceful error handling (workers fail independently)
  • Full documentation with usage patterns and troubleshooting
  • Destination table is created before the parallel workers start, so every worker can rely on it already existing
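
To make "thread-safe statistics aggregation" and "workers fail independently" concrete, here is a minimal sketch under stated assumptions: each worker records its own outcome into a shared, lock-protected object, and an exception in one partition is captured rather than cancelling the others. `LoadStats` and `run_partitions` are hypothetical names and do not reflect the actual classes in this PR.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class LoadStats:
    """Shared counters guarded by a lock so worker threads can update them safely."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.rows_loaded = 0
        self.partitions_ok = 0
        self.errors: dict[int, str] = {}

    def record_success(self, rows: int) -> None:
        with self._lock:
            self.rows_loaded += rows
            self.partitions_ok += 1

    def record_failure(self, partition_id: int, exc: Exception) -> None:
        with self._lock:
            self.errors[partition_id] = str(exc)


def run_partitions(partitions, load_fn, num_workers: int = 4) -> LoadStats:
    """Run load_fn on every partition; a failing partition does not stop the rest."""
    stats = LoadStats()

    def worker(partition_id, partition):
        # Catch errors inside the worker so each partition fails in isolation.
        try:
            rows = load_fn(partition)
        except Exception as exc:
            stats.record_failure(partition_id, exc)
        else:
            stats.record_success(rows)

    # Leaving the `with` block waits for all submitted workers to finish.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for partition_id, partition in enumerate(partitions):
            pool.submit(worker, partition_id, partition)
    return stats
```

A caller could then inspect `stats.errors` to decide which partitions to retry, while the rows loaded by the successful partitions remain counted.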

Testing

  • 28 unit tests (partitioning logic, query transformation, stats tracking)
  • 4 integration tests (parallel load, hybrid mode, block detection)

Documentation

  • User guide: docs/parallel_streaming_usage.md - comprehensive usage patterns, configuration options, performance characteristics, and troubleshooting
  • Examples: Quick start, hybrid mode, checkpointing, custom partitioning

fordN self-assigned this Oct 14, 2025
fordN changed the base branch from main to ford/streaming October 14, 2025 15:26
fordN force-pushed the ford/parallel-streams branch from 79d2b77 to 7b549f6 October 14, 2025 15:35
fordN force-pushed the ford/parallel-streams branch from 7b549f6 to 7f7d0df October 22, 2025 15:09
Base automatically changed from ford/streaming to main October 22, 2025 17:30
fordN force-pushed the ford/parallel-streams branch from 7f7d0df to a2c4142 October 22, 2025 21:39
fordN force-pushed the ford/parallel-streams branch from a2c4142 to 028d16f October 22, 2025 21:46
fordN added 8 commits October 23, 2025 18:02
- Also move LoadResult and LoadConfig to /loaders/types to avoid circular dependencies
- Integration tests require an Amp server
- Configurable reorg buffer
- Create table ahead of spinning up parallel workers to ensure it's ready for all of them and avoid complexity of thread locking
- SQL variables for string replacement
- Better docs, including limitations
fordN force-pushed the ford/parallel-streams branch from 028d16f to 5878a19 October 24, 2025 01:03
fordN requested a review from incrypto32 October 24, 2025 01:05
fordN merged commit 68e1d60 into main Nov 4, 2025
7 checks passed
fordN deleted the ford/parallel-streams branch November 4, 2025 18:11
4 participants