@fordN fordN commented Nov 3, 2025

I apologize ahead of time for the size of this PR 😬. I've added a detailed description here, with tips for reviewing, to hopefully make it more digestible.

Overview

This PR adds streaming infrastructure to amp-python: comprehensive state management, resilience features, and a significantly enhanced Snowflake loader. The changes enable resumable, fault-tolerant streaming with automatic gap detection, reorg handling, and label enrichment.


Review Strategy

I recommend reviewing in this order:

1. Core Concepts (20 min)

Start here to understand the foundation:

  • src/amp/streaming/state.py - Unified state management
  • src/amp/streaming/types.py - Core types: BlockRange, ResponseBatch, BatchIdentifier
  • docs/resilience.md - Architecture overview

2. Key Features (30 min)

Review the main capabilities:

  • src/amp/loaders/base.py (lines 200-400) - Streaming support in base loader
  • src/amp/streaming/parallel.py (lines 350-450) - Gap detection & resume optimization
  • src/amp/config/label_manager.py - CSV label enrichment
  • src/amp/streaming/resilience.py - Retry, backpressure, rate limiting

3. Snowflake Implementation (20 min)

The largest single file change:

  • src/amp/loaders/implementations/snowflake_loader.py - Persistent state, COPY INTO, parallel loading

4. Tests & Validation (15 min)

Verify comprehensive coverage:

  • tests/unit/test_stream_state.py - State management tests
  • tests/integration/test_checkpoint_resume.py - End-to-end resume scenarios
  • tests/integration/test_resilient_streaming.py - Resilience with real databases

5. Other stuff (10 min)

  • Apps, docs, Docker configs, benchmarks

Key Features

1. Unified Stream State Management

Why: Simplifies resumability + idempotency into one system
Files: src/amp/streaming/state.py, sql/snowflake_stream_state.sql

  • StreamStateStore interface with in-memory, null, and DB-backed implementations
  • BatchIdentifier with hash-based uniqueness for reorg detection
  • Automatic gap detection and intelligent backfill
  • Support for multiple networks and tables

Example:

state_store = InMemoryStreamStateStore()
state_store.mark_processed(conn, table, batch_ids)  # record completed batches (idempotency)
resume_pos = state_store.get_resume_position(conn, table, detect_gaps=True)  # where to restart (resumability)
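
Gap detection itself is easy to picture. An illustrative standalone version (not the actual implementation) that finds holes between processed block ranges:

def find_gaps(ranges):
    # ranges are inclusive (start, end) block ranges already processed
    ordered = sorted(ranges)
    return [
        (prev_end + 1, next_start - 1)
        for (_, prev_end), (next_start, _) in zip(ordered, ordered[1:])
        if next_start > prev_end + 1
    ]

assert find_gaps([(0, 99), (200, 299)]) == [(100, 199)]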

2. Label Enrichment System

Why: Join blockchain data with metadata (token info, labels, etc.)
Files: src/amp/config/label_manager.py, docs/label_manager.md

  • Load CSV labels with automatic hex→binary conversion
  • Efficient PyArrow-based joins during streaming
  • Thread-safe label storage and retrieval

Example:

client.add_label('tokens', 'eth_mainnet_token_metadata.csv')
# Automatically joins on matching address columns
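
Under the hood, the enrichment is a PyArrow table join on the address column. A standalone illustration of the mechanics, including the hex→binary conversion (not the library's actual code):

import pyarrow as pa

# Ethereum addresses arrive as hex strings; labels are joined on the binary form.
addr = bytes.fromhex('dac17f958d2ee523a2206206994597c13d831ec7')
transfers = pa.table({'address': [addr], 'amount': [100]})
labels = pa.table({'address': [addr], 'symbol': ['USDT']})
enriched = transfers.join(labels, keys='address', join_type='left outer')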

3. Resilience Features

Why: Production workloads need fault tolerance
Files: src/amp/streaming/resilience.py, docs/resilience.md

  • Exponential backoff with jitter (configurable max retries)
  • Adaptive rate limiting (responds to 429s and timeouts)
  • Backpressure detection and circuit breakers
  • Transient vs permanent error classification
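
To make the retry behavior concrete, here is a minimal, self-contained sketch of exponential backoff with full jitter plus transient/permanent classification. It is illustrative only and independent of the actual implementation in src/amp/streaming/resilience.py:

import random
import time

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    # Full jitter: sleep a random duration up to an exponentially growing cap.
    return random.uniform(0, min(cap, base * (2 ** attempt)))

def with_retries(fn, max_retries: int = 5, is_transient=lambda exc: True):
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            # Permanent errors and an exhausted retry budget propagate.
            if not is_transient(exc) or attempt == max_retries - 1:
                raise
            time.sleep(backoff_delay(attempt))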

4. Parallel Execution Enhancements

Why: Fill historical gaps while maintaining streaming
Files: src/amp/streaming/parallel.py

  • Intelligent gap detection and prioritization
  • Resume optimization (adjusts min_block based on progress)
  • Gap-aware partitioning strategies (sketched after the example below)
  • Hybrid mode: parallel backfill + streaming updates

Example:

config = ParallelConfig(
    table_name='erc20_transfers',
    min_block=10_000_000,
    max_block=21_000_000,
    num_workers=4
)
executor.load_with_resume_optimization(config, detect_gaps=True)
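
The gap-aware partitioning mentioned above reduces to: emit detected gaps first, then chunk the remaining tail into worker-sized ranges. An illustrative standalone version (not the actual partitioner):

def gap_first_partitions(gaps, resume_from, max_block, size=500_000):
    # Gaps are filled before new historical work; every range is chunked
    # into fixed-size partitions handed to the parallel workers.
    for start, end in [*gaps, (resume_from, max_block)]:
        for lo in range(start, end + 1, size):
            yield (lo, min(lo + size - 1, end))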

5. Enhanced Snowflake Loader

Why: Enterprise-grade streaming to Snowflake
Files: src/amp/loaders/implementations/snowflake_loader.py

  • Persistent state via Snowflake tables (survives restarts)
  • Parquet COPY INTO for high-performance bulk loading via stages (pattern sketched after this list)
  • Parallel workers with proper connection pooling
  • Reorg handling options: delete reorged data (legacy behavior) or keep history and mark it as reorged
  • Batch-level invalidation so only the affected batches are rolled back
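
For context, the stage-based bulk path follows the standard Snowflake pattern: PUT a Parquet file to a stage, then COPY INTO the target table. A sketch of that pattern (not the loader's actual code; connection parameters are placeholders):

import snowflake.connector

conn = snowflake.connector.connect(account='...', user='...', password='...')
cur = conn.cursor()
# Upload the Parquet batch to the table stage, then bulk-load it.
cur.execute("PUT file:///tmp/batch.parquet @%erc20_transfers")
cur.execute(
    "COPY INTO erc20_transfers FROM @%erc20_transfers "
    "FILE_FORMAT = (TYPE = PARQUET) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
)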

Testing Summary

Coverage

  • 172 unit tests - Pure logic, minimal mocking
  • 133 integration tests - Real database interactions via testcontainers
  • All loaders updated: PostgreSQL, Redis, Snowflake, Delta, Iceberg, LMDB

Key Test Files

  • tests/unit/test_stream_state.py - State management
  • tests/unit/test_resilience.py - Retry, backpressure, rate limiting
  • tests/integration/test_checkpoint_resume.py - Resume scenarios
  • tests/integration/test_resilient_streaming.py - Fault injection

Run Tests

# Install all dependencies (if you haven't already)
uv build
uv sync --all-groups

# Unit tests (fast)
make test-unit

# Integration tests (require Docker or a compatible runtime such as Colima)
make test-integration

# All tests
make test-all

Architecture Decisions

Why Unified State Management?

The previous design, which this PR replaces, used separate systems for checkpoints (resume) and processed ranges (idempotency). This created:

  • Duplication (two systems tracking similar data)
  • Complexity (had to manage both)
  • Inconsistency (could get out of sync)

Solution: StreamStateStore provides both capabilities with a single source of truth.
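
A minimal sketch of the interface shape, inferred from the examples above (the real class in src/amp/streaming/state.py may differ):

from abc import ABC, abstractmethod

class StreamStateStore(ABC):
    """Single source of truth for both resume and dedup state (sketch)."""

    @abstractmethod
    def mark_processed(self, conn, table, batch_ids):
        """Record completed batches -> idempotency."""

    @abstractmethod
    def get_resume_position(self, conn, table, detect_gaps=False):
        """Compute where to restart -> resumability."""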

Why Hash-based Batch IDs?

Block ranges alone aren't unique during reorgs (same range, different chain state). Hash-based IDs:

  • Include block hashes for uniqueness across reorgs
  • Enable precise invalidation (delete only affected batches)
  • Compact (16-char hex = 64 bits)
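
A hypothetical construction, just to make the scheme concrete (the real derivation in src/amp/streaming/state.py may differ):

import hashlib

def batch_id(network: str, start: int, end: int, end_hash: str) -> str:
    payload = f'{network}:{start}:{end}:{end_hash}'.encode()
    return hashlib.sha256(payload).hexdigest()[:16]  # 16 hex chars = 64 bits

# Same block range, different chain state after a reorg -> different ID.
assert batch_id('eth', 100, 199, '0xaaa') != batch_id('eth', 100, 199, '0xbbb')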

New Files

Core Infrastructure

  • src/amp/streaming/state.py - Unified state management
  • src/amp/streaming/resilience.py - Retry, backpressure, rate limiting
  • src/amp/config/label_manager.py - CSV label enrichment
  • sql/snowflake_stream_state.sql - Snowflake state schema

Applications

  • apps/snowflake_parallel_loader.py - Production-ready parallel loader
  • apps/test_erc20_labeled_parallel.py - Example with label enrichment
  • apps/queries/erc20_transfers.sql - Sample query

Documentation

  • docs/resilience.md - Complete resilience architecture guide
  • docs/label_manager.md - Label enrichment guide
  • apps/SNOWFLAKE_LOADER_GUIDE.md - Snowflake loader tutorial
  • apps/queries/README.md - Query examples

Infrastructure

  • Dockerfile, Dockerfile.snowflake - Containerized deployment
  • k8s/deployment.yaml - Kubernetes deployment
  • .github/workflows/docker-publish.yml - Docker CI/CD

Key Commit Groups

Foundation (Commits 1-3)

  • Label management system
  • Unified state management
  • Resilience features

Core Streaming (Commits 4-6)

  • Base loader improvements
  • Parallel execution enhancements
  • Client integration

Loader Updates (Commit 7)

  • All loaders updated for new interface
  • Adds _amp_batch_id metadata column
  • Reorg handling support

Snowflake (Commits 8-9)

  • Enhanced Snowflake loader with persistent state
  • Parallel loading applications

Testing (Commits 10-11)

  • 172 unit tests
  • 133 integration tests

Polish (Commits 12-18)

  • Documentation
  • Docker/K8s configs
  • Performance benchmarks
  • Linting fixes
  • Bug fix: Redis reorg handling with the string data structure

Performance

See performance_benchmarks.json for detailed metrics.

Highlights:

  • Snowflake COPY INTO: 10-100x faster than INSERT
  • Parallel loading: Linear speedup with workers (tested up to 8 workers)
  • Label joins: ~19ms overhead for 3K rows (negligible)
  • Resume optimization: Automatic gap detection (no manual intervention)

@fordN fordN changed the base branch from main to ford/parallel-streams November 3, 2025 17:56
@fordN fordN force-pushed the ford/snowpipe-and-streaming-state branch from 06ee399 to 764f127 Compare November 3, 2025 19:05
Base automatically changed from ford/parallel-streams to main November 4, 2025 18:11
- Load labels from CSV files with automatic type detection
- Support hex string to binary conversion for Ethereum addresses
- Thread-safe label storage and retrieval
- Add LabelJoinConfig type for configuring joins
@fordN fordN force-pushed the ford/snowpipe-and-streaming-state branch 2 times, most recently from d08ce89 to 54d0026 Compare November 4, 2025 19:29
@fordN fordN closed this Nov 5, 2025
@fordN fordN reopened this Nov 5, 2025
@fordN fordN force-pushed the ford/snowpipe-and-streaming-state branch from 82ebd6b to 21db92c Compare November 5, 2025 23:53
@fordN fordN self-assigned this Nov 6, 2025
@fordN fordN added the enhancement New feature or request label Nov 6, 2025
fordN added 11 commits November 7, 2025 12:39
- StreamStateStore interface with in-memory, null, and DB-backed implementations
- Block range tracking with gap detection
- Reorg invalidation support

Key features:
- Resume from last processed position after crashes
- Exactly-once semantics via batch deduplication
- Gap detection and intelligent backfill
- Support for multiple networks and tables
- Exponential backoff with jitter for transient failures
- Adaptive rate limiting with automatic adjustment
- Back pressure detection and mitigation
- Error classification (transient vs permanent)
- Configurable retry policies

Features:
- Auto-detects rate limits and slows down requests
- Detects timeouts and adjusts batch sizes
- Production-tested configurations included
- Integrate state management for resume and deduplication
- Add label joining support with automatic type conversion
- Implement resilience features (retry, backpressure, rate limiting)
- Add metadata columns (_amp_batch_id) for reorg handling
- Support streaming with block ranges and reorg detection
- Separate _try_load_batch() for better error handling
- Add resume optimization that adjusts min_block based on persistent state
- Implement gap-aware partitioning for intelligent backfill
- Add pre-flight table creation to avoid locking issues
- Improve error handling and logging for state operations
- Support label joining in parallel workers

Key features:
- Auto-detects processed ranges and skips already-loaded partitions
- Prioritizes gap filling before processing new data
- Efficient partition creation avoiding redundant work
- Visible logging for resume operations and adjustments

Resume workflow:
1. Query state store for max processed block
2. Adjust min_block to skip processed ranges
3. Detect gaps in processed data
4. Create partitions prioritizing gaps first
5. Process remaining historical data
Add label management to Client class:
- Initialize LabelManager with configurable label directory
- Support loading labels from CSV files
- Pass label_manager to all loader instances
- Enable label joining in streaming queries via load() method

Updates:
- Client now supports label enrichment out of the box
- Loaders inherit label_manager from client
- Add pyarrow.csv dependency for label loading
- PostgreSQL: Add reorg support with DELETE/UPDATE, metadata columns
- Redis: Add streaming metadata and batch ID support
- DeltaLake: Support new metadata columns
- Iceberg: Update for base class changes
- LMDB: Add metadata column support

All loaders now support:
- State-backed resume and deduplication
- Label joining via base class
- Resilience features (retry, backpressure)
- Reorg-aware streaming with metadata tracking
Add unit tests for all new streaming features:
- test_label_joining.py - Label enrichment with type conversion
- test_label_manager.py - CSV loading and label storage
- test_resilience.py - Retry, backoff, rate limiting
- test_resume_optimization.py - Resume position calculation
- test_stream_state.py - State store implementations
- test_streaming_helpers.py - Utility functions and batch ID generation
- test_streaming_types.py - BlockRange, ResumeWatermark types
- Add Snowflake-backed persistent state store (amp_stream_state table)
- Implement SnowflakeStreamStateStore with overlap detection
- Support multiple loading methods: stage, insert, pandas, snowpipe_streaming
- Add connection pooling for parallel workers
- Implement reorg history tracking with simplified schema
- Support Parquet stage loading for better performance

State management features:
- Block-level overlap detection for different partition sizes
- MERGE-based upsert to prevent duplicate state entries
- Resume position calculation with gap detection
- Deduplication across runs

Performance improvements:
- Parallel stage loading with connection pool
- Optimized Parquet format for stage loads
- Efficient batch processing with metadata columns
Add comprehensive demo applications for Snowflake loading:

1. snowflake_parallel_loader.py - Full-featured parallel loader
   - Configurable block ranges, workers, and partition sizes
   - Label joining with CSV files
   - State management with resume capability
   - Support for all Snowflake loading methods
   - Reorg history tracking
   - Clean formatted output with progress indicators

2. test_erc20_parallel_load.py - Simple ERC20 transfer loader
   - Basic parallel loading example
   - Good starting point for new users

3. test_erc20_labeled_parallel.py - Label-enriched example
   - Demonstrates label joining with token metadata
   - Shows how to enrich blockchain data

4. Query templates in apps/queries/
   - erc20_transfers.sql - Decode ERC20 Transfer events
   - README.md - Query documentation
New tests:
- test_resilient_streaming.py - Resilience with real databases
- Enhanced Snowflake loader tests with state management
- Enhanced PostgreSQL tests with reorg handling
- Updated Redis, DeltaLake, Iceberg, LMDB loader tests

Integration test features:
- Real database containers (PostgreSQL, Redis, Snowflake)
- State persistence and resume testing
- Label joining with actual data
- Reorg detection and invalidation
- Parallel loading with multiple workers
- Error injection and recovery

Tests require Docker for database containers.
Add containerization and orchestration support:
- General-purpose Dockerfile for amp-python
- Snowflake-specific Dockerfile with parallel loader
- GitHub Actions workflow for automated Docker publishing to ghcr.io
- Kubernetes deployment manifest for GKE with resource limits
- Comprehensive .dockerignore and .gitignore

Docker images:
- amp-python: Base image with all loaders
- amp-snowflake: Optimized for Snowflake parallel loading
  - Includes snowflake_parallel_loader.py as entrypoint
  - Pre-configured with Snowflake connector and dependencies
@fordN fordN force-pushed the ford/snowpipe-and-streaming-state branch from 21db92c to 68bbdb3 Compare November 7, 2025 21:32
fordN added 3 commits November 7, 2025 14:35
- All loading methods comparison (stage, insert, pandas, streaming)
- State management and resume capability
- Label joining for data enrichment
- Performance tuning and optimization
- Parallel loading configuration
- Reorg handling strategies
- Troubleshooting common issues
fordN added 3 commits November 7, 2025 14:36
Users should now mount label CSV files at runtime using volume mounts
(Docker) or init containers with cloud storage (Kubernetes).

Changes
- Removed COPY data/ line from both Dockerfiles
- The /data directory is still created (mkdir -p /app /data) but empty
- Updated .gitignore to ignore entire data/ directory
- Removed data/** trigger from docker-publish workflow
- Added comprehensive docs/label_manager.md with:
  * Docker volume mount examples
  * Kubernetes init container pattern (recommended for large files)
  * ConfigMap examples (for small files <1MB)
  * PersistentVolume examples (for shared access)
  * Performance considerations and troubleshooting
When data_structure='string', batch IDs are stored inside JSON values
rather than as hash fields. The reorg handler now checks the data
structure and uses GET+JSON parse for strings, HGET for hashes.
@fordN fordN force-pushed the ford/snowpipe-and-streaming-state branch from 68bbdb3 to e0e5765 Compare November 7, 2025 22:36
@fordN fordN merged commit 7c66375 into main Nov 12, 2025
9 checks passed
@fordN fordN deleted the ford/snowpipe-and-streaming-state branch November 12, 2025 18:51