ETL pipeline for ingesting, transforming, and serving analytics data from multiple sources.
- Language: Python 3.12
- Framework: FastAPI 0.115 (API), Celery 5.4 (workers)
- Database: PostgreSQL 16 (warehouse), Redis 7 (broker/cache)
- ORM: SQLAlchemy 2.0 with Alembic migrations
- Data: Pandas 2.2, Polars 1.x, DuckDB (analytics queries)
- Testing: pytest, pytest-asyncio, factory-boy, hypothesis
- Linting: Ruff (linter + formatter), mypy (strict mode)
- Package Manager: uv (lockfile: `uv.lock`)
- CI/CD: GitHub Actions, Docker, AWS ECS
- Docs: Sphinx with autodoc
- `uv sync` - Install dependencies from lockfile
- `uv run fastapi dev` - Start API server (localhost:8000)
- `uv run celery -A dataflow.worker worker --loglevel=info` - Start Celery worker
- `uv run pytest` - Run test suite
- `uv run pytest --cov=dataflow --cov-report=html` - Run tests with coverage
- `uv run mypy dataflow/` - Type check
- `uv run ruff check dataflow/` - Lint
- `uv run ruff format dataflow/` - Format
- `uv run alembic upgrade head` - Apply database migrations
- `uv run alembic revision --autogenerate -m "description"` - Generate migration
- `docker compose up -d` - Start PostgreSQL, Redis, and worker locally
- `dataflow/`
  - `api/`
    - `routes/` - FastAPI route modules (one per resource)
    - `deps.py` - Dependency injection (db session, current user, services)
    - `middleware.py` - CORS, timing, error handling middleware
  - `core/`
    - `config.py` - Pydantic Settings with environment validation
    - `security.py` - JWT token handling, password hashing
    - `exceptions.py` - Custom exception classes with error codes
  - `models/` - SQLAlchemy ORM models
  - `schemas/` - Pydantic request/response schemas
  - `repositories/` - Data access layer (one per model)
  - `services/` - Business logic (orchestrates repositories)
  - `workers/`
    - `tasks.py` - Celery task definitions
    - `pipelines/` - ETL pipeline definitions (extract, transform, load)
  - `utils/` - Pure utility functions
- `tests/`
  - `conftest.py` - Shared fixtures (db session, client, factories)
  - `factories/` - factory-boy model factories
  - `unit/` - Unit tests for services and utils
  - `integration/` - Integration tests for repositories and API
- `alembic/`
  - `versions/` - Migration scripts
  - `env.py` - Alembic environment configuration
- `scripts/` - Operational scripts (backfill, cleanup, reports)
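A minimal sketch of how these layers might divide responsibility, assuming plain classes instead of the real SQLAlchemy and FastAPI wiring. `User`, `UserRepository`, `InMemoryUserRepository`, and `UserService` are hypothetical names, and the in-memory repository only stands in for a database-backed one in `repositories/`.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Protocol


@dataclass(frozen=True)
class User:
    id: int
    email: str
    is_active: bool = True


class DataFlowError(Exception):
    """Trimmed-down base class for domain errors (hypothetical)."""


class UserNotFoundError(DataFlowError):
    """Domain exception raised by the service layer."""


class UserRepository(Protocol):
    """Data access contract: repositories load and store, no business rules."""

    def get(self, user_id: int) -> User | None: ...

    def save(self, user: User) -> None: ...


class InMemoryUserRepository:
    """Hypothetical stand-in for a SQLAlchemy-backed repository."""

    def __init__(self) -> None:
        self._rows: dict[int, User] = {}

    def get(self, user_id: int) -> User | None:
        return self._rows.get(user_id)

    def save(self, user: User) -> None:
        self._rows[user.id] = user


class UserService:
    """Business logic: orchestrates repositories and raises domain errors."""

    def __init__(self, users: UserRepository) -> None:
        self._users = users

    def deactivate(self, user_id: int) -> User:
        user = self._users.get(user_id)
        if user is None:
            raise UserNotFoundError(f"user {user_id} not found")
        # Frozen dataclass: build an updated copy rather than mutating.
        updated = replace(user, is_active=False)
        self._users.save(updated)
        return updated
```

In this arrangement a route in `api/routes/` would receive `UserService` through a dependency and translate `UserNotFoundError` into a 404. Typing the repository as a `Protocol` means the service never depends on a concrete storage class.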
- Type hints on all function signatures. Use `from __future__ import annotations`.
- Use `Annotated` types with `Depends()` for FastAPI dependency injection.
- Use `async def` for all API endpoints. Use `def` for CPU-bound Celery tasks.
- Prefer Polars for new data transformations. Use Pandas only for library compatibility.
- Maximum function length: 30 lines. Extract helpers with descriptive names.
- No mutable default arguments. Use `None` with `if arg is None: arg = []`.
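The mutable-default rule exists because Python evaluates a default value once, when the function is defined, so every call that omits the argument shares the same list. A short illustration with hypothetical `append_tag_bad` and `append_tag` functions:

```python
from __future__ import annotations


def append_tag_bad(tag: str, tags: list[str] = []) -> list[str]:
    # Anti-pattern: this default list is created once and shared by all calls.
    tags.append(tag)
    return tags


def append_tag(tag: str, tags: list[str] | None = None) -> list[str]:
    # Preferred: None as the sentinel, with a fresh list built per call.
    if tags is None:
        tags = []
    tags.append(tag)
    return tags
```

Calling `append_tag_bad("a")` and then `append_tag_bad("b")` returns `["a", "b"]` on the second call, while `append_tag` returns `["b"]` as expected.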
- Custom exceptions inherit from the `DataFlowError` base class.
- Services raise domain exceptions (`UserNotFoundError`, `PipelineFailedError`).
- API layer catches domain exceptions and maps to HTTP responses.
- Celery tasks use `autoretry_for` with exponential backoff for transient failures.
- Log all exceptions with full context (task ID, user ID, input parameters).
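One possible shape for the exception hierarchy and the API-layer mapping, as a sketch only. The `code` and `context` attributes, the `to_http` helper, and the specific status codes are assumptions for illustration. In a FastAPI app this translation would more typically live in a handler registered with `app.exception_handler(DataFlowError)`.

```python
from __future__ import annotations

from http import HTTPStatus
from typing import Any


class DataFlowError(Exception):
    """Base class for domain errors; subclasses set a stable error code."""

    code: str = "dataflow_error"

    def __init__(self, message: str, **context: Any) -> None:
        super().__init__(message)
        self.message = message
        # Extra fields (task ID, user ID, inputs) kept for structured logging.
        self.context: dict[str, Any] = context


class UserNotFoundError(DataFlowError):
    code = "user_not_found"


class PipelineFailedError(DataFlowError):
    code = "pipeline_failed"


# Hypothetical mapping from domain error type to HTTP status.
_STATUS_BY_ERROR: dict[type[DataFlowError], HTTPStatus] = {
    UserNotFoundError: HTTPStatus.NOT_FOUND,
    PipelineFailedError: HTTPStatus.UNPROCESSABLE_ENTITY,
}


def to_http(error: DataFlowError) -> tuple[int, dict[str, Any]]:
    """Translate a domain error into (status code, JSON-ready body)."""
    body: dict[str, Any] = {"error": {"code": error.code, "message": error.message}}
    # First matching entry wins, so list more specific subclasses first.
    for error_type, status in _STATUS_BY_ERROR.items():
        if isinstance(error, error_type):
            return int(status), body
    # Any unmapped domain error is treated as a server error.
    return int(HTTPStatus.INTERNAL_SERVER_ERROR), body
```

Carrying `context` on the exception lets a single logging call at the boundary record the task ID, user ID, and input parameters that the logging rule asks for.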
- 85% minimum coverage. 95% on `services/` and `core/`.
- Use `factory-boy` for test data. No raw model construction in tests.
- Use `hypothesis` for property-based tests on data transformation functions.
- Integration tests run against a real PostgreSQL instance (Docker in CI).
- Async tests use `pytest-asyncio` with `asyncio_mode = "auto"`.
- `DATABASE_URL` - PostgreSQL connection string (`postgresql+asyncpg://...`)
- `REDIS_URL` - Redis connection for Celery broker and result backend
- `SECRET_KEY` - JWT signing key (256-bit random)
- `CORS_ORIGINS` - Comma-separated allowed origins
- `S3_BUCKET` - Data lake bucket for raw ingestion files
- `SENTRY_DSN` - Error tracking
- `LOG_LEVEL` - Logging level (default: `INFO`)
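The project validates these variables with Pydantic Settings in `core/config.py`. The sketch below uses only the standard library (a frozen dataclass plus a hypothetical `load_settings` function) to make the parsing rules concrete: which variables are required, how `CORS_ORIGINS` is split, and the `INFO` default for `LOG_LEVEL`. It is not the project's loader. Treating `SENTRY_DSN` and `CORS_ORIGINS` as optional and enforcing the `postgresql+asyncpg://` prefix are assumptions.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass

_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    secret_key: str
    cors_origins: tuple[str, ...]
    s3_bucket: str
    sentry_dsn: str | None
    log_level: str


def _require(env: Mapping[str, str], name: str) -> str:
    # Missing or blank values fail fast at startup.
    value = env.get(name, "").strip()
    if not value:
        raise ValueError(f"missing required environment variable {name}")
    return value


def load_settings(env: Mapping[str, str] | None = None) -> Settings:
    if env is None:
        env = os.environ
    database_url = _require(env, "DATABASE_URL")
    if not database_url.startswith("postgresql+asyncpg://"):
        raise ValueError("DATABASE_URL must use the postgresql+asyncpg:// driver")
    log_level = env.get("LOG_LEVEL", "INFO").upper()
    if log_level not in _LOG_LEVELS:
        raise ValueError(f"invalid LOG_LEVEL {log_level!r}")
    # "a, b,," -> ("a", "b"): trim whitespace and drop empty entries.
    raw_origins = env.get("CORS_ORIGINS", "").split(",")
    origins = tuple(o.strip() for o in raw_origins if o.strip())
    return Settings(
        database_url=database_url,
        redis_url=_require(env, "REDIS_URL"),
        secret_key=_require(env, "SECRET_KEY"),
        cors_origins=origins,
        s3_bucket=_require(env, "S3_BUCKET"),
        sentry_dsn=env.get("SENTRY_DSN") or None,
        log_level=log_level,
    )
```

Accepting an explicit mapping instead of always reading `os.environ` keeps the loader easy to exercise in unit tests.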
| Date | Decision | Rationale |
|---|---|---|
| 2025-06-01 | uv over Poetry | Faster installs, better lockfile resolution |
| 2025-07-15 | Polars over Pandas | 10x faster for column operations, no GIL issues |
| 2025-08-01 | SQLAlchemy 2.0 | Async support, modern mapped_column syntax |
| 2025-09-10 | DuckDB for analytics | In-process OLAP queries, no separate cluster needed |
| 2025-11-01 | Ruff over Black+isort+flake8 | Single tool, faster, consistent configuration |