Skip to content

Conversation

@fordN
Copy link
Contributor

@fordN fordN commented Nov 5, 2025

Summary

Adds foundational streaming infrastructure for the amp Python client, but does not yet integrate it.

  • Label Manager: Support joining Amp query results with data from a CSV. Designed to be used for enriching Amp data with label or category data while loading it into an external data store.
  • Stream State Management: Support resumabiity and deduplication
  • Resilience Features: Retries, backpressure, rate limiting

Changes

  • Add LabelManager for loading and managing CSV label datasets
  • Add StreamStateStore interface with in-memory and null implementations
  • Add Snowflake SQL schema for persistent state table (amp_stream_state)
  • Add exponential backoff with jitter for transient failures
  • Add adaptive rate limiting with automatic adjustment

Key Features

  • Efficient label joins using PyArrow join
  • Automatic hex address → binary conversion (50% memory reduction)
  • Resume from last processed position after interruptions
  • Exactly-once semantics via batch deduplication
  • Gap detection for intelligent backfill
  • Production-tested retry and rate limiting configurations

Part 1 of 5 in the streaming state management feature set.

The next PR (2/5) will integrate these features into the DataLoader base class.

fordN added 3 commits November 4, 2025 10:56
- Load labels from CSV files with automatic type detection
- Support hex string to binary conversion for Ethereum addresses
- Thread-safe label storage and retrieval
- Add LabelJoinConfig type for configuring joins
- StreamStateStore interface with in-memory, null, and DB-backed
implementations
- Checkpoint management for resume after interruptions
- Idempotency tracking to prevent duplicate processing
- Block range tracking with gap detection
- Reorg invalidation support

Key features:
- Resume from last processed position after crashes
- Exactly-once semantics via batch deduplication
- Gap detection and intelligent backfill
- Support for multiple networks and tables
- Exponential backoff with jitter for transient failures
- Adaptive rate limiting with automatic adjustment
- Back pressure detection and mitigation
- Error classification (transient vs permanent)
- Configurable retry policies

Features:
- Auto-detects rate limits and slows down requests
- Detects timeouts and adjusts batch sizes
- Production-tested configurations included
@fordN fordN self-assigned this Nov 5, 2025
@fordN fordN added the enhancement New feature or request label Nov 5, 2025
@fordN fordN closed this Nov 5, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants