Closed
31 commits
5bfb743 streaming: Setup primitives for parallel streaming (fordN, Oct 14, 2025)
bb2f3c0 loader: Wire up parallel streaming (fordN, Oct 14, 2025)
75a8335 docs: Parallel streaming implementation plan (fordN, Oct 14, 2025)
c14af50 tests: Unit and integration tests for parallel streaming (fordN, Oct 14, 2025)
e0e31d2 docs, README: Document parallel streams usage (fordN, Oct 14, 2025)
d406962 CLAUDE.md: Don't use mocks (fordN, Oct 14, 2025)
ae83e03 postgresql_loader: Use Numeric type for Arrow UINT64 columns (fordN, Oct 14, 2025)
5878a19 parallel streaming: various improvements (fordN, Oct 14, 2025)
b174c20 streaming: Add a checkpoint system for resumable streaming (fordN, Oct 23, 2025)
4bb653b streaming: Add resilience primitives (fordN, Oct 23, 2025)
0fd23e7 streaming: Add idempotency system (fordN, Oct 23, 2025)
ff749a8 loaders: Integrate resilience, checkpointing, and idempotency (fordN, Oct 23, 2025)
439ed86 docs: Add resilience system documentation (fordN, Oct 23, 2025)
47fe18e Formatting and linting (fordN, Oct 23, 2025)
948890a Replace deprecated datetime.utcnow() with datetime.now(UTC) (fordN, Oct 23, 2025)
e2d3ac7 snowflake_loader: Snowpipe streaming support (fordN, Oct 16, 2025)
8c8a78c snowflake_loader: Add missing threading import (fordN, Oct 24, 2025)
b37e584 deps: Upgrade snowflake-connector-python to 4.0.0 (fordN, Oct 24, 2025)
fff424c WIP: fixes I don't think we should have needed :shrug: (fordN, Oct 24, 2025)
75f7a7b snowflake_loader: Support for Pandas loading (fordN, Oct 17, 2025)
b4b1007 WIP: snowpipe fixes/tests and label manager (fordN, Oct 24, 2025)
d25888c Snowflake checkpoint store (fordN, Oct 24, 2025)
a121d3e label manager tests (fordN, Oct 24, 2025)
18a83b8 label manager and parallel load test (fordN, Oct 24, 2025)
79b53b8 docker: Build image and push to package registry (fordN, Oct 24, 2025)
7f99c52 data: Eth Mainnet erc20 token metadata (fordN, Oct 24, 2025)
b86f23b k8s updates (fordN, Oct 24, 2025)
4ebc2ca k8s updates and add the app we want to run (fordN, Oct 24, 2025)
4aec6d9 Dockerfile: Install amp package (fordN, Oct 24, 2025)
c9b565f Dockerfile: Update to install all group deps (fordN, Oct 24, 2025)
50f1b42 Dockerfile: all group deps (fordN, Oct 24, 2025)
75 changes: 75 additions & 0 deletions .github/workflows/docker-publish.yml
@@ -0,0 +1,75 @@
name: Build and Push Docker Image

on:
push:
branches:
- main
paths:
- 'src/**'
- 'apps/**'
- 'data/**'
- 'Dockerfile'
- 'pyproject.toml'
- '.github/workflows/docker-publish.yml'
pull_request:
branches:
- main
workflow_dispatch: # Allow manual trigger
inputs:
tag:
description: 'Docker image tag (default: latest)'
required: false
default: 'latest'

env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}

jobs:
build-and-push:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write

steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Log in to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Extract metadata for Docker
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=sha,prefix=sha-
type=raw,value=latest,enable={{is_default_branch}}

- name: Build and push Docker image
id: build
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64

- name: Image digest
run: echo ${{ steps.build.outputs.digest }}
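The tagging rules in the metadata-action step can be summarized with a sketch. This is an illustration only, not the action's real implementation; it models just the branch, PR, sha, and latest rules from the config above (the semver patterns are omitted), and the PR-number extraction from the ref is an assumption:

```python
def expected_tags(event: str, ref: str, sha: str, default_branch: str = "main") -> list[str]:
    """Rough model of which tags docker/metadata-action emits per event."""
    tags = []
    if event == "push" and ref.startswith("refs/heads/"):
        branch = ref.removeprefix("refs/heads/")
        tags.append(branch)                     # type=ref,event=branch
        if branch == default_branch:
            tags.append("latest")               # type=raw,value=latest,enable={{is_default_branch}}
    elif event == "pull_request":
        pr_number = ref.split("/")[2]           # e.g. refs/pull/123/merge -> 123
        tags.append(f"pr-{pr_number}")          # type=ref,event=pr
    tags.append(f"sha-{sha[:7]}")               # type=sha,prefix=sha-
    return tags

print(expected_tags("push", "refs/heads/main", "5bfb743abcdef"))
# ['main', 'latest', 'sha-5bfb743']
```

So a push to `main` produces branch, `latest`, and short-sha tags, while a pull request produces only `pr-N` and short-sha tags.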
64 changes: 64 additions & 0 deletions .gitignore
@@ -0,0 +1,64 @@
# Environment files
.env
.test.env
*.env

# Kubernetes secrets (NEVER commit these!)
k8s/secret.yaml
k8s/secrets.yaml

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
*.egg
*.egg-info/
dist/
build/
.eggs/

# Virtual environments
.venv/
venv/
ENV/
env/

# IDE
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store

# Testing
.pytest_cache/
.coverage
.coverage.*
htmlcov/
.tox/
*.cover
.hypothesis/

# Notebooks
.ipynb_checkpoints/

# Logs
*.log
/tmp/

# UV/pip cache
.uv/
uv.lock

# Data directories (local development)
data/*.csv
data/*.parquet
data/*.db
data/*.lmdb

# Build artifacts
*.tar.gz
*.zip
4 changes: 2 additions & 2 deletions CLAUDE.md
@@ -72,8 +72,8 @@ When implementing new loaders:
5. Follow existing patterns from PostgreSQL and Redis loaders

### Testing Strategy
- - **Unit tests**: Mock external dependencies, test business logic
- - **Integration tests**: Use testcontainers for real database testing
+ - **Unit tests**: Test pure logic and data structures WITHOUT mocking. Unit tests should be simple, fast, and test isolated components (dataclasses, utility functions, partitioning logic, etc.). Do NOT add tests that require mocking to `tests/unit/`.
+ - **Integration tests**: Use testcontainers for real database testing. Tests that require external dependencies (databases, Flight SQL server, etc.) belong in `tests/integration/`.
- **Performance tests**: Benchmark data loading operations
- Tests can be filtered using pytest markers (e.g., `-m unit` for unit tests only)
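The no-mock unit-test style described above can be sketched as follows. `partition_blocks` is a hypothetical helper standing in for the kind of pure partitioning logic the policy mentions, not necessarily the repo's actual API; in the repo the test function would additionally carry the `unit` pytest marker:

```python
def partition_blocks(start: int, end: int, workers: int) -> list[tuple[int, int]]:
    """Split an inclusive block range into contiguous, near-equal chunks."""
    total = end - start + 1
    base, extra = divmod(total, workers)
    ranges: list[tuple[int, int]] = []
    cursor = start
    for i in range(workers):
        size = base + (1 if i < extra else 0)
        if size == 0:  # more workers than blocks
            continue
        ranges.append((cursor, cursor + size - 1))
        cursor += size
    return ranges

# A unit test here is plain assertions on pure logic: no mocks,
# no external services (would be `@pytest.mark.unit` in the repo).
ranges = partition_blocks(0, 999, 4)
assert ranges == [(0, 249), (250, 499), (500, 749), (750, 999)]
assert sum(hi - lo + 1 for lo, hi in ranges) == 1000
```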

91 changes: 91 additions & 0 deletions Dockerfile
@@ -0,0 +1,91 @@
# Multi-stage build for optimized image size
# Stage 1: Build dependencies
FROM python:3.12-slim AS builder

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*

# Install UV for fast dependency management
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

# Set working directory
WORKDIR /app

# Copy dependency files
COPY pyproject.toml README.md ./

# Install dependencies using UV (much faster than pip)
# Install ALL dependencies including all loader dependencies
# This ensures optional dependencies don't cause import errors
RUN uv pip install --system --no-cache \
pandas>=2.3.1 \
pyarrow>=20.0.0 \
typer>=0.15.2 \
adbc-driver-manager>=1.5.0 \
adbc-driver-postgresql>=1.5.0 \
protobuf>=4.21.0 \
base58>=2.1.1 \
'eth-hash[pysha3]>=0.7.1' \
eth-utils>=5.2.0 \
google-cloud-bigquery>=3.30.0 \
google-cloud-storage>=3.1.0 \
arro3-core>=0.5.1 \
arro3-compute>=0.5.1 \
psycopg2-binary>=2.9.0 \
redis>=4.5.0 \
deltalake>=1.0.2 \
'pyiceberg[sql-sqlite]>=0.10.0' \
'pydantic>=2.0,<2.12' \
snowflake-connector-python>=4.0.0 \
snowpipe-streaming>=1.0.0 \
lmdb>=1.4.0

# Stage 2: Runtime image
FROM python:3.12-slim

# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq5 \
&& rm -rf /var/lib/apt/lists/*

# Create non-root user for security
RUN useradd -m -u 1000 amp && \
mkdir -p /app /data && \
chown -R amp:amp /app /data

# Set working directory
WORKDIR /app

# Copy Python packages from builder
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages

# Copy UV from builder for package installation
COPY --from=builder /usr/local/bin/uv /usr/local/bin/uv

# Copy application code
COPY --chown=amp:amp src/ ./src/
COPY --chown=amp:amp apps/ ./apps/
COPY --chown=amp:amp data/ ./data/
COPY --chown=amp:amp pyproject.toml README.md ./

# Install the amp package in the system Python (NOT editable for Docker)
RUN uv pip install --system --no-cache .

# Switch to non-root user
USER amp

# Set Python path
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import sys; sys.exit(0)"

# Default command - run ERC20 loader
# Can be overridden with docker run arguments
ENTRYPOINT ["python", "apps/test_erc20_labeled_parallel.py"]
CMD ["--blocks", "100000", "--workers", "8", "--flush-interval", "0.5"]
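The flags in the `CMD` line above can be parsed as sketched below. This is a stdlib `argparse` illustration of a CLI matching those flags; the actual app may use typer (which is among the installed dependencies), and the defaults simply mirror the `CMD` values:

```python
import argparse

def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the loader flags shown in the Dockerfile's CMD line."""
    parser = argparse.ArgumentParser(description="ERC20 parallel loader (sketch)")
    parser.add_argument("--blocks", type=int, default=100_000,
                        help="number of blocks to stream")
    parser.add_argument("--workers", type=int, default=8,
                        help="parallel stream workers")
    parser.add_argument("--flush-interval", type=float, default=0.5,
                        help="seconds between flushes")
    return parser.parse_args(argv)

args = parse_args(["--blocks", "100000", "--workers", "8", "--flush-interval", "0.5"])
print(args.workers)  # 8
```

Overriding `CMD` at `docker run` time therefore only needs the flag list, since `ENTRYPOINT` already names the script.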
32 changes: 19 additions & 13 deletions Makefile
Expand Up @@ -44,6 +44,11 @@ test-lmdb:
@echo "⚡ Running LMDB tests..."
$(PYTHON) pytest tests/ -m "lmdb" -v --log-cli-level=ERROR

# Parallel streaming integration tests
test-parallel-streaming:
@echo "⚡ Running parallel streaming integration tests..."
$(PYTHON) pytest tests/integration/test_parallel_streaming.py -v -s --log-cli-level=INFO

# Performance tests
test-performance:
@echo "🏇 Running performance tests..."
@@ -109,16 +114,17 @@ clean:
# Show available commands
help:
@echo "Available commands:"
- @echo " make setup - Setup development environment"
- @echo " make test-unit - Run unit tests (fast)"
- @echo " make test-integration - Run integration tests"
- @echo " make test-all - Run all tests with coverage"
- @echo " make test-postgresql - Run PostgreSQL tests"
- @echo " make test-redis - Run Redis tests"
- @echo " make test-snowflake - Run Snowflake tests"
- @echo " make test-performance - Run performance tests"
- @echo " make lint - Lint code with ruff"
- @echo " make format - Format code with ruff"
- @echo " make test-setup - Start test databases"
- @echo " make test-cleanup - Stop test databases"
- @echo " make clean - Clean test artifacts"
+ @echo " make setup - Setup development environment"
+ @echo " make test-unit - Run unit tests (fast)"
+ @echo " make test-integration - Run integration tests"
+ @echo " make test-parallel-streaming - Run parallel streaming integration tests"
+ @echo " make test-all - Run all tests with coverage"
+ @echo " make test-postgresql - Run PostgreSQL tests"
+ @echo " make test-redis - Run Redis tests"
+ @echo " make test-snowflake - Run Snowflake tests"
+ @echo " make test-performance - Run performance tests"
+ @echo " make lint - Lint code with ruff"
+ @echo " make format - Format code with ruff"
+ @echo " make test-setup - Start test databases"
+ @echo " make test-cleanup - Stop test databases"
+ @echo " make clean - Clean test artifacts"
21 changes: 21 additions & 0 deletions README.md
@@ -41,6 +41,14 @@ and the `amp` package. For example, you can run the `execute_query` app with the
uv run apps/execute_query.py
```

## Documentation

### Features
- **[Parallel Streaming Usage Guide](docs/parallel_streaming_usage.md)** - User guide for high-throughput parallel data loading
- **[Parallel Streaming Design](docs/parallel_streaming.md)** - Technical design documentation for parallel streaming architecture
- **[Reorganization Handling](docs/reorg_handling.md)** - Guide for handling blockchain reorganizations
- **[Implementing Data Loaders](docs/implementing_data_loaders.md)** - Guide for creating custom data loaders

# Self-hosted Amp server

In order to operate a local Amp server you will need to have the files
@@ -133,6 +141,19 @@ make test-iceberg # Iceberg tests
make test-lmdb # LMDB tests
```

## Feature-Specific Tests

Run tests for specific features:
```bash
make test-parallel-streaming # Parallel streaming integration tests (requires Amp server)
```

**Note**: Parallel streaming tests require an Amp server. Configure using environment variables in `.test.env`:
- `AMP_SERVER_URL` - Amp server URL (e.g., `grpc://your-server:80`)
- `AMP_TEST_TABLE` - Source table name (e.g., `eth_firehose.blocks`)
- `AMP_TEST_BLOCK_COLUMN` - Block column name (default: `block_num`)
- `AMP_TEST_MAX_BLOCK` - Max block for testing (default: `1000`)
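Reading that configuration in a test module might look like the sketch below. The variable names come from the list above; the fallback values for `AMP_SERVER_URL` and `AMP_TEST_TABLE` are assumptions here (the README only documents defaults for the last two):

```python
import os

# Defaults for the first two values are illustrative assumptions.
AMP_SERVER_URL = os.environ.get("AMP_SERVER_URL", "grpc://localhost:80")
AMP_TEST_TABLE = os.environ.get("AMP_TEST_TABLE", "eth_firehose.blocks")
AMP_TEST_BLOCK_COLUMN = os.environ.get("AMP_TEST_BLOCK_COLUMN", "block_num")
AMP_TEST_MAX_BLOCK = int(os.environ.get("AMP_TEST_MAX_BLOCK", "1000"))
```

Note the `int(...)` conversion: environment variables are always strings, so numeric settings like `AMP_TEST_MAX_BLOCK` must be parsed explicitly.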

# Linting and formatting

Ruff is configured to be used for linting and formatting of this project.