From f90c541df89ed6115813f55f09aa318873ab7e21 Mon Sep 17 00:00:00 2001 From: Tomas Sanchez Date: Fri, 12 Jun 2026 19:20:13 -0300 Subject: [PATCH 1/2] feat(template): record async + PostgreSQL ADRs and add database choice ADR 0017 makes async the default and supersedes ADR 0006; ADR 0018 makes PostgreSQL (asyncpg, pgvector image) the default with SQLite (aiosqlite) optional; ADR 0019 takes coverage from unit+e2e and moves the Docker integration tier to a separate CI stage. copier.yml gains a database choice (postgres default) and an async, conditional database_url. Refs #24 Co-Authored-By: Claude Fable 5 --- copier.yml | 12 +++- ...-async-is-an-explicit-end-to-end-choice.md | 3 +- .../adr/0017-async-persistence-by-default.md | 65 +++++++++++++++++++ .../0018-postgresql-default-with-pgvector.md | 60 +++++++++++++++++ .../0019-coverage-from-unit-and-e2e-tests.md | 60 +++++++++++++++++ template/docs/adr/README.md | 5 +- 6 files changed, 201 insertions(+), 4 deletions(-) create mode 100644 template/docs/adr/0017-async-persistence-by-default.md create mode 100644 template/docs/adr/0018-postgresql-default-with-pgvector.md create mode 100644 template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md diff --git a/copier.yml b/copier.yml index adcc705..b1b3cc8 100644 --- a/copier.yml +++ b/copier.yml @@ -109,10 +109,18 @@ python_version: - "3.12" default: "3.13" +database: + type: str + help: Default database engine (async drivers; SQLite stays available for quick starts) + choices: + PostgreSQL (asyncpg, pgvector image): postgres + SQLite (aiosqlite): sqlite + default: postgres + database_url: type: str - help: Default SQLAlchemy database URL - default: "sqlite+pysqlite:///./{{ project_slug }}.db" + help: Default async SQLAlchemy database URL + default: "{% if database == 'postgres' %}postgresql+asyncpg://{{ project_slug }}:{{ project_slug }}@localhost:5432/{{ project_slug }}{% else %}sqlite+aiosqlite:///./{{ project_slug }}.db{% endif %}" include_user_example: type: bool diff --git a/template/docs/adr/0006-async-is-an-explicit-end-to-end-choice.md b/template/docs/adr/0006-async-is-an-explicit-end-to-end-choice.md index e9ae15d..35249a8 100644 --- a/template/docs/adr/0006-async-is-an-explicit-end-to-end-choice.md +++ b/template/docs/adr/0006-async-is-an-explicit-end-to-end-choice.md @@ -1,6 +1,7 @@ # ADR 0006: Async Is an Explicit End-to-End Choice -- Status: Accepted +- Status: Superseded +- Superseded by: [0017](0017-async-persistence-by-default.md) - Date: 2026-05-31 ## Context diff --git a/template/docs/adr/0017-async-persistence-by-default.md b/template/docs/adr/0017-async-persistence-by-default.md new file mode 100644 index 0000000..0ced804 --- /dev/null +++ b/template/docs/adr/0017-async-persistence-by-default.md @@ -0,0 +1,65 @@ +# ADR 0017: Async Persistence and Application Code by Default + +- Status: Accepted +- Date: 2026-06-12 + +## Context + +[ADR 0006](0006-async-is-an-explicit-end-to-end-choice.md) made synchronous code +the default and treated async as an explicit, per-use-case opt-in. It also +stated that changing the default persistence mode to async requires a new ADR — +this is that ADR. + +FastAPI is an async-first framework, and the services this template scaffolds +are I/O-bound (database, HTTP, brokers). Running async end to end lets a single +worker serve many concurrent requests while awaiting I/O, and SQLAlchemy 2's +asyncio extension is now mature and well documented. Maintaining a synchronous +default plus an async opt-in also means two persistence styles to teach and +test. Standardizing on async removes that fork. + +## Decision + +The template is **async end to end by default**, and ships only the async path. + +- Persistence uses `create_async_engine`, `AsyncSession`, and + `async_sessionmaker`. +- The unit of work is an async context manager (`async with uow:`) with + `await uow.commit()` / `await uow.rollback()` and async write-back. +- Repositories and query readers are async and use + `await session.execute(select(...))`. +- Command and event handlers are `async def`; the message bus awaits them. +- FastAPI routes and the application lifespan are async; startup/shutdown await + container hooks. +- Database drivers are async: `asyncpg` for PostgreSQL, `aiosqlite` for SQLite + (`greenlet` is pulled in transitively by SQLAlchemy's async support). + +Domain objects stay plain synchronous Python — business rules must not perform +I/O ([ADR 0002](0002-domain-models-are-framework-independent.md)), so they never +become coroutines. + +This **supersedes [ADR 0006](0006-async-is-an-explicit-end-to-end-choice.md)**. + +## Consequences + +The code matches idiomatic modern FastAPI and there is a single persistence +style to learn, maintain, and test. The cost is that async correctness now +matters everywhere: handlers must never block the event loop, tests need an +async runner (`pytest-asyncio`), and contributors must understand `await` +semantics. The domain layer is unaffected and remains trivially unit-testable. + +## Agent Guidance + +- Make every adapter, service-layer, and entrypoint I/O path `async`/`await`. +- Never call blocking I/O inside an `async def`; offload to a thread only with a + documented reason. +- Keep domain methods synchronous and free of I/O. +- Use `AsyncSession` and `await` commits/queries; do not mix a sync `Session` + into the request path. +- Write async tests; cover the async paths. + +## References + +- [SQLAlchemy asyncio extension](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html) +- [FastAPI: async and await](https://fastapi.tiangolo.com/async/) +- [pytest-asyncio](https://pytest-asyncio.readthedocs.io/) +- [ADR 0006: Async Is an Explicit End-to-End Choice](0006-async-is-an-explicit-end-to-end-choice.md) diff --git a/template/docs/adr/0018-postgresql-default-with-pgvector.md b/template/docs/adr/0018-postgresql-default-with-pgvector.md new file mode 100644 index 0000000..4714f1d --- /dev/null +++ b/template/docs/adr/0018-postgresql-default-with-pgvector.md @@ -0,0 +1,60 @@ +# ADR 0018: PostgreSQL by Default with a pgvector Image; SQLite Optional + +- Status: Accepted +- Date: 2026-06-12 + +## Context + +The template historically defaulted to file-based SQLite, which is excellent for +zero-setup demos but is not what most production services run. Real services +want PostgreSQL: concurrent writers, real types (JSONB, arrays), and increasingly +vector search for embeddings via the `pgvector` extension. A template should +default to the realistic target while still offering a frictionless, +infrastructure-free option. + +## Decision + +PostgreSQL is the **default** database; SQLite remains a selectable option. + +- A Copier `database` question chooses `postgres` (default) or `sqlite`. +- **PostgreSQL**: the `asyncpg` driver and a default URL of the form + `postgresql+asyncpg://…`. `docker-compose` bundles a **pgvector-capable + image** (`pgvector/pgvector:pg17`) as a `db` service with a healthcheck and a + named volume; the `app` service depends on it. +- **SQLite**: the `aiosqlite` driver and a file URL; no database service is + added to `docker-compose`. +- Async drivers only, per [ADR 0017](0017-async-persistence-by-default.md). +- Alembic runs against the async engine. + +The pgvector **image** is provided so the extension is available, but the +example domain ships **no vector column**. Projects that need vectors add a +`CREATE EXTENSION IF NOT EXISTS vector` step in a migration and a `Vector` +column in a persistence model when the requirement is real. + +## Consequences + +New projects start on a production-shaped database and can adopt pgvector without +changing infrastructure. SQLite stays available for quick experiments and is the +substrate for the fast offline test tier +([ADR 0019](0019-coverage-from-unit-and-e2e-tests.md)). The cost is that the +Postgres default expects a running database for local runs; `docker-compose up` +provides one, and `DATABASE_AUTO_CREATE_SCHEMA`/Alembic handle the schema. + +## Agent Guidance + +- Select the driver through the database URL (`postgresql+asyncpg` / + `sqlite+aiosqlite`); do not hardcode a dialect in adapters. +- Before using a `Vector` column, add `CREATE EXTENSION IF NOT EXISTS vector` in + an Alembic migration. +- Keep persistence-model SQL portable where practical; put genuinely + Postgres-specific behavior behind an integration test + ([ADR 0019](0019-coverage-from-unit-and-e2e-tests.md)). +- Manage schema with Alembic; reserve `AUTO_CREATE_SCHEMA` for demos and tests. + +## References + +- [pgvector](https://github.com/pgvector/pgvector) +- [asyncpg](https://magicstack.github.io/asyncpg/) +- [aiosqlite](https://aiosqlite.omnilib.dev/) +- [Alembic with async engines](https://alembic.sqlalchemy.org/en/latest/cookbook.html#using-asyncio-with-alembic) +- [ADR 0017: Async Persistence and Application Code by Default](0017-async-persistence-by-default.md) diff --git a/template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md b/template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md new file mode 100644 index 0000000..46bd526 --- /dev/null +++ b/template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md @@ -0,0 +1,60 @@ +# ADR 0019: Coverage From Unit and E2E Tests; Integration Runs Separately + +- Status: Accepted +- Date: 2026-06-12 + +## Context + +[ADR 0007](0007-tooling-and-test-pyramid.md) defines the test pyramid +(`unit`, `integration`, `e2e`). With PostgreSQL as the default +([ADR 0018](0018-postgresql-default-with-pgvector.md)) and async persistence +([ADR 0017](0017-async-persistence-by-default.md)), the integration tier needs a +real PostgreSQL instance (Docker). Gating the 100% coverage requirement on tests +that require Docker couples the fast feedback loop — and the offline template +bake — to container infrastructure, which is slow and brittle in that role. + +## Decision + +Split the suite by what each tier needs and what it gates. + +- **`tests/unit`** — pure unit tests with mocked sessions/queries and in-memory + fakes; no database. +- **`tests/e2e`** — routes exercised through the ASGI app with an injected + **in-memory async SQLite** (`aiosqlite`) container; no external infrastructure. +- **`tests/integration`** — repository, unit-of-work, dialect, and migration + behavior against a **real PostgreSQL** (`asyncpg`) via a Docker service + container. + +The **100% coverage gate is computed from `unit` + `e2e` only.** These run +offline and deterministically, so adapter code must be reachable by a mocked +unit test or the in-memory e2e path to count. `tests/integration` is **excluded +from coverage** and runs as a **separate CI stage** with a PostgreSQL (pgvector) +service. `make cover` runs the offline gate; `make integration` runs the Docker +tier. The Copier bake matrix runs only the offline gate. + +This extends [ADR 0007](0007-tooling-and-test-pyramid.md); 0007 remains Accepted. + +## Consequences + +Coverage feedback is fast, deterministic, and infrastructure-free, and the +template bakes offline. Real-PostgreSQL behavior (dialect specifics, migrations, +transaction semantics) is still verified, in a dedicated CI job that does not +block the coverage signal. The trade-off: code that only runs against PostgreSQL +must be covered by an integration test for confidence even though it does not +count toward the coverage percentage, so keep such Postgres-only branches thin. + +## Agent Guidance + +- Put fast behavior in `tests/unit` (mock the `AsyncSession`/reader) and critical + wiring in `tests/e2e` (in-memory async SQLite container). +- Mark real-database tests with the `integration` marker and place them in + `tests/integration`; never rely on them for coverage. +- Keep the offline gate at 100% via `make cover`. +- Verify dialect- or migration-specific behavior in the integration stage. + +## References + +- [pytest markers](https://docs.pytest.org/en/stable/how-to/mark.html) +- [coverage.py configuration](https://coverage.readthedocs.io/en/latest/config.html) +- [ADR 0007: Tooling and Test Pyramid](0007-tooling-and-test-pyramid.md) +- [ADR 0018: PostgreSQL by Default with a pgvector Image](0018-postgresql-default-with-pgvector.md) diff --git a/template/docs/adr/README.md b/template/docs/adr/README.md index 13a7ac4..fb422e0 100644 --- a/template/docs/adr/README.md +++ b/template/docs/adr/README.md @@ -24,7 +24,7 @@ treated as migration work, not as a precedent to repeat. | [0003](0003-fastapi-and-pydantic-live-at-boundaries.md) | FastAPI and Pydantic Live at Boundaries | Superseded | | [0004](0004-sqlalchemy-2-persistence-behind-repositories.md) | SQLAlchemy 2 Persistence Stays Behind Repositories | Accepted | | [0005](0005-explicit-composition-and-message-dispatch.md) | Explicit Composition and Message Dispatch | Accepted | -| [0006](0006-async-is-an-explicit-end-to-end-choice.md) | Async Is an Explicit End-to-End Choice | Accepted | +| [0006](0006-async-is-an-explicit-end-to-end-choice.md) | Async Is an Explicit End-to-End Choice | Superseded | | [0007](0007-tooling-and-test-pyramid.md) | Tooling and Test Pyramid | Accepted | | [0008](0008-static-typing-with-pyrefly.md) | Static Typing With Pyrefly | Accepted | | [0009](0009-conventional-commits.md) | Conventional Commits | Accepted | @@ -35,6 +35,9 @@ treated as migration work, not as a precedent to repeat. | [0014](0014-cqrs-read-models-are-purpose-built.md) | CQRS Read Models Are Purpose Built | Accepted | | [0015](0015-copier-template-engine.md) | Copier as the Template Engine | Accepted | | [0016](0016-aggregate-persistence-write-back.md) | Aggregate Persistence Write-Back on Commit | Accepted | +| [0017](0017-async-persistence-by-default.md) | Async Persistence and Application Code by Default | Accepted | +| [0018](0018-postgresql-default-with-pgvector.md) | PostgreSQL by Default with a pgvector Image; SQLite Optional | Accepted | +| [0019](0019-coverage-from-unit-and-e2e-tests.md) | Coverage From Unit and E2E Tests; Integration Runs Separately | Accepted | ## Agent Checklist From 73c0ba7bf20fc02598b165183e0fd865adce211a Mon Sep 17 00:00:00 2001 From: Tomas Sanchez Date: Fri, 12 Jun 2026 19:51:46 -0300 Subject: [PATCH 2/2] feat(template): async persistence, PostgreSQL default, separated integration tier Convert the whole persistence/service/entrypoint stack to async (create_async_engine, AsyncSession, async UoW/repositories/queries/handlers/routes, async Alembic) per ADR 0017; domain stays synchronous. Default to PostgreSQL via asyncpg with a pgvector/pgvector docker-compose service, SQLite (aiosqlite) optional, per ADR 0018. Per ADR 0019, the 100% coverage gate runs offline from unit (mocked) + e2e (in-memory async SQLite) for both database choices, while the real-Postgres integration tier is marked, skipped offline, and runs in a separate CI job. Bake matrix extended to database x include_user_example (4 combos), all green offline at 100%; integration tier verified against a real pgvector Postgres. Refs #24 Co-Authored-By: Claude Fable 5 --- copier.yml | 1 + pyproject.toml | 1 + template/.github/workflows/build.yml.jinja | 39 +++ template/Makefile.jinja | 12 +- template/docker-compose.yaml.jinja | 39 ++- .../0019-coverage-from-unit-and-e2e-tests.md | 3 + template/migrations/env.py.jinja | 30 ++- template/pyproject.toml.jinja | 18 +- .../adapters/queries.py.jinja | 12 +- .../adapters/repository.py.jinja | 21 +- .../adapters/unit_of_work.py.jinja | 25 +- template/src/{{ package_name }}/asgi.py.jinja | 4 +- .../src/{{ package_name }}/bootstrap.py.jinja | 30 ++- .../entrypoint/monitor.py.jinja | 6 +- .../entrypoint/users.py.jinja | 10 +- .../service_layer/handlers.py.jinja | 28 ++- .../service_layer/messagebus.py.jinja | 20 +- .../service_layer/queries.py.jinja | 6 +- .../service_layer/repository.py.jinja | 6 +- .../service_layer/unit_of_work.py.jinja | 10 +- template/tests/conftest.py.jinja | 42 +++- .../e2e/entrypoint/test_monitor.py.jinja | 42 ++-- .../tests/e2e/entrypoint/test_users.py.jinja | 35 ++- template/tests/integration/conftest.py.jinja | 41 +++ .../integration/test_migrations.py.jinja | 51 +++- .../integration/test_persistence.py.jinja | 172 +++---------- .../integration/test_unit_of_work.py.jinja | 119 ++++----- template/tests/unit/adapters/__init__.py | 0 .../unit/adapters/test_persistence.py.jinja | 235 ++++++++++++++++++ .../unit/adapters/test_unit_of_work.py.jinja | 127 ++++++++++ .../unit/service_layer/test_handlers.py.jinja | 50 ++-- .../service_layer/test_messagebus.py.jinja | 35 +-- template/tests/unit/test_asgi.py.jinja | 6 +- template/tests/unit/test_bootstrap.py.jinja | 14 +- tests/test_bake.py | 129 +++++++--- uv.lock | 2 + 36 files changed, 981 insertions(+), 440 deletions(-) create mode 100644 template/tests/integration/conftest.py.jinja create mode 100644 template/tests/unit/adapters/__init__.py create mode 100644 template/tests/unit/adapters/test_persistence.py.jinja create mode 100644 template/tests/unit/adapters/test_unit_of_work.py.jinja diff --git a/copier.yml b/copier.yml index b1b3cc8..d7ce7fb 100644 --- a/copier.yml +++ b/copier.yml @@ -31,6 +31,7 @@ _exclude: - "{% if not include_user_example %}**/unit/domain/models/test_user.py{% endif %}" - "{% if not include_user_example %}**/service_layer/test_handlers.py{% endif %}" - "{% if not include_user_example %}**/integration/test_persistence.py{% endif %}" + - "{% if not include_user_example %}**/unit/adapters/test_persistence.py{% endif %}" - "{% if not include_user_example %}**/e2e/entrypoint/test_users.py{% endif %}" _message_after_copy: | diff --git a/pyproject.toml b/pyproject.toml index 6054fe6..676d725 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ dev = [ "copier>=9.4.0", "pytest>=8.4.0", "pre-commit>=4.2.0", + "pyyaml>=6.0", ] [tool.pytest.ini_options] diff --git a/template/.github/workflows/build.yml.jinja b/template/.github/workflows/build.yml.jinja index 3f693b6..79bc438 100644 --- a/template/.github/workflows/build.yml.jinja +++ b/template/.github/workflows/build.yml.jinja @@ -35,3 +35,42 @@ jobs: run: make cover - name: Build image run: docker build . --file Dockerfile --tag {{ project_slug }}:{% raw %}${{ github.sha }}{% endraw %} +{%- if database == 'postgres' %} + + integration: + + runs-on: ubuntu-latest + + services: + postgres: + image: pgvector/pgvector:pg17 + env: + POSTGRES_USER: {{ project_slug }} + POSTGRES_PASSWORD: {{ project_slug }} + POSTGRES_DB: {{ project_slug }} + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U {{ project_slug }} -d {{ project_slug }}" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + + env: + TEST_DATABASE_URL: postgresql+asyncpg://{{ project_slug }}:{{ project_slug }}@localhost:5432/{{ project_slug }} + + steps: + - uses: actions/checkout@v4 + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + - name: Install dependencies + run: make dev + - name: Apply migrations + run: uv run alembic upgrade head + env: + DATABASE_URL: {% raw %}${{ env.TEST_DATABASE_URL }}{% endraw %} + - name: Run the PostgreSQL integration tier + run: make integration +{%- endif %} diff --git a/template/Makefile.jinja b/template/Makefile.jinja index a5262e3..bc7c490 100644 --- a/template/Makefile.jinja +++ b/template/Makefile.jinja @@ -26,8 +26,12 @@ dev: ## Install runtime and dev dependencies uv sync --dev .PHONY: test -test: ## Executes tests cases - uv run pytest +test: ## Executes the offline test tiers (unit + e2e) + uv run pytest -m "not integration" + +.PHONY: integration +integration: ## Executes the PostgreSQL integration tier (requires TEST_DATABASE_URL / Docker) + uv run pytest -m integration .PHONY: adr-check adr-check: ## Validates the architecture decision registry @@ -42,8 +46,8 @@ migrate: ## Applies relational database migrations uv run alembic upgrade head .PHONY: cover -cover: ## Executes tests cases with coverage reports - uv run pytest --cov src/{{ package_name }} --cov-fail-under=100 --junitxml reports/xunit.xml \ +cover: ## Executes the offline tiers (unit + e2e) with the coverage gate + uv run pytest -m "not integration" --cov src/{{ package_name }} --cov-fail-under=100 --junitxml reports/xunit.xml \ --cov-report xml:reports/coverage.xml --cov-report term-missing .PHONY: format diff --git a/template/docker-compose.yaml.jinja b/template/docker-compose.yaml.jinja index cdfd91c..0a97d06 100644 --- a/template/docker-compose.yaml.jinja +++ b/template/docker-compose.yaml.jinja @@ -12,19 +12,56 @@ services: # reflected for Uvicorn's live-reload. volumes: - ./src:/app/src +{%- if database == 'sqlite' %} # Persist the SQLite database file across container recreation. - app-data:/app/data +{%- endif %} +{%- if database == 'postgres' %} + depends_on: + db: + condition: service_healthy +{%- endif %} environment: # Enable Uvicorn auto-reload for local development. - UVICORN_RELOAD=true +{%- if database == 'postgres' %} + # Connect to the bundled PostgreSQL service over the async driver. + - DATABASE_URL=postgresql+asyncpg://{{ project_slug }}:{{ project_slug }}@db:5432/{{ project_slug }} +{%- else %} # Store the SQLite database on the persistent named volume. - - DATABASE_URL=sqlite+pysqlite:///./data/{{ project_slug }}.db + - DATABASE_URL=sqlite+aiosqlite:///./data/{{ project_slug }}.db +{%- endif %} # Create the schema at startup so the demo works on first run. # Production deployments must use Alembic migrations (make migrate) # instead of auto-creating the schema. - DATABASE_AUTO_CREATE_SCHEMA=true # Allow the bundled frontend / local tools to call the API in development. - FASTAPI_BACKEND_CORS_ORIGINS=["http://localhost:8000"] +{%- if database == 'postgres' %} + + db: + # pgvector-capable PostgreSQL so projects can adopt vector search without + # changing infrastructure. The example domain ships no vector column yet. + image: pgvector/pgvector:pg17 + container_name: {{ project_slug }}-db + environment: + - POSTGRES_USER={{ project_slug }} + - POSTGRES_PASSWORD={{ project_slug }} + - POSTGRES_DB={{ project_slug }} + ports: + - "5432:5432" + volumes: + - db-data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U {{ project_slug }} -d {{ project_slug }}"] + interval: 5s + timeout: 5s + retries: 5 +{%- endif %} volumes: +{%- if database == 'postgres' %} + db-data: +{%- else %} app-data: +{%- endif %} diff --git a/template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md b/template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md index 46bd526..4aa59d1 100644 --- a/template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md +++ b/template/docs/adr/0019-coverage-from-unit-and-e2e-tests.md @@ -51,6 +51,9 @@ count toward the coverage percentage, so keep such Postgres-only branches thin. `tests/integration`; never rely on them for coverage. - Keep the offline gate at 100% via `make cover`. - Verify dialect- or migration-specific behavior in the integration stage. +- Keep `concurrency = ["thread", "greenlet"]` in `[tool.coverage.run]`. SQLAlchemy's + async support resumes coroutines across a greenlet boundary, so without it + coverage misclassifies branches that continue after an awaited database call. ## References diff --git a/template/migrations/env.py.jinja b/template/migrations/env.py.jinja index 00c8b79..75b0347 100644 --- a/template/migrations/env.py.jinja +++ b/template/migrations/env.py.jinja @@ -1,9 +1,11 @@ """Configure Alembic migrations.""" +import asyncio from logging.config import fileConfig from alembic import context -from sqlalchemy import engine_from_config, pool +from sqlalchemy import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config from {{ package_name }}.adapters.models.base import Base {%- if include_user_example %} @@ -35,18 +37,28 @@ def run_migrations_offline() -> None: context.run_migrations() -def run_migrations_online() -> None: - """Run migrations against a database connection.""" +def do_run_migrations(connection: Connection) -> None: + """Run migrations against an established synchronous connection. + + Alembic's migration operations are synchronous; the async engine bridges + them through ``connection.run_sync(do_run_migrations)``. + """ + context.configure(connection=connection, target_metadata=target_metadata) + with context.begin_transaction(): + context.run_migrations() + + +async def run_migrations_online() -> None: + """Run migrations against an async database connection.""" configuration = config.get_section(config.config_ini_section) or {} configuration["sqlalchemy.url"] = get_database_url() - connectable = engine_from_config(configuration, prefix="sqlalchemy.", poolclass=pool.NullPool) - with connectable.connect() as connection: - context.configure(connection=connection, target_metadata=target_metadata) - with context.begin_transaction(): - context.run_migrations() + connectable = async_engine_from_config(configuration, prefix="sqlalchemy.") + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + await connectable.dispose() if context.is_offline_mode(): run_migrations_offline() else: - run_migrations_online() + asyncio.run(run_migrations_online()) diff --git a/template/pyproject.toml.jinja b/template/pyproject.toml.jinja index 7990161..f9e8c48 100644 --- a/template/pyproject.toml.jinja +++ b/template/pyproject.toml.jinja @@ -11,19 +11,29 @@ dependencies = [ "uvicorn>=0.34.0", "pydantic-settings>=2.9.0", "pydantic[email]>=2.11.0", - "sqlalchemy>=2.0.41", + "sqlalchemy[asyncio]>=2.0.41", "alembic>=1.16.2", +{%- if database == 'postgres' %} + "asyncpg>=0.30.0", +{%- endif %} +{%- if database == 'sqlite' %} + "aiosqlite>=0.20.0", +{%- endif %} ] [dependency-groups] dev = [ "pytest>=8.4.0", + "pytest-asyncio>=0.24.0", "coverage[toml]>=7.9.0", "pre-commit>=4.2.0", "httpx>=0.28.0", "pytest-cov>=6.2.0", "ruff>=0.12,<0.13", "pyrefly>=1.0.0", + # The offline test tier always uses in-memory async SQLite, regardless of + # the runtime database, so aiosqlite is a dev dependency even for Postgres. + "aiosqlite>=0.20.0", ] [build-system] @@ -82,6 +92,10 @@ known-first-party = ["{{ package_name }}"] [tool.coverage.run] branch = true +# SQLAlchemy's async engine drives I/O through greenlet, and the ASGI app runs +# routes in a thread pool. Track both so coverage records branches that resume +# after an awaited database call (e.g. error-handling paths). +concurrency = ["thread", "greenlet"] omit = ['tests/*', 'src/{{ package_name }}/asgi.py', 'src/{{ package_name }}/main.py', 'src/**/__init__.py'] [tool.coverage.report] @@ -97,6 +111,8 @@ python_files = "test_*.py" junit_family = "xunit1" log_cli = false log_level = "DEBUG" +asyncio_mode = "auto" +markers = ["integration: requires a real PostgreSQL database (Docker)"] [tool.pyrefly] project-includes = ["src", "scripts", "tests", "migrations"] diff --git a/template/src/{{ package_name }}/adapters/queries.py.jinja b/template/src/{{ package_name }}/adapters/queries.py.jinja index 64e9346..4b62e96 100644 --- a/template/src/{{ package_name }}/adapters/queries.py.jinja +++ b/template/src/{{ package_name }}/adapters/queries.py.jinja @@ -2,7 +2,7 @@ from uuid import UUID -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from {{ package_name }}.adapters.models.user import UserRecord from {{ package_name }}.domain.models.user import UserSettings @@ -12,18 +12,18 @@ from {{ package_name }}.service_layer.read_models import UserReadModel class SqlAlchemyUserReader: """Read user projections without rehydrating write-side aggregates.""" - def __init__(self, session_factory: sessionmaker[Session]): + def __init__(self, session_factory: async_sessionmaker[AsyncSession]): """Initialize the reader. Args: - session_factory: Factory used to create SQLAlchemy sessions. + session_factory: Factory used to create async SQLAlchemy sessions. """ self.session_factory = session_factory - def get(self, user_id: UUID) -> UserReadModel | None: + async def get(self, user_id: UUID) -> UserReadModel | None: """Return a user projection by identity.""" - with self.session_factory() as session: - record = session.get(UserRecord, str(user_id)) + async with self.session_factory() as session: + record = await session.get(UserRecord, str(user_id)) if record is None: return None return UserReadModel( diff --git a/template/src/{{ package_name }}/adapters/repository.py.jinja b/template/src/{{ package_name }}/adapters/repository.py.jinja index 71712d7..dc3b35c 100644 --- a/template/src/{{ package_name }}/adapters/repository.py.jinja +++ b/template/src/{{ package_name }}/adapters/repository.py.jinja @@ -6,7 +6,7 @@ from datetime import UTC from uuid import UUID from sqlalchemy import select -from sqlalchemy.orm import Session +from sqlalchemy.ext.asyncio import AsyncSession from {{ package_name }}.adapters.models.user import UserRecord from {{ package_name }}.domain.models.user import User, UserSettings @@ -15,11 +15,11 @@ from {{ package_name }}.domain.models.user import User, UserSettings class SqlAlchemyUserRepository: """Persist user aggregates with SQLAlchemy.""" - def __init__(self, session: Session): + def __init__(self, session: AsyncSession): """Initialize the repository. Args: - session: SQLAlchemy session owned by the unit of work. + session: Async SQLAlchemy session owned by the unit of work. """ self.session = session self.seen: dict[UUID, User] = {} @@ -29,7 +29,7 @@ class SqlAlchemyUserRepository: self.session.add(self._to_record(user)) self.seen[user.id] = user - def get(self, user_id: UUID) -> User | None: + async def get(self, user_id: UUID) -> User | None: """Return a user by identity. Returns the already-tracked instance when the row was loaded earlier @@ -37,12 +37,12 @@ class SqlAlchemyUserRepository: """ if user_id in self.seen: return self.seen[user_id] - return self._remember(self.session.get(UserRecord, str(user_id))) + return self._remember(await self.session.get(UserRecord, str(user_id))) - def get_by_email(self, email: str) -> User | None: + async def get_by_email(self, email: str) -> User | None: """Return a user by normalized email address.""" - record = self.session.scalar(select(UserRecord).where(UserRecord.email == email.strip().lower())) - return self._remember(record) + result = await self.session.execute(select(UserRecord).where(UserRecord.email == email.strip().lower())) + return self._remember(result.scalar_one_or_none()) def _remember(self, record: UserRecord | None) -> User | None: """Translate and track a loaded aggregate in the identity map.""" @@ -54,7 +54,7 @@ class SqlAlchemyUserRepository: self.seen[user.id] = user return user - def persist_changes(self) -> None: + async def persist_changes(self) -> None: """Write the current state of tracked aggregates back to the session. The translation pattern detaches domain aggregates from SQLAlchemy @@ -64,7 +64,8 @@ class SqlAlchemyUserRepository: updates (aggregates loaded then mutated). """ for user in self.seen.values(): - self.session.merge(self._to_record(user)) + await self.session.merge(self._to_record(user)) + await self.session.commit() @staticmethod def _to_record(user: User) -> UserRecord: diff --git a/template/src/{{ package_name }}/adapters/unit_of_work.py.jinja b/template/src/{{ package_name }}/adapters/unit_of_work.py.jinja index bde4de9..b84b672 100644 --- a/template/src/{{ package_name }}/adapters/unit_of_work.py.jinja +++ b/template/src/{{ package_name }}/adapters/unit_of_work.py.jinja @@ -5,7 +5,7 @@ from __future__ import annotations from types import TracebackType from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker {% if include_user_example -%} from {{ package_name }}.adapters.repository import SqlAlchemyUserRepository @@ -16,15 +16,15 @@ from {{ package_name }}.service_layer.unit_of_work import AbstractUnitOfWork, In class SqlAlchemyUnitOfWork(AbstractUnitOfWork): """Manage a SQLAlchemy session as one atomic unit.""" - def __init__(self, session_factory: sessionmaker[Session]): + def __init__(self, session_factory: async_sessionmaker[AsyncSession]): """Initialize the unit of work. Args: - session_factory: Factory used to create a SQLAlchemy session. + session_factory: Factory used to create an async SQLAlchemy session. """ self.session_factory = session_factory - def __enter__(self) -> SqlAlchemyUnitOfWork: + async def __aenter__(self) -> SqlAlchemyUnitOfWork: """Open a session and repositories.""" self.session = self.session_factory() {%- if include_user_example %} @@ -32,26 +32,27 @@ class SqlAlchemyUnitOfWork(AbstractUnitOfWork): {%- endif %} return self - def __exit__( + async def __aexit__( self, exception_type: type[BaseException] | None, exception: BaseException | None, traceback: TracebackType | None, ) -> None: """Roll back unfinished work and close the session.""" - super().__exit__(exception_type, exception, traceback) - self.session.close() + await super().__aexit__(exception_type, exception, traceback) + await self.session.close() - def commit(self) -> None: + async def commit(self) -> None: """Persist tracked aggregate changes and commit the transaction.""" try: {%- if include_user_example %} - self.users.persist_changes() + await self.users.persist_changes() +{%- else %} + await self.session.commit() {%- endif %} - self.session.commit() except IntegrityError as error: raise IntegrityConflict from error - def rollback(self) -> None: + async def rollback(self) -> None: """Roll back the SQLAlchemy transaction.""" - self.session.rollback() + await self.session.rollback() diff --git a/template/src/{{ package_name }}/asgi.py.jinja b/template/src/{{ package_name }}/asgi.py.jinja index 61937f4..4d804ab 100644 --- a/template/src/{{ package_name }}/asgi.py.jinja +++ b/template/src/{{ package_name }}/asgi.py.jinja @@ -21,7 +21,7 @@ async def on_startup(app: FastAPI): 1. https://fastapi.tiangolo.com/advanced/events/#startup-event """ log.debug("Execute FastAPI startup event handler.") - app.state.container.startup() + await app.state.container.startup() async def on_shutdown(app: FastAPI): @@ -32,7 +32,7 @@ async def on_shutdown(app: FastAPI): 1. https://fastapi.tiangolo.com/advanced/events/#shutdown-event """ log.debug("Execute FastAPI shutdown event handler.") - app.state.container.shutdown() + await app.state.container.shutdown() @asynccontextmanager diff --git a/template/src/{{ package_name }}/bootstrap.py.jinja b/template/src/{{ package_name }}/bootstrap.py.jinja index cf0eb74..aaed83a 100644 --- a/template/src/{{ package_name }}/bootstrap.py.jinja +++ b/template/src/{{ package_name }}/bootstrap.py.jinja @@ -2,13 +2,16 @@ from __future__ import annotations +{% if include_user_example -%} +from collections.abc import Awaitable, Callable +{%- else -%} from collections.abc import Callable +{%- endif %} from dataclasses import dataclass from functools import partial -from sqlalchemy import Engine, create_engine from sqlalchemy.engine import make_url -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.pool import StaticPool from {{ package_name }}.adapters.models.base import Base @@ -30,11 +33,11 @@ from {{ package_name }}.settings.database_settings import DatabaseSettings {%- if include_user_example %} -def _ignore_user_registered(event: UserRegistered) -> None: +async def _ignore_user_registered(event: UserRegistered) -> None: """Provide a default no-op external event publisher.""" -def _ignore_user_deactivated(event: UserDeactivated) -> None: +async def _ignore_user_deactivated(event: UserDeactivated) -> None: """Provide a default no-op external event publisher.""" {%- endif %} @@ -43,8 +46,8 @@ def _ignore_user_deactivated(event: UserDeactivated) -> None: class ApplicationContainer: """Hold process-level application dependencies.""" - engine: Engine - session_factory: sessionmaker[Session] + engine: AsyncEngine + session_factory: async_sessionmaker[AsyncSession] uow_factory: Callable[[], AbstractUnitOfWork] bus: MessageBus auto_create_schema: bool @@ -52,20 +55,21 @@ class ApplicationContainer: user_reader: UserReader {%- endif %} - def startup(self) -> None: + async def startup(self) -> None: """Initialize resources required by the running application.""" if self.auto_create_schema: - Base.metadata.create_all(self.engine) + async with self.engine.begin() as connection: + await connection.run_sync(Base.metadata.create_all) - def shutdown(self) -> None: + async def shutdown(self) -> None: """Release process-level resources.""" - self.engine.dispose() + await self.engine.dispose() def bootstrap( database_settings: DatabaseSettings | None = None, {%- if include_user_example %} - publish: Callable[[UserRegistered], None] = _ignore_user_registered, + publish: Callable[[UserRegistered], Awaitable[None]] = _ignore_user_registered, {%- endif %} ) -> ApplicationContainer: """Build application dependencies. @@ -87,8 +91,8 @@ def bootstrap( # single static-pooled connection so the schema and data created at # startup remain visible to every unit of work. engine_options = {"connect_args": {"check_same_thread": False}, "poolclass": StaticPool} - engine = create_engine(settings.URL, **engine_options) - session_factory = sessionmaker(bind=engine, expire_on_commit=False) + engine = create_async_engine(settings.URL, **engine_options) + session_factory = async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession) uow_factory = partial(SqlAlchemyUnitOfWork, session_factory) {%- if include_user_example %} user_reader = SqlAlchemyUserReader(session_factory) diff --git a/template/src/{{ package_name }}/entrypoint/monitor.py.jinja b/template/src/{{ package_name }}/entrypoint/monitor.py.jinja index cbceca3..0f703f6 100644 --- a/template/src/{{ package_name }}/entrypoint/monitor.py.jinja +++ b/template/src/{{ package_name }}/entrypoint/monitor.py.jinja @@ -41,7 +41,7 @@ async def query_liveness_probe() -> ResponseModel[LivenessProbed]: status_code=status.HTTP_200_OK, response_model=ResponseModel[ReadinessProbed], ) -def query_readiness_probe(container: Container) -> ResponseModel[ReadinessProbed] | JSONResponse: +async def query_readiness_probe(container: Container) -> ResponseModel[ReadinessProbed] | JSONResponse: """ Probe the system readiness. @@ -54,8 +54,8 @@ def query_readiness_probe(container: Container) -> ResponseModel[ReadinessProbed HTTP 503 so orchestrators stop routing traffic to this pod. """ try: - with container.engine.connect() as connection: - connection.execute(text("SELECT 1")) + async with container.engine.connect() as connection: + await connection.execute(text("SELECT 1")) except SQLAlchemyError: log.warning("Readiness probe failed: database is unreachable.", exc_info=True) body = ResponseModel(data=ReadinessProbed(status="Error")) diff --git a/template/src/{{ package_name }}/entrypoint/users.py.jinja b/template/src/{{ package_name }}/entrypoint/users.py.jinja index a41d53c..f399c5a 100644 --- a/template/src/{{ package_name }}/entrypoint/users.py.jinja +++ b/template/src/{{ package_name }}/entrypoint/users.py.jinja @@ -64,23 +64,23 @@ class UserResponse(CamelCaseModel): @router.post("", status_code=status.HTTP_201_CREATED) -def register_user(payload: RegisterUserRequest, container: Container) -> ResponseModel[UserResponse]: +async def register_user(payload: RegisterUserRequest, container: Container) -> ResponseModel[UserResponse]: """Register a user.""" try: - user_id = container.bus.handle(payload.to_command()) + user_id = await container.bus.handle(payload.to_command()) except EmailAlreadyRegistered as error: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Email already registered") from error - user = get_user(user_id, container.user_reader) + user = await get_user(user_id, container.user_reader) if user is None: # pragma: no cover - defensive adapter guard raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registered user not found") return ResponseModel(data=UserResponse.from_read_model(user)) @router.get("/{user_id}") -def query_user(user_id: UUID, container: Container) -> ResponseModel[UserResponse]: +async def query_user(user_id: UUID, container: Container) -> ResponseModel[UserResponse]: """Return a registered user.""" - user = get_user(user_id, container.user_reader) + user = await get_user(user_id, container.user_reader) if user is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") return ResponseModel(data=UserResponse.from_read_model(user)) diff --git a/template/src/{{ package_name }}/service_layer/handlers.py.jinja b/template/src/{{ package_name }}/service_layer/handlers.py.jinja index 4c522d9..6bc5dbd 100644 --- a/template/src/{{ package_name }}/service_layer/handlers.py.jinja +++ b/template/src/{{ package_name }}/service_layer/handlers.py.jinja @@ -1,6 +1,6 @@ """Application command and event handlers.""" -from collections.abc import Callable +from collections.abc import Awaitable, Callable from uuid import UUID from {{ package_name }}.domain.commands.user import DeactivateUser, RegisterUser @@ -22,7 +22,7 @@ class UserNotFound(LookupError): super().__init__(f"User {user_id} not found") -def register_user(command: RegisterUser, uow: AbstractUnitOfWork) -> UUID: +async def register_user(command: RegisterUser, uow: AbstractUnitOfWork) -> UUID: """Register a user. Args: @@ -35,8 +35,8 @@ def register_user(command: RegisterUser, uow: AbstractUnitOfWork) -> UUID: Raises: EmailAlreadyRegistered: If the normalized email already exists. """ - with uow: - if uow.users.get_by_email(command.email) is not None: + async with uow: + if await uow.users.get_by_email(command.email) is not None: raise EmailAlreadyRegistered(command.email) user = User.register( name=command.name, @@ -46,13 +46,13 @@ def register_user(command: RegisterUser, uow: AbstractUnitOfWork) -> UUID: ) uow.users.add(user) try: - uow.commit() + await uow.commit() except IntegrityConflict as error: raise EmailAlreadyRegistered(command.email) from error return user.id -def deactivate_user(command: DeactivateUser, uow: AbstractUnitOfWork) -> UUID: +async def deactivate_user(command: DeactivateUser, uow: AbstractUnitOfWork) -> UUID: """Deactivate a user. Loads the aggregate through the unit of work, applies the deactivation @@ -70,20 +70,22 @@ def deactivate_user(command: DeactivateUser, uow: AbstractUnitOfWork) -> UUID: Raises: UserNotFound: If no user exists for the requested identity. """ - with uow: - user = uow.users.get(command.user_id) + async with uow: + user = await uow.users.get(command.user_id) if user is None: raise UserNotFound(command.user_id) user.deactivate() - uow.commit() + await uow.commit() return command.user_id -def publish_user_registered(event: UserRegistered, publish: Callable[[UserRegistered], None]) -> None: +async def publish_user_registered(event: UserRegistered, publish: Callable[[UserRegistered], Awaitable[None]]) -> None: """Publish user registration for interested external adapters.""" - publish(event) + await publish(event) -def publish_user_deactivated(event: UserDeactivated, publish: Callable[[UserDeactivated], None]) -> None: +async def publish_user_deactivated( + event: UserDeactivated, publish: Callable[[UserDeactivated], Awaitable[None]] +) -> None: """Publish user deactivation for interested external adapters.""" - publish(event) + await publish(event) diff --git a/template/src/{{ package_name }}/service_layer/messagebus.py.jinja b/template/src/{{ package_name }}/service_layer/messagebus.py.jinja index 107dc8c..6ced616 100644 --- a/template/src/{{ package_name }}/service_layer/messagebus.py.jinja +++ b/template/src/{{ package_name }}/service_layer/messagebus.py.jinja @@ -3,7 +3,7 @@ from __future__ import annotations import logging -from collections.abc import Callable +from collections.abc import Awaitable, Callable from typing import Any from {{ package_name }}.domain.messages import Command, Event, Message @@ -12,8 +12,8 @@ from {{ package_name }}.service_layer.unit_of_work import AbstractUnitOfWork log = logging.getLogger(__name__) UnitOfWorkFactory = Callable[[], AbstractUnitOfWork] -CommandHandler = Callable[[Any, AbstractUnitOfWork], Any] -EventHandler = Callable[[Any], None] +CommandHandler = Callable[[Any, AbstractUnitOfWork], Awaitable[Any]] +EventHandler = Callable[[Any], Awaitable[None]] class UnsupportedMessageType(TypeError): @@ -46,35 +46,35 @@ class MessageBus: self.command_handlers = command_handlers self.event_handlers = event_handlers - def handle(self, message: Message) -> Any: + async def handle(self, message: Message) -> Any: """Dispatch one message and drain any resulting domain events.""" queue = [message] result = None while queue: current = queue.pop(0) if isinstance(current, Command): - result = self._handle_command(current, queue) + result = await self._handle_command(current, queue) elif isinstance(current, Event): - self._handle_event(current) + await self._handle_event(current) else: raise UnsupportedMessageType(current) return result - def _handle_command(self, command: Command, queue: list[Message]) -> Any: + async def _handle_command(self, command: Command, queue: list[Message]) -> Any: """Dispatch a command and collect resulting events.""" try: handler = self.command_handlers[type(command)] except KeyError as error: raise UnhandledCommand(command) from error uow = self.uow_factory() - result = handler(command, uow) + result = await handler(command, uow) queue.extend(uow.collect_new_events()) return result - def _handle_event(self, event: Event) -> None: + async def _handle_event(self, event: Event) -> None: """Dispatch an event to every interested handler.""" for handler in self.event_handlers.get(type(event), []): try: - handler(event) + await handler(event) except Exception: log.exception("Exception handling event %s", event) diff --git a/template/src/{{ package_name }}/service_layer/queries.py.jinja b/template/src/{{ package_name }}/service_layer/queries.py.jinja index 6dc93ba..c71baa6 100644 --- a/template/src/{{ package_name }}/service_layer/queries.py.jinja +++ b/template/src/{{ package_name }}/service_layer/queries.py.jinja @@ -9,10 +9,10 @@ from {{ package_name }}.service_layer.read_models import UserReadModel class UserReader(Protocol): """Describe user lookup behavior required by query consumers.""" - def get(self, user_id: UUID) -> UserReadModel | None: + async def get(self, user_id: UUID) -> UserReadModel | None: """Return a user read model by identity.""" -def get_user(user_id: UUID, reader: UserReader) -> UserReadModel | None: +async def get_user(user_id: UUID, reader: UserReader) -> UserReadModel | None: """Return a purpose-built user read model.""" - return reader.get(user_id) + return await reader.get(user_id) diff --git a/template/src/{{ package_name }}/service_layer/repository.py.jinja b/template/src/{{ package_name }}/service_layer/repository.py.jinja index 82d72b8..4a1a380 100644 --- a/template/src/{{ package_name }}/service_layer/repository.py.jinja +++ b/template/src/{{ package_name }}/service_layer/repository.py.jinja @@ -14,11 +14,11 @@ class UserRepository(Protocol): def add(self, user: User) -> None: """Persist a new user.""" - def get(self, user_id: UUID) -> User | None: + async def get(self, user_id: UUID) -> User | None: """Return a user by identity.""" - def get_by_email(self, email: str) -> User | None: + async def get_by_email(self, email: str) -> User | None: """Return a user by normalized email address.""" - def persist_changes(self) -> None: + async def persist_changes(self) -> None: """Write tracked aggregate state back to the underlying session.""" diff --git a/template/src/{{ package_name }}/service_layer/unit_of_work.py.jinja b/template/src/{{ package_name }}/service_layer/unit_of_work.py.jinja index e38c5df..6602fd0 100644 --- a/template/src/{{ package_name }}/service_layer/unit_of_work.py.jinja +++ b/template/src/{{ package_name }}/service_layer/unit_of_work.py.jinja @@ -23,25 +23,25 @@ class AbstractUnitOfWork(ABC): users: UserRepository {%- endif %} - def __enter__(self) -> AbstractUnitOfWork: + async def __aenter__(self) -> AbstractUnitOfWork: """Enter the transaction boundary.""" return self - def __exit__( + async def __aexit__( self, exception_type: type[BaseException] | None, exception: BaseException | None, traceback: TracebackType | None, ) -> None: """Roll back work that did not explicitly commit.""" - self.rollback() + await self.rollback() @abstractmethod - def commit(self) -> None: + async def commit(self) -> None: """Commit the current transaction.""" @abstractmethod - def rollback(self) -> None: + async def rollback(self) -> None: """Roll back the current transaction.""" def collect_new_events(self) -> Iterator[Event]: diff --git a/template/tests/conftest.py.jinja b/template/tests/conftest.py.jinja index 4b3bf91..e0a9193 100644 --- a/template/tests/conftest.py.jinja +++ b/template/tests/conftest.py.jinja @@ -2,29 +2,45 @@ Pytest Fixtures. """ -from collections.abc import Iterator +from collections.abc import AsyncIterator +import httpx import pytest -from starlette.testclient import TestClient from {{ package_name }}.asgi import get_application -from {{ package_name }}.bootstrap import bootstrap +from {{ package_name }}.bootstrap import ApplicationContainer, bootstrap from {{ package_name }}.settings.database_settings import DatabaseSettings -@pytest.fixture(name="test_client") -def fixture_test_client() -> Iterator[TestClient]: +@pytest.fixture(name="container") +async def fixture_container() -> AsyncIterator[ApplicationContainer]: + """Build an isolated in-memory async container with a created schema. + + The offline test tier always runs on in-memory async SQLite regardless of + the project's chosen runtime database, so collection never touches a real + ``DATABASE_URL`` or writes a file to the repository. + + Yields: + ApplicationContainer: A started container; its engine is disposed on + teardown. """ - Create a test client backed by an isolated in-memory application. + container = bootstrap(DatabaseSettings(URL="sqlite+aiosqlite://", AUTO_CREATE_SCHEMA=True)) + await container.startup() + yield container + await container.shutdown() + + +@pytest.fixture(name="test_client") +async def fixture_test_client(container: ApplicationContainer) -> AsyncIterator[httpx.AsyncClient]: + """Create an async test client backed by the in-memory container. - Builds the production app from an injected container that uses an in-memory - SQLite database, so collection never touches a real ``DATABASE_URL`` or - writes a file to the repository. The client runs inside a ``with`` block so - the FastAPI lifespan executes and disposes resources on teardown. + ``ASGITransport`` does not run the FastAPI lifespan, so the ``container`` + fixture starts and disposes the application resources instead. Yields: - TestClient: A test client for the app. + httpx.AsyncClient: An async client for the app. """ - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite://", AUTO_CREATE_SCHEMA=True)) - with TestClient(get_application(container)) as client: + app = get_application(container) + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: yield client diff --git a/template/tests/e2e/entrypoint/test_monitor.py.jinja b/template/tests/e2e/entrypoint/test_monitor.py.jinja index c867a2e..9215f03 100644 --- a/template/tests/e2e/entrypoint/test_monitor.py.jinja +++ b/template/tests/e2e/entrypoint/test_monitor.py.jinja @@ -2,19 +2,21 @@ Test Cases for Monitor Entrypoint. """ +from unittest.mock import patch + +import httpx import pytest from fastapi import status from sqlalchemy.exc import SQLAlchemyError -from starlette.testclient import TestClient +from sqlalchemy.ext.asyncio import AsyncEngine from {{ package_name }}.asgi import get_application -from {{ package_name }}.bootstrap import bootstrap +from {{ package_name }}.bootstrap import ApplicationContainer from {{ package_name }}.entrypoint.schemas import LivenessProbed, ReadinessProbed, ResponseModel -from {{ package_name }}.settings.database_settings import DatabaseSettings class TestMonitorEntryPoint: - def test_api_root_redirects_to_docs(self, test_client): + async def test_api_root_redirects_to_docs(self, test_client: httpx.AsyncClient): """ GIVEN a FastAPI application configured with the Monitor Entrypoint WHEN the root path is requested "GET /" @@ -22,13 +24,13 @@ class TestMonitorEntryPoint: """ # when - response = test_client.get("/").history[0] + response = await test_client.get("/") # then assert response.status_code == status.HTTP_301_MOVED_PERMANENTLY assert response.headers["location"] == "/docs" - def test_liveness_probe(self, test_client): + async def test_liveness_probe(self, test_client: httpx.AsyncClient): """ GIVEN a FastAPI application configured with the Monitor Entrypoint WHEN the liveness probe is requested "GET /liveness" @@ -36,7 +38,7 @@ class TestMonitorEntryPoint: """ # when - response = test_client.get("/liveness") + response = await test_client.get("/liveness") # then assert response.status_code == status.HTTP_200_OK @@ -45,7 +47,7 @@ class TestMonitorEntryPoint: except ValueError: pytest.fail("Response body is not a valid LivenessProbed JSON") - def test_readiness_probe_when_database_is_reachable(self, test_client): + async def test_readiness_probe_when_database_is_reachable(self, test_client: httpx.AsyncClient): """ GIVEN a FastAPI application whose database is reachable WHEN the readiness probe is requested "GET /readiness" @@ -53,7 +55,7 @@ class TestMonitorEntryPoint: """ # when - response = test_client.get("/readiness") + response = await test_client.get("/readiness") # then assert response.status_code == status.HTTP_200_OK @@ -64,26 +66,26 @@ class TestMonitorEntryPoint: assert isinstance(probe.data, ReadinessProbed) assert probe.data.status == "Ready" - def test_readiness_probe_when_database_is_unreachable(self): + async def test_readiness_probe_when_database_is_unreachable(self, container: ApplicationContainer): """ - GIVEN a FastAPI application whose database engine has been disposed + GIVEN a FastAPI application whose database engine is unreachable WHEN the readiness probe is requested "GET /readiness" THEN it should return 503, and a ReadinessProbed JSON with status "Error" """ # given - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite://", AUTO_CREATE_SCHEMA=True)) app = get_application(container) - with TestClient(app) as client: - # Force every subsequent connection attempt to fail so the probe - # exercises its error branch even with an in-memory StaticPool. - def _raise_on_connect() -> None: - raise SQLAlchemyError - container.engine.connect = _raise_on_connect # type: ignore[method-assign] + # Force every connection attempt to fail so the probe exercises its + # error branch even with an in-memory StaticPool. + def _raise_on_connect(self: AsyncEngine) -> None: + raise SQLAlchemyError - # when - response = client.get("/readiness") + transport = httpx.ASGITransport(app=app) + with patch.object(AsyncEngine, "connect", _raise_on_connect): + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + # when + response = await client.get("/readiness") # then assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE diff --git a/template/tests/e2e/entrypoint/test_users.py.jinja b/template/tests/e2e/entrypoint/test_users.py.jinja index cea1c0f..6d7c9db 100644 --- a/template/tests/e2e/entrypoint/test_users.py.jinja +++ b/template/tests/e2e/entrypoint/test_users.py.jinja @@ -1,10 +1,10 @@ """End-to-end tests for user entrypoints.""" -from collections.abc import Iterator +from collections.abc import AsyncIterator +import httpx import pytest from fastapi import status -from starlette.testclient import TestClient from {{ package_name }}.asgi import get_application from {{ package_name }}.bootstrap import bootstrap @@ -13,18 +13,25 @@ from {{ package_name }}.settings.database_settings import DatabaseSettings @pytest.fixture(name="user_client") -def fixture_user_client() -> Iterator[tuple[TestClient, list[UserRegistered]]]: +async def fixture_user_client() -> AsyncIterator[tuple[httpx.AsyncClient, list[UserRegistered]]]: """Create an API client with isolated in-memory persistence.""" published: list[UserRegistered] = [] - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite://", AUTO_CREATE_SCHEMA=True), publish=published.append) - with TestClient(get_application(container)) as client: + + async def publish(event: UserRegistered) -> None: + published.append(event) + + container = bootstrap(DatabaseSettings(URL="sqlite+aiosqlite://", AUTO_CREATE_SCHEMA=True), publish=publish) + await container.startup() + transport = httpx.ASGITransport(app=get_application(container)) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: yield client, published + await container.shutdown() class TestUsersEntryPoint: """Test cases for user HTTP workflows.""" - def test_registers_and_queries_a_user(self, user_client: tuple[TestClient, list[UserRegistered]]): + async def test_registers_and_queries_a_user(self, user_client: tuple[httpx.AsyncClient, list[UserRegistered]]): """ GIVEN a running FastAPI application WHEN a user is registered and queried @@ -34,9 +41,9 @@ class TestUsersEntryPoint: client, published = user_client # WHEN - create_response = client.post("/api/v1/users", json={"name": "Ada Lovelace", "email": "ada@example.com"}) + create_response = await client.post("/api/v1/users", json={"name": "Ada Lovelace", "email": "ada@example.com"}) user_id = create_response.json()["data"]["id"] - query_response = client.get(f"/api/v1/users/{user_id}") + query_response = await client.get(f"/api/v1/users/{user_id}") # THEN assert create_response.status_code == status.HTTP_201_CREATED @@ -44,7 +51,7 @@ class TestUsersEntryPoint: assert query_response.json()["data"]["email"] == "ada@example.com" assert published == [UserRegistered(user_id=published[0].user_id, email="ada@example.com")] - def test_rejects_a_duplicate_email(self, user_client: tuple[TestClient, list[UserRegistered]]): + async def test_rejects_a_duplicate_email(self, user_client: tuple[httpx.AsyncClient, list[UserRegistered]]): """ GIVEN a registered user WHEN another registration uses the same email @@ -53,15 +60,17 @@ class TestUsersEntryPoint: # GIVEN client, _ = user_client payload = {"name": "Ada Lovelace", "email": "ada@example.com"} - client.post("/api/v1/users", json=payload) + await client.post("/api/v1/users", json=payload) # WHEN - response = client.post("/api/v1/users", json=payload) + response = await client.post("/api/v1/users", json=payload) # THEN assert response.status_code == status.HTTP_409_CONFLICT - def test_returns_not_found_for_an_unknown_user(self, user_client: tuple[TestClient, list[UserRegistered]]): + async def test_returns_not_found_for_an_unknown_user( + self, user_client: tuple[httpx.AsyncClient, list[UserRegistered]] + ): """ GIVEN a running FastAPI application WHEN an unknown user is queried @@ -71,7 +80,7 @@ class TestUsersEntryPoint: client, _ = user_client # WHEN - response = client.get("/api/v1/users/00000000-0000-0000-0000-000000000000") + response = await client.get("/api/v1/users/00000000-0000-0000-0000-000000000000") # THEN assert response.status_code == status.HTTP_404_NOT_FOUND diff --git a/template/tests/integration/conftest.py.jinja b/template/tests/integration/conftest.py.jinja new file mode 100644 index 0000000..d559920 --- /dev/null +++ b/template/tests/integration/conftest.py.jinja @@ -0,0 +1,41 @@ +"""Fixtures for the real-PostgreSQL integration tier. + +Every test here is marked ``integration`` and requires a running PostgreSQL +instance addressed by ``TEST_DATABASE_URL`` (an async ``asyncpg`` URL). When +that variable is unset the whole tier is skipped, so offline runs and the +template bake stay green without Docker. +""" + +import os +from collections.abc import AsyncIterator + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from {{ package_name }}.adapters.models.base import Base + + +@pytest.fixture(name="integration_database_url") +def fixture_integration_database_url() -> str: + """Return the configured integration database URL or skip the tier.""" + url = os.environ.get("TEST_DATABASE_URL") + if not url: + pytest.skip("TEST_DATABASE_URL is not set; skipping the PostgreSQL integration tier") + return url + + +@pytest.fixture(name="session_factory") +async def fixture_session_factory(integration_database_url: str) -> AsyncIterator[async_sessionmaker[AsyncSession]]: + """Create a session factory against the real PostgreSQL database. + + The schema is created and dropped around each test so runs are isolated. + """ + engine = create_async_engine(integration_database_url) + async with engine.begin() as connection: + await connection.run_sync(Base.metadata.create_all) + try: + yield async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession) + finally: + async with engine.begin() as connection: + await connection.run_sync(Base.metadata.drop_all) + await engine.dispose() diff --git a/template/tests/integration/test_migrations.py.jinja b/template/tests/integration/test_migrations.py.jinja index 9facbe3..170fe87 100644 --- a/template/tests/integration/test_migrations.py.jinja +++ b/template/tests/integration/test_migrations.py.jinja @@ -1,37 +1,66 @@ -"""Integration tests for Alembic migrations.""" +"""Integration tests for Alembic migrations against PostgreSQL.""" -from pathlib import Path +import asyncio +import pytest from alembic import command from alembic.config import Config -from sqlalchemy import create_engine, inspect +from sqlalchemy import inspect +from sqlalchemy.ext.asyncio import create_async_engine + +pytestmark = pytest.mark.integration + + +async def _table_names(url: str) -> list[str]: + """Return the table names visible on the database at ``url``.""" + engine = create_async_engine(url) + async with engine.connect() as connection: + names = await connection.run_sync(lambda sync_conn: inspect(sync_conn).get_table_names()) + await engine.dispose() + return names + + +async def _drop_everything(url: str) -> None: + """Drop all tables so the migration test leaves a clean database.""" + from sqlalchemy import text + + engine = create_async_engine(url) + async with engine.begin() as connection: + await connection.execute(text("DROP SCHEMA public CASCADE")) + await connection.execute(text("CREATE SCHEMA public")) + await engine.dispose() class TestMigrations: """Test cases for relational schema migrations.""" - def test_upgrades_a_blank_database_to_head(self, tmp_path: Path): + def test_upgrades_a_blank_database_to_head(self, integration_database_url: str): """ - GIVEN a blank SQLite database + GIVEN a blank PostgreSQL database WHEN Alembic upgrades the database to head {%- if include_user_example %} THEN the user schema and Alembic revision table exist {%- else %} THEN Alembic records its revision table even with no migrations yet {%- endif %} + + This test is synchronous so that Alembic's async ``env.py`` can manage + its own event loop via ``asyncio.run`` without colliding with a running + loop (mirroring how ``alembic upgrade head`` runs on the command line). """ # GIVEN - database_path = tmp_path / "migration-test.db" - database_url = f"sqlite+pysqlite:///{database_path.as_posix()}" config = Config("alembic.ini") - config.set_main_option("sqlalchemy.url", database_url) + config.set_main_option("sqlalchemy.url", integration_database_url) # WHEN command.upgrade(config, "head") # THEN - tables = inspect(create_engine(database_url)).get_table_names() - assert "alembic_version" in tables + try: + tables = asyncio.run(_table_names(integration_database_url)) + assert "alembic_version" in tables {%- if include_user_example %} - assert "users" in tables + assert "users" in tables {%- endif %} + finally: + asyncio.run(_drop_everything(integration_database_url)) diff --git a/template/tests/integration/test_persistence.py.jinja b/template/tests/integration/test_persistence.py.jinja index 036c04b..b441aa1 100644 --- a/template/tests/integration/test_persistence.py.jinja +++ b/template/tests/integration/test_persistence.py.jinja @@ -1,36 +1,28 @@ -"""Integration tests for SQLAlchemy repositories and units of work.""" +"""Integration tests for SQLAlchemy user repositories against PostgreSQL. + +These verify repository, unit-of-work, and reader behavior on the real default +database. They are excluded from the coverage gate and run in a dedicated CI +stage; the equivalent offline coverage runs on in-memory async SQLite in the +unit tier. +""" -from collections.abc import Iterator -from datetime import UTC, datetime from uuid import uuid4 import pytest -from sqlalchemy import DateTime, create_engine -from sqlalchemy.orm import Session, sessionmaker -from sqlalchemy.pool import StaticPool +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from {{ package_name }}.adapters.models.base import Base -from {{ package_name }}.adapters.models.user import UserRecord from {{ package_name }}.adapters.queries import SqlAlchemyUserReader -from {{ package_name }}.adapters.repository import SqlAlchemyUserRepository from {{ package_name }}.adapters.unit_of_work import SqlAlchemyUnitOfWork from {{ package_name }}.domain.models.user import User from {{ package_name }}.service_layer.unit_of_work import IntegrityConflict - -@pytest.fixture(name="session_factory") -def fixture_session_factory() -> Iterator[sessionmaker[Session]]: - """Create an isolated in-memory SQLAlchemy session factory.""" - engine = create_engine("sqlite+pysqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool) - Base.metadata.create_all(engine) - yield sessionmaker(bind=engine, expire_on_commit=False) - engine.dispose() +pytestmark = pytest.mark.integration class TestSqlAlchemyUnitOfWork: """Test persistence through the SQLAlchemy unit of work.""" - def test_commits_a_user(self, session_factory: sessionmaker[Session]): + async def test_commits_a_user(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN a SQLAlchemy unit of work WHEN a user is added and explicitly committed @@ -40,135 +32,41 @@ class TestSqlAlchemyUnitOfWork: user = User.register(name="Ada Lovelace", email="ada@example.com") # WHEN - with SqlAlchemyUnitOfWork(session_factory) as uow: + async with SqlAlchemyUnitOfWork(session_factory) as uow: uow.users.add(user) - uow.commit() + await uow.commit() # THEN - with SqlAlchemyUnitOfWork(session_factory) as uow: - loaded_user = uow.users.get(user.id) + async with SqlAlchemyUnitOfWork(session_factory) as uow: + loaded_user = await uow.users.get(user.id) assert loaded_user == user - def test_rolls_back_uncommitted_work(self, session_factory: sessionmaker[Session]): - """ - GIVEN a SQLAlchemy unit of work - WHEN a user is added without an explicit commit - THEN the user is rolled back - """ - # GIVEN - user = User.register(name="Ada Lovelace", email="ada@example.com") - - # WHEN - with SqlAlchemyUnitOfWork(session_factory) as uow: - uow.users.add(user) - - # THEN - with SqlAlchemyUnitOfWork(session_factory) as uow: - assert uow.users.get(user.id) is None - - def test_queries_a_user_by_normalized_email(self, session_factory: sessionmaker[Session]): - """ - GIVEN a persisted user - WHEN users are queried by normalized and unknown email addresses - THEN the repository returns the matching aggregate or None - """ - # GIVEN - user = User.register(name="Ada Lovelace", email="ada@example.com") - with SqlAlchemyUnitOfWork(session_factory) as uow: - uow.users.add(user) - uow.commit() - - # WHEN / THEN - with SqlAlchemyUnitOfWork(session_factory) as uow: - assert uow.users.get_by_email(" ADA@example.com ") == user - assert uow.users.get_by_email("unknown@example.com") is None - - def test_preserves_timezone_aware_record_timestamps(self): - """ - GIVEN a persistence record with a timezone-aware timestamp - WHEN the record is translated to the domain - THEN the original timestamp is preserved - """ - # GIVEN - created_at = datetime.now(UTC) - record = UserRecord( - id=str(uuid4()), - name="Ada Lovelace", - email="ada@example.com", - is_active=True, - settings={"theme": "light", "language": "en", "marketing_enabled": False, "backup_email": None}, - created_at=created_at, - ) - - # WHEN - user = SqlAlchemyUserRepository._to_domain(record) - - # THEN - assert user.created_at == created_at - - def test_declares_timezone_aware_timestamp_storage(self): - """ - GIVEN the SQLAlchemy user persistence record - WHEN its created-at column is inspected - THEN timezone-aware storage is configured - """ - # WHEN - column_type = UserRecord.__table__.columns["created_at"].type - - # THEN - assert isinstance(column_type, DateTime) - assert column_type.timezone is True - - def test_persists_mutations_to_loaded_aggregates(self, session_factory: sessionmaker[Session]): + async def test_persists_mutations_to_loaded_aggregates(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN a persisted user loaded through a fresh unit of work WHEN the loaded aggregate is mutated and committed THEN a later unit of work observes the mutated state - - This guards against the silent data loss that the translation pattern - causes: a loaded aggregate is detached from SQLAlchemy change tracking, - so its mutations must be written back explicitly on commit. """ # GIVEN user = User.register(name="Ada Lovelace", email="ada@example.com") - with SqlAlchemyUnitOfWork(session_factory) as uow: + async with SqlAlchemyUnitOfWork(session_factory) as uow: uow.users.add(user) - uow.commit() + await uow.commit() # WHEN - with SqlAlchemyUnitOfWork(session_factory) as uow: - loaded_user = uow.users.get(user.id) + async with SqlAlchemyUnitOfWork(session_factory) as uow: + loaded_user = await uow.users.get(user.id) assert loaded_user is not None loaded_user.deactivate() - uow.commit() + await uow.commit() # THEN - with SqlAlchemyUnitOfWork(session_factory) as uow: - reloaded_user = uow.users.get(user.id) + async with SqlAlchemyUnitOfWork(session_factory) as uow: + reloaded_user = await uow.users.get(user.id) assert reloaded_user is not None assert reloaded_user.is_active is False - def test_returns_the_same_instance_for_a_re_loaded_aggregate(self, session_factory: sessionmaker[Session]): - """ - GIVEN a persisted user - WHEN the same identity is loaded twice within one unit of work - THEN the repository returns the identical tracked instance - """ - # GIVEN - user = User.register(name="Ada Lovelace", email="ada@example.com") - with SqlAlchemyUnitOfWork(session_factory) as uow: - uow.users.add(user) - uow.commit() - - # WHEN / THEN - with SqlAlchemyUnitOfWork(session_factory) as uow: - first = uow.users.get(user.id) - by_email = uow.users.get_by_email("ada@example.com") - second = uow.users.get(user.id) - assert first is second - assert by_email is first - - def test_translates_integrity_errors(self, session_factory: sessionmaker[Session]): + async def test_translates_integrity_errors(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN two users with the same globally unique email WHEN the second transaction commits @@ -177,20 +75,21 @@ class TestSqlAlchemyUnitOfWork: # GIVEN first_user = User.register(name="Ada Lovelace", email="ada@example.com") second_user = User.register(name="Other Ada", email="ada@example.com") - with SqlAlchemyUnitOfWork(session_factory) as uow: + async with SqlAlchemyUnitOfWork(session_factory) as uow: uow.users.add(first_user) - uow.commit() + await uow.commit() # WHEN / THEN - with pytest.raises(IntegrityConflict), SqlAlchemyUnitOfWork(session_factory) as uow: - uow.users.add(second_user) - uow.commit() + with pytest.raises(IntegrityConflict): + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(second_user) + await uow.commit() class TestSqlAlchemyUserReader: """Test read-side projection queries.""" - def test_returns_a_purpose_built_read_model(self, session_factory: sessionmaker[Session]): + async def test_returns_a_purpose_built_read_model(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN a persisted user aggregate WHEN the read-side adapter queries the user @@ -198,21 +97,20 @@ class TestSqlAlchemyUserReader: """ # GIVEN user = User.register(name="Ada Lovelace", email="ada@example.com") - with SqlAlchemyUnitOfWork(session_factory) as uow: + async with SqlAlchemyUnitOfWork(session_factory) as uow: uow.users.add(user) - uow.commit() + await uow.commit() reader = SqlAlchemyUserReader(session_factory) # WHEN - projection = reader.get(user.id) + projection = await reader.get(user.id) # THEN assert projection is not None assert projection.id == user.id assert projection.email == "ada@example.com" - assert not hasattr(projection, "events") - def test_returns_none_for_an_unknown_user(self, session_factory: sessionmaker[Session]): + async def test_returns_none_for_an_unknown_user(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN an empty database WHEN the read-side adapter queries an unknown user @@ -222,4 +120,4 @@ class TestSqlAlchemyUserReader: reader = SqlAlchemyUserReader(session_factory) # WHEN / THEN - assert reader.get(uuid4()) is None + assert await reader.get(uuid4()) is None diff --git a/template/tests/integration/test_unit_of_work.py.jinja b/template/tests/integration/test_unit_of_work.py.jinja index aa93ef0..00d99f3 100644 --- a/template/tests/integration/test_unit_of_work.py.jinja +++ b/template/tests/integration/test_unit_of_work.py.jinja @@ -1,118 +1,97 @@ -"""Integration tests for the SQLAlchemy unit of work. +"""Integration tests for the SQLAlchemy unit of work against PostgreSQL. -These tests exercise the transaction adapter directly, independent of any +These exercise the async transaction adapter directly, independent of any domain aggregate slice, so the monitor-only baseline keeps the unit of work -covered. A throwaway SQLAlchemy record defined against the shared declarative -base stands in for a persisted row without depending on example domain models. +verified against the real database. A throwaway SQLAlchemy record stands in for +a persisted row without depending on example domain models. """ -from collections.abc import Iterator +from collections.abc import AsyncIterator import pytest -from sqlalchemy import String, create_engine, select -from sqlalchemy.orm import Mapped, Session, mapped_column, sessionmaker -from sqlalchemy.pool import StaticPool +from sqlalchemy import String, select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column -from {{ package_name }}.adapters.models.base import Base from {{ package_name }}.adapters.unit_of_work import SqlAlchemyUnitOfWork -from {{ package_name }}.service_layer.unit_of_work import AbstractUnitOfWork, IntegrityConflict +from {{ package_name }}.service_layer.unit_of_work import IntegrityConflict +pytestmark = pytest.mark.integration -class _Widget(Base): + +class _IntegrationBase(DeclarativeBase): + """Isolated declarative base so the widget table does not touch app schema.""" + + +class _Widget(_IntegrationBase): """A throwaway record used to drive the unit of work in tests.""" - __tablename__ = "uow_test_widgets" + __tablename__ = "uow_integration_widgets" - id: Mapped[str] = mapped_column(String(), primary_key=True) + id: Mapped[str] = mapped_column(String(255), primary_key=True) @pytest.fixture(name="session_factory") -def fixture_session_factory() -> Iterator[sessionmaker[Session]]: - """Create an isolated in-memory SQLAlchemy session factory.""" - engine = create_engine("sqlite+pysqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool) - Base.metadata.create_all(engine) - yield sessionmaker(bind=engine, expire_on_commit=False) - engine.dispose() +async def fixture_widget_session_factory( + integration_database_url: str, +) -> AsyncIterator[async_sessionmaker[AsyncSession]]: + """Create a PostgreSQL session factory with an isolated widget table.""" + engine = create_async_engine(integration_database_url) + async with engine.begin() as connection: + await connection.run_sync(_IntegrationBase.metadata.create_all) + try: + yield async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession) + finally: + async with engine.begin() as connection: + await connection.run_sync(_IntegrationBase.metadata.drop_all) + await engine.dispose() class TestSqlAlchemyUnitOfWork: """Test transaction semantics without a domain aggregate slice.""" - def test_commits_a_row(self, session_factory: sessionmaker[Session]): + async def test_commits_a_row(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN a SQLAlchemy unit of work - WHEN a row is added and explicitly committed + WHEN a row is added and the session is committed THEN a later session observes the persisted row """ # WHEN - with SqlAlchemyUnitOfWork(session_factory) as uow: + async with SqlAlchemyUnitOfWork(session_factory) as uow: uow.session.add(_Widget(id="alpha")) - uow.commit() + await uow.session.commit() # THEN - with session_factory() as session: - assert session.get(_Widget, "alpha") is not None + async with session_factory() as session: + assert await session.get(_Widget, "alpha") is not None - def test_rolls_back_uncommitted_work(self, session_factory: sessionmaker[Session]): + async def test_rolls_back_uncommitted_work(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN a SQLAlchemy unit of work WHEN a row is added without an explicit commit THEN the implicit rollback on exit discards the row """ # WHEN - with SqlAlchemyUnitOfWork(session_factory) as uow: + async with SqlAlchemyUnitOfWork(session_factory) as uow: uow.session.add(_Widget(id="beta")) # THEN - with session_factory() as session: - assert session.scalar(select(_Widget).where(_Widget.id == "beta")) is None + async with session_factory() as session: + assert (await session.execute(select(_Widget).where(_Widget.id == "beta"))).scalar_one_or_none() is None - def test_translates_integrity_errors(self, session_factory: sessionmaker[Session]): + async def test_translates_integrity_errors(self, session_factory: async_sessionmaker[AsyncSession]): """ GIVEN a row that already exists - WHEN a conflicting primary key is committed + WHEN a conflicting primary key is committed through the unit of work THEN the adapter raises an application persistence conflict """ # GIVEN - with SqlAlchemyUnitOfWork(session_factory) as uow: + async with SqlAlchemyUnitOfWork(session_factory) as uow: uow.session.add(_Widget(id="gamma")) - uow.commit() + await uow.session.commit() # WHEN / THEN - with pytest.raises(IntegrityConflict), SqlAlchemyUnitOfWork(session_factory) as uow: - uow.session.add(_Widget(id="gamma")) - uow.commit() - - -class TestAbstractUnitOfWork: - """Test the default transaction-boundary behavior.""" - - def test_enter_returns_self_and_exit_rolls_back(self): - """ - GIVEN a unit of work using the default boundary behavior - WHEN it is used as a context manager without committing - THEN entering returns the instance and exiting rolls work back - """ - - # GIVEN - class _RecordingUnitOfWork(AbstractUnitOfWork): - """A unit of work that records its commit and rollback calls.""" - - def __init__(self) -> None: - self.rolled_back = False - - def commit(self) -> None: - """Do nothing.""" - - def rollback(self) -> None: - """Record a rollback.""" - self.rolled_back = True - - uow = _RecordingUnitOfWork() - - # WHEN - with uow as entered: - assert entered is uow - - # THEN - assert uow.rolled_back is True + with pytest.raises(IntegrityConflict): + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.session.add(_Widget(id="gamma")) + await uow.commit() diff --git a/template/tests/unit/adapters/__init__.py b/template/tests/unit/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/template/tests/unit/adapters/test_persistence.py.jinja b/template/tests/unit/adapters/test_persistence.py.jinja new file mode 100644 index 0000000..872b363 --- /dev/null +++ b/template/tests/unit/adapters/test_persistence.py.jinja @@ -0,0 +1,235 @@ +"""Offline unit tests for SQLAlchemy user repositories and readers. + +These run the async adapters against in-memory async SQLite so repository, +unit-of-work, and reader behavior count toward the offline coverage gate +without Docker. PostgreSQL-specific behavior is verified separately in the +integration tier. +""" + +from collections.abc import AsyncIterator +from datetime import UTC, datetime +from uuid import uuid4 + +import pytest +from sqlalchemy import DateTime +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.pool import StaticPool + +from {{ package_name }}.adapters.models.base import Base +from {{ package_name }}.adapters.models.user import UserRecord +from {{ package_name }}.adapters.queries import SqlAlchemyUserReader +from {{ package_name }}.adapters.repository import SqlAlchemyUserRepository +from {{ package_name }}.adapters.unit_of_work import SqlAlchemyUnitOfWork +from {{ package_name }}.domain.models.user import User +from {{ package_name }}.service_layer.unit_of_work import IntegrityConflict + + +@pytest.fixture(name="session_factory") +async def fixture_session_factory() -> AsyncIterator[async_sessionmaker[AsyncSession]]: + """Create an isolated in-memory async SQLAlchemy session factory.""" + engine = create_async_engine("sqlite+aiosqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool) + async with engine.begin() as connection: + await connection.run_sync(Base.metadata.create_all) + yield async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession) + await engine.dispose() + + +class TestSqlAlchemyUnitOfWork: + """Test persistence through the SQLAlchemy unit of work.""" + + async def test_commits_a_user(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a SQLAlchemy unit of work + WHEN a user is added and explicitly committed + THEN a later unit of work can load the user + """ + # GIVEN + user = User.register(name="Ada Lovelace", email="ada@example.com") + + # WHEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(user) + await uow.commit() + + # THEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + loaded_user = await uow.users.get(user.id) + assert loaded_user == user + + async def test_rolls_back_uncommitted_work(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a SQLAlchemy unit of work + WHEN a user is added without an explicit commit + THEN the user is rolled back + """ + # GIVEN + user = User.register(name="Ada Lovelace", email="ada@example.com") + + # WHEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(user) + + # THEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + assert await uow.users.get(user.id) is None + + async def test_queries_a_user_by_normalized_email(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a persisted user + WHEN users are queried by normalized and unknown email addresses + THEN the repository returns the matching aggregate or None + """ + # GIVEN + user = User.register(name="Ada Lovelace", email="ada@example.com") + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(user) + await uow.commit() + + # WHEN / THEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + assert await uow.users.get_by_email(" ADA@example.com ") == user + assert await uow.users.get_by_email("unknown@example.com") is None + + def test_preserves_timezone_aware_record_timestamps(self): + """ + GIVEN a persistence record with a timezone-aware timestamp + WHEN the record is translated to the domain + THEN the original timestamp is preserved + """ + # GIVEN + created_at = datetime.now(UTC) + record = UserRecord( + id=str(uuid4()), + name="Ada Lovelace", + email="ada@example.com", + is_active=True, + settings={"theme": "light", "language": "en", "marketing_enabled": False, "backup_email": None}, + created_at=created_at, + ) + + # WHEN + user = SqlAlchemyUserRepository._to_domain(record) + + # THEN + assert user.created_at == created_at + + def test_declares_timezone_aware_timestamp_storage(self): + """ + GIVEN the SQLAlchemy user persistence record + WHEN its created-at column is inspected + THEN timezone-aware storage is configured + """ + # WHEN + column_type = UserRecord.__table__.columns["created_at"].type + + # THEN + assert isinstance(column_type, DateTime) + assert column_type.timezone is True + + async def test_persists_mutations_to_loaded_aggregates(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a persisted user loaded through a fresh unit of work + WHEN the loaded aggregate is mutated and committed + THEN a later unit of work observes the mutated state + + This guards against the silent data loss that the translation pattern + causes: a loaded aggregate is detached from SQLAlchemy change tracking, + so its mutations must be written back explicitly on commit. + """ + # GIVEN + user = User.register(name="Ada Lovelace", email="ada@example.com") + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(user) + await uow.commit() + + # WHEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + loaded_user = await uow.users.get(user.id) + assert loaded_user is not None + loaded_user.deactivate() + await uow.commit() + + # THEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + reloaded_user = await uow.users.get(user.id) + assert reloaded_user is not None + assert reloaded_user.is_active is False + + async def test_returns_the_same_instance_for_a_re_loaded_aggregate( + self, session_factory: async_sessionmaker[AsyncSession] + ): + """ + GIVEN a persisted user + WHEN the same identity is loaded twice within one unit of work + THEN the repository returns the identical tracked instance + """ + # GIVEN + user = User.register(name="Ada Lovelace", email="ada@example.com") + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(user) + await uow.commit() + + # WHEN / THEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + first = await uow.users.get(user.id) + by_email = await uow.users.get_by_email("ada@example.com") + second = await uow.users.get(user.id) + assert first is second + assert by_email is first + + async def test_translates_integrity_errors(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN two users with the same globally unique email + WHEN the second transaction commits + THEN the SQLAlchemy adapter raises an application persistence conflict + """ + # GIVEN + first_user = User.register(name="Ada Lovelace", email="ada@example.com") + second_user = User.register(name="Other Ada", email="ada@example.com") + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(first_user) + await uow.commit() + + # WHEN / THEN + with pytest.raises(IntegrityConflict): + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(second_user) + await uow.commit() + + +class TestSqlAlchemyUserReader: + """Test read-side projection queries.""" + + async def test_returns_a_purpose_built_read_model(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a persisted user aggregate + WHEN the read-side adapter queries the user + THEN it returns a projection without write-side aggregate events + """ + # GIVEN + user = User.register(name="Ada Lovelace", email="ada@example.com") + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.users.add(user) + await uow.commit() + reader = SqlAlchemyUserReader(session_factory) + + # WHEN + projection = await reader.get(user.id) + + # THEN + assert projection is not None + assert projection.id == user.id + assert projection.email == "ada@example.com" + assert not hasattr(projection, "events") + + async def test_returns_none_for_an_unknown_user(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN an empty database + WHEN the read-side adapter queries an unknown user + THEN it returns None + """ + # GIVEN + reader = SqlAlchemyUserReader(session_factory) + + # WHEN / THEN + assert await reader.get(uuid4()) is None diff --git a/template/tests/unit/adapters/test_unit_of_work.py.jinja b/template/tests/unit/adapters/test_unit_of_work.py.jinja new file mode 100644 index 0000000..041cebe --- /dev/null +++ b/template/tests/unit/adapters/test_unit_of_work.py.jinja @@ -0,0 +1,127 @@ +"""Offline unit tests for the SQLAlchemy unit of work. + +These exercise the async transaction adapter against in-memory async SQLite, +so the unit-of-work boundary stays covered without Docker. The fast offline +tier runs on ``sqlite+aiosqlite`` regardless of the project's runtime database. +""" +{%- if include_user_example %} + +from {{ package_name }}.service_layer.unit_of_work import AbstractUnitOfWork +{%- else %} + +from collections.abc import AsyncIterator + +import pytest +from sqlalchemy import String, select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.pool import StaticPool + +from {{ package_name }}.adapters.models.base import Base +from {{ package_name }}.adapters.unit_of_work import SqlAlchemyUnitOfWork +from {{ package_name }}.service_layer.unit_of_work import AbstractUnitOfWork, IntegrityConflict + + +class _Widget(Base): + """A throwaway record used to drive the unit of work in tests.""" + + __tablename__ = "uow_test_widgets" + + id: Mapped[str] = mapped_column(String(), primary_key=True) + + +@pytest.fixture(name="session_factory") +async def fixture_session_factory() -> AsyncIterator[async_sessionmaker[AsyncSession]]: + """Create an isolated in-memory async SQLAlchemy session factory.""" + engine = create_async_engine("sqlite+aiosqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool) + async with engine.begin() as connection: + await connection.run_sync(Base.metadata.create_all) + yield async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession) + await engine.dispose() +{%- endif %} + + +class TestAbstractUnitOfWork: + """Test the default transaction-boundary behavior.""" + + async def test_enter_returns_self_and_exit_rolls_back(self): + """ + GIVEN a unit of work using the default boundary behavior + WHEN it is used as an async context manager without committing + THEN entering returns the instance and exiting rolls work back + """ + + # GIVEN + class _RecordingUnitOfWork(AbstractUnitOfWork): + """A unit of work that records its commit and rollback calls.""" + + def __init__(self) -> None: + self.rolled_back = False + + async def commit(self) -> None: + """Do nothing.""" + + async def rollback(self) -> None: + """Record a rollback.""" + self.rolled_back = True + + uow = _RecordingUnitOfWork() + + # WHEN + async with uow as entered: + assert entered is uow + + # THEN + assert uow.rolled_back is True +{%- if not include_user_example %} + + +class TestSqlAlchemyUnitOfWork: + """Test transaction semantics without a domain aggregate slice.""" + + async def test_commits_a_row(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a SQLAlchemy unit of work + WHEN a row is added and explicitly committed + THEN a later session observes the persisted row + """ + # WHEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.session.add(_Widget(id="alpha")) + await uow.commit() + + # THEN + async with session_factory() as session: + assert await session.get(_Widget, "alpha") is not None + + async def test_rolls_back_uncommitted_work(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a SQLAlchemy unit of work + WHEN a row is added without an explicit commit + THEN the implicit rollback on exit discards the row + """ + # WHEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.session.add(_Widget(id="beta")) + + # THEN + async with session_factory() as session: + assert (await session.execute(select(_Widget).where(_Widget.id == "beta"))).scalar_one_or_none() is None + + async def test_translates_integrity_errors(self, session_factory: async_sessionmaker[AsyncSession]): + """ + GIVEN a row that already exists + WHEN a conflicting primary key is committed + THEN the adapter raises an application persistence conflict + """ + # GIVEN + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.session.add(_Widget(id="gamma")) + await uow.commit() + + # WHEN / THEN + with pytest.raises(IntegrityConflict): + async with SqlAlchemyUnitOfWork(session_factory) as uow: + uow.session.add(_Widget(id="gamma")) + await uow.commit() +{%- endif %} diff --git a/template/tests/unit/service_layer/test_handlers.py.jinja b/template/tests/unit/service_layer/test_handlers.py.jinja index 5e9bb5b..5aba93f 100644 --- a/template/tests/unit/service_layer/test_handlers.py.jinja +++ b/template/tests/unit/service_layer/test_handlers.py.jinja @@ -31,21 +31,21 @@ class FakeUserRepository: self.users.append(user) self.seen[user.id] = user - def get(self, user_id: UUID) -> User | None: + async def get(self, user_id: UUID) -> User | None: """Return a user by identity.""" user = next((user for user in self.users if user.id == user_id), None) if user: self.seen[user.id] = user return user - def get_by_email(self, email: str) -> User | None: + async def get_by_email(self, email: str) -> User | None: """Return a user by normalized email.""" user = next((user for user in self.users if user.email == email.strip().lower()), None) if user: self.seen[user.id] = user return user - def persist_changes(self) -> None: + async def persist_changes(self) -> None: """Write tracked aggregates back (no-op for the in-memory fake).""" @@ -59,13 +59,13 @@ class FakeUnitOfWork(AbstractUnitOfWork): self.committed = False self.rolled_back = False - def commit(self) -> None: + async def commit(self) -> None: """Record a commit.""" if self.conflict_on_commit: raise IntegrityConflict self.committed = True - def rollback(self) -> None: + async def rollback(self) -> None: """Record a rollback.""" self.rolled_back = True @@ -73,7 +73,7 @@ class FakeUnitOfWork(AbstractUnitOfWork): class TestRegisterUser: """Test cases for registration orchestration.""" - def test_registers_user_and_commits(self): + async def test_registers_user_and_commits(self): """ GIVEN a registration command and an empty repository WHEN the registration handler executes @@ -84,14 +84,14 @@ class TestRegisterUser: uow = FakeUnitOfWork() # WHEN - user_id = register_user(command, uow) + user_id = await register_user(command, uow) # THEN assert user_id == command.user_id assert uow.committed - assert uow.users.get(command.user_id) is not None + assert await uow.users.get(command.user_id) is not None - def test_rejects_an_existing_email(self): + async def test_rejects_an_existing_email(self): """ GIVEN an existing user WHEN another registration uses the same normalized email @@ -103,9 +103,9 @@ class TestRegisterUser: # WHEN / THEN with pytest.raises(EmailAlreadyRegistered): - register_user(RegisterUser(name="Other Ada", email=" ADA@example.com "), uow) + await register_user(RegisterUser(name="Other Ada", email=" ADA@example.com "), uow) - def test_translates_a_concurrent_persistence_conflict(self): + async def test_translates_a_concurrent_persistence_conflict(self): """ GIVEN a registration race that violates a persistence constraint WHEN the unit of work commits the new user @@ -116,14 +116,14 @@ class TestRegisterUser: # WHEN / THEN with pytest.raises(EmailAlreadyRegistered): - register_user(RegisterUser(name="Ada Lovelace", email="ada@example.com"), uow) + await register_user(RegisterUser(name="Ada Lovelace", email="ada@example.com"), uow) assert uow.rolled_back class TestDeactivateUser: """Test cases for deactivation orchestration.""" - def test_deactivates_a_user_and_commits(self): + async def test_deactivates_a_user_and_commits(self): """ GIVEN an active user WHEN the deactivation handler executes @@ -135,7 +135,7 @@ class TestDeactivateUser: uow = FakeUnitOfWork([user]) # WHEN - user_id = deactivate_user(DeactivateUser(user_id=user.id), uow) + user_id = await deactivate_user(DeactivateUser(user_id=user.id), uow) # THEN assert user_id == user.id @@ -143,7 +143,7 @@ class TestDeactivateUser: assert user.events == [UserDeactivated(user_id=user.id)] assert uow.committed - def test_rejects_an_unknown_user(self): + async def test_rejects_an_unknown_user(self): """ GIVEN an empty repository WHEN the deactivation handler targets an unknown identity @@ -154,9 +154,9 @@ class TestDeactivateUser: # WHEN / THEN with pytest.raises(UserNotFound): - deactivate_user(DeactivateUser(user_id=uuid4()), uow) + await deactivate_user(DeactivateUser(user_id=uuid4()), uow) - def test_deactivation_is_idempotent(self): + async def test_deactivation_is_idempotent(self): """ GIVEN an already-inactive user WHEN the deactivation handler runs again @@ -169,7 +169,7 @@ class TestDeactivateUser: uow = FakeUnitOfWork([user]) # WHEN - deactivate_user(DeactivateUser(user_id=user.id), uow) + await deactivate_user(DeactivateUser(user_id=user.id), uow) # THEN assert user.is_active is False @@ -179,7 +179,7 @@ class TestDeactivateUser: class TestEventPublishers: """Test cases for the external event publisher handlers.""" - def test_publishes_user_registered(self): + async def test_publishes_user_registered(self): """ GIVEN a user-registered event and a capturing publisher WHEN the publish handler runs @@ -189,13 +189,16 @@ class TestEventPublishers: event = UserRegistered(user_id=uuid4(), email="ada@example.com") published: list[UserRegistered] = [] + async def publish(captured: UserRegistered) -> None: + published.append(captured) + # WHEN - publish_user_registered(event, published.append) + await publish_user_registered(event, publish) # THEN assert published == [event] - def test_publishes_user_deactivated(self): + async def test_publishes_user_deactivated(self): """ GIVEN a user-deactivated event and a capturing publisher WHEN the publish handler runs @@ -205,8 +208,11 @@ class TestEventPublishers: event = UserDeactivated(user_id=uuid4()) published: list[UserDeactivated] = [] + async def publish(captured: UserDeactivated) -> None: + published.append(captured) + # WHEN - publish_user_deactivated(event, published.append) + await publish_user_deactivated(event, publish) # THEN assert published == [event] diff --git a/template/tests/unit/service_layer/test_messagebus.py.jinja b/template/tests/unit/service_layer/test_messagebus.py.jinja index 53c46cb..529e3f4 100644 --- a/template/tests/unit/service_layer/test_messagebus.py.jinja +++ b/template/tests/unit/service_layer/test_messagebus.py.jinja @@ -31,11 +31,11 @@ class _StubUnitOfWork(AbstractUnitOfWork): self.committed = False self.rolled_back = False - def commit(self) -> None: + async def commit(self) -> None: """Record a commit.""" self.committed = True - def rollback(self) -> None: + async def rollback(self) -> None: """Record a rollback.""" self.rolled_back = True @@ -47,7 +47,7 @@ class _StubUnitOfWork(AbstractUnitOfWork): class TestMessageBus: """Test cases for internal command and event dispatch.""" - def test_dispatches_events_raised_by_command_handlers(self): + async def test_dispatches_events_raised_by_command_handlers(self): """ GIVEN a message bus whose command handler raises a domain event WHEN the command is dispatched @@ -57,23 +57,26 @@ class TestMessageBus: event = _SampleEvent(label="raised") published: list[_SampleEvent] = [] - def handle_command(command: _SampleCommand, uow: AbstractUnitOfWork) -> str: + async def handle_command(command: _SampleCommand, uow: AbstractUnitOfWork) -> str: return command.label + async def publish(captured: _SampleEvent) -> None: + published.append(captured) + bus = MessageBus( uow_factory=lambda: _StubUnitOfWork([event]), command_handlers={_SampleCommand: handle_command}, - event_handlers={_SampleEvent: [published.append]}, + event_handlers={_SampleEvent: [publish]}, ) # WHEN - result = bus.handle(_SampleCommand(label="ok")) + result = await bus.handle(_SampleCommand(label="ok")) # THEN assert result == "ok" assert published == [event] - def test_logs_event_handler_failures_and_continues(self): + async def test_logs_event_handler_failures_and_continues(self): """ GIVEN an event handler that raises an exception WHEN a domain event is dispatched @@ -81,10 +84,10 @@ class TestMessageBus: """ # GIVEN - def handle_command(command: _SampleCommand, uow: AbstractUnitOfWork) -> str: + async def handle_command(command: _SampleCommand, uow: AbstractUnitOfWork) -> str: return command.label - def fail_to_publish(event: _SampleEvent) -> None: + async def fail_to_publish(event: _SampleEvent) -> None: raise RuntimeError(event.label) bus = MessageBus( @@ -95,13 +98,13 @@ class TestMessageBus: # WHEN with patch("{{ package_name }}.service_layer.messagebus.log.exception") as log_exception: - result = bus.handle(_SampleCommand(label="ok")) + result = await bus.handle(_SampleCommand(label="ok")) # THEN assert result == "ok" log_exception.assert_called_once() - def test_rejects_messages_without_command_or_event_semantics(self): + async def test_rejects_messages_without_command_or_event_semantics(self): """ GIVEN a base message without command or event semantics WHEN the message bus receives it @@ -112,9 +115,9 @@ class TestMessageBus: # WHEN / THEN with pytest.raises(TypeError, match="Unsupported message type"): - bus.handle(Message()) + await bus.handle(Message()) - def test_rejects_commands_without_a_registered_handler(self): + async def test_rejects_commands_without_a_registered_handler(self): """ GIVEN a message bus with no handler for a command type WHEN that command is dispatched @@ -125,7 +128,7 @@ class TestMessageBus: # WHEN / THEN with pytest.raises(UnhandledCommand, match="_SampleCommand"): - bus.handle(_SampleCommand(label="ok")) + await bus.handle(_SampleCommand(label="ok")) class TestUnitOfWorkEventCollection: @@ -142,10 +145,10 @@ class TestUnitOfWorkEventCollection: class _EmptyUnitOfWork(AbstractUnitOfWork): """A unit of work that tracks no aggregates.""" - def commit(self) -> None: + async def commit(self) -> None: """Do nothing.""" - def rollback(self) -> None: + async def rollback(self) -> None: """Do nothing.""" uow = _EmptyUnitOfWork() diff --git a/template/tests/unit/test_asgi.py.jinja b/template/tests/unit/test_asgi.py.jinja index 9399c40..6e5069c 100644 --- a/template/tests/unit/test_asgi.py.jinja +++ b/template/tests/unit/test_asgi.py.jinja @@ -22,7 +22,7 @@ class TestASGI: THEN the application is returned and resources are disposed on exit """ # GIVEN - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite://", AUTO_CREATE_SCHEMA=True)) + container = bootstrap(DatabaseSettings(URL="sqlite+aiosqlite://", AUTO_CREATE_SCHEMA=True)) app = get_application(container) # WHEN / THEN @@ -37,7 +37,7 @@ class TestASGI: THEN credentials are allowed because origins are an explicit allow-list """ # GIVEN - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite://", AUTO_CREATE_SCHEMA=True)) + container = bootstrap(DatabaseSettings(URL="sqlite+aiosqlite://", AUTO_CREATE_SCHEMA=True)) app = get_application(container) # WHEN @@ -57,7 +57,7 @@ class TestASGI: """ # GIVEN monkeypatch.setenv("FASTAPI_BACKEND_CORS_ORIGINS", '["*"]') - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite://", AUTO_CREATE_SCHEMA=True)) + container = bootstrap(DatabaseSettings(URL="sqlite+aiosqlite://", AUTO_CREATE_SCHEMA=True)) app = get_application(container) # WHEN diff --git a/template/tests/unit/test_bootstrap.py.jinja b/template/tests/unit/test_bootstrap.py.jinja index 6e4b5f6..74a0aca 100644 --- a/template/tests/unit/test_bootstrap.py.jinja +++ b/template/tests/unit/test_bootstrap.py.jinja @@ -7,31 +7,31 @@ from {{ package_name }}.settings.database_settings import DatabaseSettings class TestBootstrap: """Test cases for application dependency composition.""" - def test_skips_schema_creation_by_default(self): + async def test_skips_schema_creation_by_default(self): """ GIVEN a container configured without automatic schema creation WHEN the application starts and stops THEN lifecycle hooks complete without creating tables """ # GIVEN - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite://", AUTO_CREATE_SCHEMA=False)) + container = bootstrap(DatabaseSettings(URL="sqlite+aiosqlite://", AUTO_CREATE_SCHEMA=False)) # WHEN - container.startup() - container.shutdown() + await container.startup() + await container.shutdown() # THEN assert container.auto_create_schema is False - def test_uses_a_regular_pool_for_a_file_backed_database(self): + async def test_uses_a_regular_pool_for_a_file_backed_database(self): """ GIVEN a file-backed (non in-memory) database URL WHEN the container is composed THEN the static-pool branch is skipped and a normal engine is built """ # GIVEN / WHEN - container = bootstrap(DatabaseSettings(URL="sqlite+pysqlite:///./build/example.db", AUTO_CREATE_SCHEMA=False)) + container = bootstrap(DatabaseSettings(URL="sqlite+aiosqlite:///./build/example.db", AUTO_CREATE_SCHEMA=False)) # THEN assert "memory" not in str(container.engine.url) - container.shutdown() + await container.shutdown() diff --git a/tests/test_bake.py b/tests/test_bake.py index 782d21f..196c935 100644 --- a/tests/test_bake.py +++ b/tests/test_bake.py @@ -2,11 +2,13 @@ The template repository cannot import its own package in place once the package path contains Jinja. Instead we *bake* the template: render it to a temporary -directory with a sample answer set and run the generated project's full quality -gate (Ruff, Pyrefly, pytest at 100% coverage). +directory with a sample answer set and run the generated project's full offline +quality gate (Ruff, Pyrefly, pytest at 100% coverage from the unit + e2e tiers). -The bake runs across the feature-flag matrix so both the default -``include_user_example`` slice and the monitor-only baseline are validated. +The bake runs across the ``database x include_user_example`` matrix so both +database choices and both feature-flag states are validated. The PostgreSQL +integration tier is never run during the bake; it requires Docker and is +covered by a dedicated CI stage (see ADR 0019). """ from __future__ import annotations @@ -18,6 +20,7 @@ import copier import pytest +import yaml TEMPLATE_ROOT = Path(__file__).resolve().parent.parent @@ -32,16 +35,40 @@ "github_owner": "demo-org", "license": "MIT", "python_version": "3.13", - "database_url": "sqlite+pysqlite:///./demo-service.db", } -# The feature-flag matrix: each entry is a (test id, include_user_example) pair. +# The bake matrix: every (database, include_user_example) combination. Each +# entry is a single fixture-param value (a tuple) with a readable id. +BAKE_MATRIX = [ + pytest.param(("postgres", True), id="postgres-user-on"), + pytest.param(("postgres", False), id="postgres-user-off"), + pytest.param(("sqlite", True), id="sqlite-user-on"), + pytest.param(("sqlite", False), id="sqlite-user-off"), +] + FEATURE_FLAG_MATRIX = [ pytest.param(True, id="user-example-on"), pytest.param(False, id="user-example-off"), ] +def _database_url(database: str) -> str: + """Return the default async database URL for a database choice.""" + if database == "postgres": + return "postgresql+asyncpg://demo-service:demo-service@localhost:5432/demo-service" + return "sqlite+aiosqlite:///./demo-service.db" + + +def _answers(database: str, include_user_example: bool) -> dict[str, object]: + """Build the full Copier answer set for one matrix cell.""" + return { + **BASE_ANSWERS, + "database": database, + "database_url": _database_url(database), + "include_user_example": include_user_example, + } + + def _run(args: list[str], cwd: Path) -> subprocess.CompletedProcess[str]: """Run a command inside the baked project and capture its output.""" return subprocess.run( @@ -81,22 +108,16 @@ def template_source(tmp_path_factory: pytest.TempPathFactory) -> Path: return _snapshot_working_tree(tmp_path_factory.mktemp("template-src")) -@pytest.fixture(params=FEATURE_FLAG_MATRIX) -def baked_project( - request: pytest.FixtureRequest, - template_source: Path, - tmp_path_factory: pytest.TempPathFactory, -) -> Path: - """GIVEN the template, render it for each feature-flag combination. +def _bake(template_source: Path, dst: Path, database: str, include_user_example: bool) -> Path: + """Render the template into ``dst`` for one matrix cell. - `unsafe=True` is required so Copier executes the `_tasks` entry (`uv lock`). + ``unsafe=True`` is required so Copier executes the ``_tasks`` entry + (``uv lock``). """ - include_user_example: bool = request.param - dst = tmp_path_factory.mktemp("baked") copier.run_copy( str(template_source), str(dst), - data={**BASE_ANSWERS, "include_user_example": include_user_example}, + data=_answers(database, include_user_example), defaults=True, unsafe=True, quiet=True, @@ -104,6 +125,18 @@ def baked_project( return dst +@pytest.fixture(params=BAKE_MATRIX) +def baked_project( + request: pytest.FixtureRequest, + template_source: Path, + tmp_path_factory: pytest.TempPathFactory, +) -> Path: + """GIVEN the template, render it for each matrix cell.""" + database, include_user_example = request.param + dst = tmp_path_factory.mktemp("baked") + return _bake(template_source, dst, database, include_user_example) + + def test_package_directory_is_rendered(baked_project: Path) -> None: """WHEN the template is baked THEN the package dir uses the answer, not Jinja.""" assert (baked_project / "src" / "demo_service").is_dir() @@ -130,14 +163,7 @@ def test_user_slice_presence_matches_flag( ) -> None: """WHEN baked THEN the user example slice is present only when the flag is on.""" dst = tmp_path_factory.mktemp("baked-flag") - copier.run_copy( - str(template_source), - str(dst), - data={**BASE_ANSWERS, "include_user_example": include_user_example}, - defaults=True, - unsafe=True, - quiet=True, - ) + _bake(template_source, dst, "postgres", include_user_example) user_model = dst / "src" / "demo_service" / "domain" / "models" / "user.py" user_router = dst / "src" / "demo_service" / "entrypoint" / "users.py" user_migration = dst / "migrations" / "versions" / "20260531_0001_create_users.py" @@ -146,8 +172,44 @@ def test_user_slice_presence_matches_flag( assert user_migration.exists() is include_user_example -def test_baked_project_passes_quality_gate(baked_project: Path) -> None: - """WHEN the baked project is synced THEN ruff, pyrefly and pytest all pass at 100%.""" +@pytest.mark.parametrize( + "database", + [pytest.param("postgres", id="postgres"), pytest.param("sqlite", id="sqlite")], +) +def test_postgres_service_present_iff_postgres( + template_source: Path, + tmp_path_factory: pytest.TempPathFactory, + database: str, +) -> None: + """WHEN baked THEN docker-compose has a `db` service only for PostgreSQL.""" + dst = tmp_path_factory.mktemp("baked-compose") + _bake(template_source, dst, database, True) + compose = yaml.safe_load((dst / "docker-compose.yaml").read_text(encoding="utf-8")) + has_db_service = "db" in compose["services"] + assert has_db_service is (database == "postgres") + if database == "postgres": + assert compose["services"]["db"]["image"] == "pgvector/pgvector:pg17" + + +@pytest.mark.parametrize( + "database", + [pytest.param("postgres", id="postgres"), pytest.param("sqlite", id="sqlite")], +) +def test_integration_ci_job_present_iff_postgres( + template_source: Path, + tmp_path_factory: pytest.TempPathFactory, + database: str, +) -> None: + """WHEN baked THEN the CI integration job exists only for PostgreSQL.""" + dst = tmp_path_factory.mktemp("baked-ci") + _bake(template_source, dst, database, True) + workflow = yaml.safe_load((dst / ".github" / "workflows" / "build.yml").read_text(encoding="utf-8")) + has_integration_job = "integration" in workflow["jobs"] + assert has_integration_job is (database == "postgres") + + +def test_baked_project_passes_offline_quality_gate(baked_project: Path) -> None: + """WHEN the baked project is synced THEN ruff, pyrefly and the offline gate pass at 100%.""" sync = _run(["uv", "sync"], baked_project) assert sync.returncode == 0, f"uv sync failed:\n{sync.stdout}\n{sync.stderr}" @@ -161,8 +223,19 @@ def test_baked_project_passes_quality_gate(baked_project: Path) -> None: pyrefly = _run(["uv", "run", "pyrefly", "check"], baked_project) assert pyrefly.returncode == 0, f"pyrefly failed:\n{pyrefly.stdout}\n{pyrefly.stderr}" + # The offline gate excludes the PostgreSQL integration tier (ADR 0019). tests = _run( - ["uv", "run", "pytest", "--cov", "src", "--cov-report=term-missing", "--cov-fail-under=100"], + [ + "uv", + "run", + "pytest", + "-m", + "not integration", + "--cov", + "src", + "--cov-report=term-missing", + "--cov-fail-under=100", + ], baked_project, ) assert tests.returncode == 0, f"pytest/coverage failed:\n{tests.stdout}\n{tests.stderr}" diff --git a/uv.lock b/uv.lock index 46ac69e..568ec07 100644 --- a/uv.lock +++ b/uv.lock @@ -63,6 +63,7 @@ dev = [ { name = "copier" }, { name = "pre-commit" }, { name = "pytest" }, + { name = "pyyaml" }, ] [package.metadata] @@ -72,6 +73,7 @@ dev = [ { name = "copier", specifier = ">=9.4.0" }, { name = "pre-commit", specifier = ">=4.2.0" }, { name = "pytest", specifier = ">=8.4.0" }, + { name = "pyyaml", specifier = ">=6.0" }, ] [[package]]