diff --git a/.github/workflows/docker-demo.yaml b/.github/workflows/docker-demo.yaml new file mode 100644 index 0000000..0caab1a --- /dev/null +++ b/.github/workflows/docker-demo.yaml @@ -0,0 +1,86 @@ +name: Docker demo test (Docker Compose) + +on: + push: + branches: [main] + pull_request: + branches: [main] + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + demo: + runs-on: ubuntu-latest + timeout-minutes: 12 + + steps: + - uses: actions/checkout@v5 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Pre-pull base images (cache warmup) + run: | + docker pull postgres:16 + docker pull python:3.12-slim + docker pull ghcr.io/astral-sh/uv:latest || true + + - name: Build app image with cache + uses: docker/build-push-action@v6 + with: + context: . + tags: ggmpilot:demo-ci + load: true + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Show Docker info + run: | + docker version + docker info + docker compose version || true + + - name: Run docker demo (Compose; keep DB running) + shell: bash + run: | + set -euo pipefail + export DEMO_APP_IMAGE=ggmpilot:demo-ci + docker compose -f docker/demo/docker-compose.yml up -d demo-db + docker compose -f docker/demo/docker-compose.yml run --rm demo-run + + - name: Validate demo output (tables exist) + if: always() + shell: bash + run: | + set -euo pipefail + # Verify a few expected tables in silver schema. + docker ps -a + docker exec ggmpilot-demo-db psql -U postgres -d demo -v ON_ERROR_STOP=1 -c "\ + SELECT table_schema, table_name\ + FROM information_schema.tables\ + WHERE table_schema IN ('source','staging','silver')\ + ORDER BY 1,2;\ + " + # Table casing can differ depending on init SQL and naming config. + # Prefer unquoted (lowercase) first, then quoted (uppercase). + docker exec ggmpilot-demo-db bash -lc "\ + set -euo pipefail;\ + psql -U postgres -d demo -v ON_ERROR_STOP=1 -c 'SELECT count(*) AS client_rows FROM silver.client;' \ + || psql -U postgres -d demo -v ON_ERROR_STOP=1 -c 'SELECT count(*) AS client_rows FROM silver."CLIENT";';\ + " + + - name: Dump logs (on failure) + if: failure() + shell: bash + run: | + set +e + docker compose -f docker/demo/docker-compose.yml ps + docker compose -f docker/demo/docker-compose.yml logs --no-color | tail -n 500 + + - name: Cleanup + if: always() + run: | + docker compose -f docker/demo/docker-compose.yml down -v --remove-orphans diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..57ccf78 --- /dev/null +++ b/conftest.py @@ -0,0 +1,61 @@ +"""Repo-wide pytest configuration. + +This file applies to all tests in the repository (including package-level +`*/tests` folders). It is intentionally minimal and focuses on integration-test +hygiene. +""" + +from __future__ import annotations + +import os + +import pytest + + +# Make shared fixtures/helpers from tests.integration_utils available everywhere. +pytest_plugins = ["tests.integration_utils"] + + +def _truthy_env(name: str) -> bool: + return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"} + + +@pytest.fixture(scope="session", autouse=True) +def _cleanup_test_db_containers_session(): + """Clean up ggmpilot test DB containers before and after the session. + + CI failures due to "port is already allocated" or inability to start new + containers are often caused by leftover `*-docker-db-*` containers/volumes + from previous runs. 
This cleanup is best-effort and only runs when: + + - Docker is reachable, and + - either `RUN_SLOW_TESTS` is enabled or we are in CI. + + To opt out locally, set `GGMPILOT_KEEP_TEST_CONTAINERS=1`. + """ + + from tests.integration_utils import ( + cleanup_all_test_db_containers, + docker_running, + slow_tests_enabled, + ) + + # In pytest-xdist, this session fixture would run in every worker process. + # Only run the global cleanup in the master/controller process to avoid + # workers deleting containers used by other workers. + if os.getenv("PYTEST_XDIST_WORKER"): + yield + return + + if _truthy_env("GGMPILOT_KEEP_TEST_CONTAINERS"): + yield + return + + should_cleanup = docker_running() and (slow_tests_enabled() or _truthy_env("CI")) + if should_cleanup: + cleanup_all_test_db_containers() + + yield + + if should_cleanup: + cleanup_all_test_db_containers() diff --git a/dev_sql_server/get_connection.py b/dev_sql_server/get_connection.py index 3f08d1e..72a1d3d 100644 --- a/dev_sql_server/get_connection.py +++ b/dev_sql_server/get_connection.py @@ -29,7 +29,13 @@ # Driver‑specific helpers # ──────────────────────────────────────────────────────────────────────────────── def _connect_postgres(cfg: Dict[str, Any]): - return psycopg2.connect(**cfg) + # psycopg2 can block for a while on connect() if no connect_timeout is set. + # In CI (and especially under parallel starts) we want fast retries. + cfg2 = dict(cfg) + cfg2.setdefault( + "connect_timeout", int(os.getenv("GGMPILOT_DB_CONNECT_TIMEOUT", "5")) + ) + return psycopg2.connect(**cfg2) def _connect_oracle(cfg: Dict[str, Any]): @@ -44,11 +50,17 @@ def _connect_oracle(cfg: Dict[str, Any]): def _connect_mssql(cfg: Dict[str, Any]): + # Keep individual connection attempts short so _wait_for_db_ready can retry. + # ODBC driver 18 defaults to Encrypt=yes; TrustServerCertificate is required + # for local/dev containers with self-signed certs. + connect_timeout = int(os.getenv("GGMPILOT_DB_CONNECT_TIMEOUT", "5")) conn_str = ( f"DRIVER={{{SQL_SERVER_DRIVER}}};" f"SERVER={cfg['host']},{cfg['port']};" f"DATABASE={cfg['dbname']};" f"UID={cfg['user']};PWD={cfg['password']};" + f"Connection Timeout={connect_timeout};" + "Encrypt=yes;" "TrustServerCertificate=yes;" ) return pyodbc.connect(conn_str) @@ -164,6 +176,77 @@ def _ensure_container_running( # (docker.from_env handles DOCKER_HOST/DOCKER_TLS_VERIFY vars automatically) client = docker.from_env() + def _resolve_bound_host_port( + container_obj, + requested_host_port: int, + *, + timeout_seconds: float = 15.0, + ) -> int: + """Resolve the host port bound to the DB container port. + + Docker can briefly report incomplete port mappings right after `run()`. + Under CI load, that can cause us to keep trying the *requested* port even + if Docker actually bound a different (random) port. Poll briefly until a + binding is visible, and fail fast if the container exits. 
+ """ + + cport = cfg.get("container_port", cfg["default_port"]) + key = f"{cport}/tcp" + deadline = time.time() + timeout_seconds + last_status = None + + while time.time() < deadline: + try: + container_obj.reload() + except Exception: + time.sleep(0.5) + continue + + last_status = (getattr(container_obj, "status", None) or "").lower() + if last_status in {"exited", "dead"}: + try: + logs = container_obj.logs(tail=200) + if isinstance(logs, (bytes, bytearray)): + logs = logs.decode("utf-8", errors="replace") + except Exception: + logs = None + raise RuntimeError( + f"{db_type} container '{container_name}' exited during startup; " + f"requested host port {requested_host_port}. " + + ( + f"Last logs:\n{logs}" + if logs + else "(No container logs available)" + ) + ) + + # NetworkSettings is generally the most reliable source once the container is running. + ports_map = ( + container_obj.attrs.get("NetworkSettings", {}).get("Ports", {}) or {} + ) + bd = ports_map.get(key) + if bd and isinstance(bd, list) and bd: + port_str = bd[0].get("HostPort") + if port_str: + return int(port_str) + + # Fallback: HostConfig.PortBindings + port_bindings = ( + container_obj.attrs.get("HostConfig", {}).get("PortBindings", {}) or {} + ) + hb = port_bindings.get(key) + if hb and isinstance(hb, list) and hb: + port_str = hb[0].get("HostPort") + if port_str: + return int(port_str) + + time.sleep(0.5) + + raise TimeoutError( + f"Could not resolve bound host port for {db_type} container '{container_name}' " + f"within {timeout_seconds:.0f}s (requested {requested_host_port}; last status={last_status})." + ) + if force_refresh: # blow away everything and start from scratch try: @@ -178,6 +261,9 @@ def _ensure_container_running( except docker_errors.NotFound: pass + requested_port = port + used_random_host_port = False + try: container = client.containers.get(container_name) # Ensure port mapping matches requested host port; if not, recreate @@ -283,23 +369,23 @@ def _run_with_port_mapping(host_port_mapping): except Exception as e: if "port is already allocated" in str(e).lower(): # Use random host port + used_random_host_port = True container = _run_with_port_mapping(None) else: raise was_created = True - # Determine the effective host port by inspecting the container's port bindings - try: - container.reload() - cport = cfg.get("container_port", cfg["default_port"]) - key = f"{cport}/tcp" - bindings = container.attrs.get("NetworkSettings", {}).get("Ports", {}) or {} - bd = bindings.get(key) - if bd and isinstance(bd, list) and bd: - port_str = bd[0].get("HostPort") - if port_str: - port = int(port_str) - except Exception: - pass + + # Determine the effective host port by polling Docker until a binding is visible. + port = _resolve_bound_host_port(container, requested_port) + + if used_random_host_port: + logging.getLogger(__name__).warning( + "Port %s was already allocated for %s; using randomly assigned host port %s (container name stays %s).", + requested_port, + db_type, + port, + container_name, + ) return cfg, port, was_created @@ -500,11 +586,19 @@ def get_connection( container_name = container_name or f"{db_type}-docker-db-{port_effective}" volume_name = volume_name or f"{container_name}_data" + user_supplied_max_wait = max_wait_seconds is not None + if max_wait_seconds is None: + # Defaults are intentionally conservative for CI stability. + # Callers can always override via max_wait_seconds. 
if db_type == "oracle": max_wait_seconds = 600 elif db_type in ("mysql", "mariadb"): max_wait_seconds = 180 + elif db_type == "postgres": + max_wait_seconds = 180 + elif db_type == "mssql": + max_wait_seconds = 360 else: max_wait_seconds = 120 @@ -527,6 +621,24 @@ def get_connection( container_force_refresh, ) + # If the container was newly created, DB initialization can take + # substantially longer on cold CI runners (esp. MSSQL). Only adjust + # when the caller didn't explicitly choose a timeout. + if (not user_supplied_max_wait) and was_created: + if db_type == "postgres": + max_wait_seconds = max(max_wait_seconds, 240) + elif db_type == "mssql": + max_wait_seconds = max(max_wait_seconds, 480) + + # CI runners can be slower (image pulls, constrained IO). Allow opt-in + # override while keeping local runs reasonable. + if not user_supplied_max_wait: + if os.getenv("CI", "").strip().lower() in {"1", "true", "yes", "on"}: + if db_type == "postgres": + max_wait_seconds = max(max_wait_seconds, 240) + elif db_type == "mssql": + max_wait_seconds = max(max_wait_seconds, 480) + # Prepare configs for master and target DB # When running inside Docker, localhost refers to the container itself. # Prefer host.docker.internal; if not resolvable on Linux, fall back to Docker bridge gateway. @@ -537,7 +649,9 @@ def get_connection( except Exception: host_addr = os.getenv("HOST_GATEWAY_IP", "172.17.0.1") else: - host_addr = "localhost" + # Prefer IPv4 loopback to avoid occasional localhost/IPv6 resolution + # differences across CI runners. + host_addr = "127.0.0.1" # Choose appropriate admin DB/user per backend if db_type == "postgres": diff --git a/docker/demo/README.md b/docker/demo/README.md index 1db41f5..4854348 100644 --- a/docker/demo/README.md +++ b/docker/demo/README.md @@ -8,7 +8,7 @@ Hierdoor is geen lokale Python installatie nodig om de demo te draaien; enkel Do Vanuit de projectroot: ```bash -bash docker/demo/run_demo.sh +docker compose -f docker/demo/docker-compose.yml up --build ``` Dit doet het volgende: @@ -19,23 +19,20 @@ Dit doet het volgende: ### Opties -Je kunt de demo met verschillende opties draaien: +Je kunt de demo met verschillende opties draaien via Docker Compose: ```bash # Volledige demo met meegeleverde Postgres (standaard) -bash docker/demo/run_demo.sh - -# Forceer herbouwen van Docker-image -bash docker/demo/run_demo.sh --build +docker compose -f docker/demo/docker-compose.yml up --build # Start alleen de database (handig voor ontwikkeling) -bash docker/demo/run_demo.sh --db-only - -# Ruim containers en volumes op -bash docker/demo/run_demo.sh --clean +docker compose -f docker/demo/docker-compose.yml --profile db-only up -d # Gebruik externe database (zie hieronder) -bash docker/demo/run_demo.sh --external +docker compose -f docker/demo/docker-compose.yml --profile external up --build + +# Ruim containers en volumes op +docker compose -f docker/demo/docker-compose.yml down -v ``` Pas de demo aan via environment variables: @@ -55,12 +52,12 @@ Pas de demo aan via environment variables: Draaien met meer data: ```bash -DEMO_ROWS=100 bash docker/demo/run_demo.sh +DEMO_ROWS=100 docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit ``` Gebruik een andere poort: ```bash -DEMO_DB_PORT=5555 bash docker/demo/run_demo.sh +DEMO_DB_PORT=5555 docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit ``` ### Je eigen database gebruiken @@ -74,7 +71,7 @@ DEMO_DB_PORT=5432 \ DEMO_DB_USER=myuser \ DEMO_DB_PASSWORD="mypassword" \ 
DEMO_DB_NAME=mydb \ -bash docker/demo/run_demo.sh --external +docker compose -f docker/demo/docker-compose.yml --profile external up --build --abort-on-container-exit # MSSQL Server DEMO_DB_DRIVER="mssql+pyodbc" \ @@ -83,7 +80,7 @@ DEMO_DB_PORT=1433 \ DEMO_DB_USER=sa \ DEMO_DB_PASSWORD="YourPassword123!" \ DEMO_DB_NAME=mydb \ -bash docker/demo/run_demo.sh --external +docker compose -f docker/demo/docker-compose.yml --profile external up --build --abort-on-container-exit ``` > Let op: `host.docker.internal` verwijst naar de host machine vanuit Docker. @@ -94,19 +91,19 @@ Je kunt individuele pipeline-stappen overslaan: ```bash # Synthetische data generatie overslaan (gebruik bestaande CSVs) -SKIP_GENERATE=1 bash docker/demo/run_demo.sh +SKIP_GENERATE=1 docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit # CSVs laden overslaan (als source schema al gevuld is) -SKIP_LOAD=1 bash docker/demo/run_demo.sh +SKIP_LOAD=1 docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit # sql_to_staging stap overslaan -SKIP_SQL_TO_STAGING=1 bash docker/demo/run_demo.sh +SKIP_SQL_TO_STAGING=1 docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit # staging_to_silver stap overslaan -SKIP_STAGING_TO_SILVER=1 bash docker/demo/run_demo.sh +SKIP_STAGING_TO_SILVER=1 docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit # Alleen staging_to_silver draaien (veronderstelt dat staging al gevuld is) -SKIP_GENERATE=1 SKIP_LOAD=1 SKIP_SQL_TO_STAGING=1 bash docker/demo/run_demo.sh +SKIP_GENERATE=1 SKIP_LOAD=1 SKIP_SQL_TO_STAGING=1 docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit ``` ### Database inspecteren @@ -142,6 +139,6 @@ SELECT * FROM silver."CLIENT" LIMIT 10; # Stop containers docker compose -f docker/demo/docker-compose.yml down -# Stop en verwijder alle data -bash docker/demo/run_demo.sh --clean +# Stop en verwijder alle data (incl. volumes) +docker compose -f docker/demo/docker-compose.yml down -v ``` diff --git a/docker/demo/demo_entrypoint.sh b/docker/demo/demo_entrypoint.sh index a469b1c..f4197a0 100644 --- a/docker/demo/demo_entrypoint.sh +++ b/docker/demo/demo_entrypoint.sh @@ -1,5 +1,10 @@ -#!/bin/bash -set -euo pipefail +#!/usr/bin/env bash + +set -euo errexit + +# Some minimal environments don't support `pipefail` (or may run under `sh`). +# Best effort: enable it when available. +set -o pipefail 2>/dev/null || true # ============================================================================ # ggmpilot Demo Entrypoint diff --git a/docker/demo/docker-compose.yml b/docker/demo/docker-compose.yml index 66075cb..c39e8dd 100644 --- a/docker/demo/docker-compose.yml +++ b/docker/demo/docker-compose.yml @@ -1,13 +1,15 @@ name: ggmpilot-demo # Docker Compose configuration for running the full ggmpilot demo in containers. -# This allows users without a local Python installation to run the complete pipeline. +# Goal: you can run the complete pipeline with Docker only (no host scripts). # # Usage: -# bash docker/demo/run_demo.sh # Run full demo with bundled Postgres -# bash docker/demo/run_demo.sh --db-only # Only start the database +# docker compose -f docker/demo/docker-compose.yml up --build --abort-on-container-exit +# docker compose -f docker/demo/docker-compose.yml down -v # -# Advanced: To connect to your own database, set environment variables or mount a custom config. 
+# Optional: +# docker compose -f docker/demo/docker-compose.yml --profile db-only up -d +# docker compose -f docker/demo/docker-compose.yml --profile external up --build --abort-on-container-exit services: # PostgreSQL database for the demo (source + destination) @@ -30,18 +32,39 @@ services: # Persist data across runs (optional) - demo-db-data:/var/lib/postgresql/data - # Main demo runner: generates synthetic data, runs both pipelines - demo-app: + # Convenience profile: start only the database (handy for development) + demo-db-only: + image: postgres:16 + container_name: ggmpilot-demo-db-only + profiles: + - db-only + environment: + POSTGRES_USER: ${DEMO_DB_USER:-postgres} + POSTGRES_PASSWORD: ${DEMO_DB_PASSWORD:-postgres} + POSTGRES_DB: ${DEMO_DB_NAME:-demo} + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${DEMO_DB_USER:-postgres} -d ${DEMO_DB_NAME:-demo}"] + interval: 2s + timeout: 5s + retries: 30 + ports: + - "${DEMO_DB_PORT:-55432}:5432" + volumes: + - demo-db-data:/var/lib/postgresql/data + + # Main demo runner: generates synthetic data, runs both pipelines. + # This is the default service to run for a fully containerized demo. + demo-run: + image: ${DEMO_APP_IMAGE:-ggmpilot:demo} build: context: ../.. dockerfile: Dockerfile - image: ggmpilot:demo - container_name: ggmpilot-demo-app + container_name: ggmpilot-demo-run depends_on: demo-db: condition: service_healthy working_dir: /app - entrypoint: ["/bin/bash", "/app/docker/demo/demo_entrypoint.sh"] + entrypoint: ["/bin/bash", "-lc", "sed -i 's/\r$//' /app/docker/demo/demo_entrypoint.sh && chmod +x /app/docker/demo/demo_entrypoint.sh && /app/docker/demo/demo_entrypoint.sh"] environment: IN_DOCKER: "1" PYTHONPATH: "/app" @@ -68,17 +91,17 @@ services: # Optionally mount custom config # - ./my-config.ini:/app/docker/demo/config/custom.ini:ro - # Alternative: Run demo against external database (no demo-db dependency) - demo-app-external: + # Alternative: run demo against external database (no bundled Postgres). + demo-run-external: + image: ${DEMO_APP_IMAGE:-ggmpilot:demo} build: context: ../.. 
dockerfile: Dockerfile - image: ggmpilot:demo - container_name: ggmpilot-demo-app-external + container_name: ggmpilot-demo-run-external profiles: - external working_dir: /app - entrypoint: ["/bin/bash", "/app/docker/demo/demo_entrypoint.sh"] + entrypoint: ["/bin/bash", "-lc", "sed -i 's/\r$//' /app/docker/demo/demo_entrypoint.sh && chmod +x /app/docker/demo/demo_entrypoint.sh && /app/docker/demo/demo_entrypoint.sh"] environment: IN_DOCKER: "1" PYTHONPATH: "/app" diff --git a/odata_to_staging/tests/test_integration_odata_to_staging_postgres.py b/odata_to_staging/tests/test_integration_odata_to_staging_postgres.py index cdf641c..929435f 100644 --- a/odata_to_staging/tests/test_integration_odata_to_staging_postgres.py +++ b/odata_to_staging/tests/test_integration_odata_to_staging_postgres.py @@ -3,32 +3,18 @@ import runpy import io import contextlib -import subprocess import pytest import requests from sqlalchemy import text from dev_sql_server.get_connection import get_connection +from tests.integration_utils import docker_running, slow_tests_enabled, ports_dest NORTHWIND_V2 = "https://services.odata.org/V2/Northwind/Northwind.svc/" -def _docker_running() -> bool: - try: - res = subprocess.run( - ["docker", "info"], capture_output=True, text=True, timeout=5 - ) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - def _northwind_available() -> bool: """Best-effort sanity check that the Northwind service is reachable and returns JSON for an entity set. @@ -57,11 +43,11 @@ def _northwind_available() -> bool: @pytest.mark.slow @pytest.mark.postgres @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.skipif( @@ -69,13 +55,15 @@ def _northwind_available() -> bool: reason="Northwind OData service not reachable; skipping to avoid flakey failures.", ) def test_main_odata_to_staging_postgres(tmp_path): + dst_port = ports_dest["postgres"] + # Start a fresh Postgres destination engine = get_connection( db_type="postgres", db_name="ggm_odata_to_staging", user="sa", password="S3cureP@ssw0rd!23243", - port=5434, + port=dst_port, force_refresh=True, print_tables=False, ) @@ -95,7 +83,7 @@ def test_main_odata_to_staging_postgres(tmp_path): DST_USERNAME=sa DST_PASSWORD=S3cureP@ssw0rd!23243 DST_HOST=localhost -DST_PORT=5434 +DST_PORT={dst_port} DST_DB=ggm_odata_to_staging DST_SCHEMA=staging diff --git a/sql_to_staging/tests/test_integration_direct_transfer_streaming.py b/sql_to_staging/tests/test_integration_direct_transfer_streaming.py index 85f2f7a..6180db3 100644 --- a/sql_to_staging/tests/test_integration_direct_transfer_streaming.py +++ b/sql_to_staging/tests/test_integration_direct_transfer_streaming.py @@ -4,74 +4,37 @@ import logging import os -import shutil -import subprocess import re import pytest -import docker from sqlalchemy import text from dotenv import load_dotenv from dev_sql_server.get_connection import get_connection from sql_to_staging.functions.direct_transfer import direct_transfer +from tests.integration_utils import ( + cleanup_db_container_by_port, + docker_running, + ports, + ports_dest, + slow_tests_enabled, +) load_dotenv("tests/.env") -def _docker_running() -> bool: - if not 
shutil.which("docker"): - return False - try: - res = subprocess.run( - ["docker", "info"], capture_output=True, text=True, timeout=5 - ) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - # Use explicit ports consistent with other integration tests -SRC_PORT = 5433 -DST_PORT = 5434 - - -def _cleanup_db_containers(name: str, port: int): - """Stop and remove the container/volume our get_connection uses for a db/port pair.""" - client = docker.from_env() - cname = f"{name}-docker-db-{port}" - try: - c = client.containers.get(cname) - try: - c.stop() - except Exception: - pass - try: - c.remove() - except Exception: - pass - except Exception: - pass - # remove associated volume - vname = f"{cname}_data" - try: - v = client.volumes.get(vname) - v.remove(force=True) - except Exception: - pass +SRC_PORT = ports["postgres"] +DST_PORT = ports_dest["postgres"] @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.parametrize( @@ -93,8 +56,8 @@ def test_direct_transfer_streams_in_chunks_postgres( table = "stream_check" # Ensure a clean slate before starting - _cleanup_db_containers("postgres", SRC_PORT) - _cleanup_db_containers("postgres", DST_PORT) + cleanup_db_container_by_port("postgres", SRC_PORT) + cleanup_db_container_by_port("postgres", DST_PORT) try: # Start Postgres source and create table with many rows @@ -164,5 +127,5 @@ def test_direct_transfer_streams_in_chunks_postgres( inserts.append((int(m.group(1)), int(m.group(2)))) assert inserts == expected_batches finally: - _cleanup_db_containers("postgres", SRC_PORT) - _cleanup_db_containers("postgres", DST_PORT) + cleanup_db_container_by_port("postgres", SRC_PORT) + cleanup_db_container_by_port("postgres", DST_PORT) diff --git a/sql_to_staging/tests/test_postgres_nul_handling.py b/sql_to_staging/tests/test_postgres_nul_handling.py index bd6ad4e..3c72eb2 100644 --- a/sql_to_staging/tests/test_postgres_nul_handling.py +++ b/sql_to_staging/tests/test_postgres_nul_handling.py @@ -16,6 +16,36 @@ load_dotenv("tests/.env") +# Module-level fixture: spin up a single Postgres container for all tests in this file. +# This avoids repeated container create/destroy cycles which can cause race conditions +# and port-release timing issues in CI. 
+@pytest.fixture(scope="module") +def postgres_dest_engine(): + """Shared Postgres destination engine for all NUL-handling tests.""" + if not slow_tests_enabled() or not docker_running(): + pytest.skip("Slow tests or Docker not available") + + port = ports_dest["postgres"] + cleanup_db_container_by_port("postgres", port) + username = "sa" + password = "S3cureP@ssw0rd!23243" + engine = get_connection( + db_type="postgres", + db_name="nul_test_db", + user=username, + password=password, + port=port, + force_refresh=True, + print_tables=False, + ) + yield engine + # Teardown: remove the container after all tests in this module complete + try: + cleanup_db_container_by_port("postgres", port) + except Exception: + pass + + @pytest.mark.slow @pytest.mark.skipif( not slow_tests_enabled(), @@ -23,7 +53,7 @@ ) @pytest.mark.postgres @pytest.mark.skipif(not docker_running(), reason="Docker/DB required") -def test_upload_parquet_sanitizes_nul_postgres(tmp_path): +def test_upload_parquet_sanitizes_nul_postgres(tmp_path, postgres_dest_engine): """ Verifies that upload_parquet automatically strips NUL (0x00) characters from string columns when uploading to PostgreSQL, preventing ValueError. @@ -44,19 +74,8 @@ def test_upload_parquet_sanitizes_nul_postgres(tmp_path): file_path = parquet_dir / "nul_test.parquet" df.write_parquet(file_path) - # 2. Ensure clean containers and spin up destination engine like other tests - cleanup_db_container_by_port("postgres", ports_dest["postgres"]) - username = "sa" - password = "S3cureP@ssw0rd!23243" - engine = get_connection( - db_type="postgres", - db_name="nul_dst_pg", - user=username, - password=password, - port=ports_dest["postgres"], - force_refresh=True, - print_tables=False, - ) + # 2. Use the shared engine from the fixture + engine = postgres_dest_engine # 3. Upload schema = "test_nul_handling" @@ -86,19 +105,13 @@ def test_upload_parquet_sanitizes_nul_postgres(tmp_path): assert other_val == "alsobad", f"Expected 'alsobad', got {other_val!r}" finally: - # Cleanup schema and associated Docker containers + # Cleanup schema only (container cleanup handled by fixture) try: with engine.begin() as conn: conn.execute(text(f"DROP SCHEMA IF EXISTS {schema} CASCADE")) except Exception: pass - # Ensure Postgres test containers + volumes are removed like other tests - try: - cleanup_db_container_by_port("postgres", ports_dest["postgres"]) - except Exception: - pass - @pytest.mark.slow @pytest.mark.skipif( @@ -107,7 +120,7 @@ def test_upload_parquet_sanitizes_nul_postgres(tmp_path): ) @pytest.mark.postgres @pytest.mark.skipif(not docker_running(), reason="Docker/DB required") -def test_direct_transfer_sanitizes_nul_postgres(tmp_path): +def test_direct_transfer_sanitizes_nul_postgres(tmp_path, postgres_dest_engine): """ Verifies that direct_transfer automatically strips NUL (0x00) characters from string values when inserting into PostgreSQL. 
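The hunks above and below lean on helpers from `tests/integration_utils.py` (`ports`, `ports_dest`, `port_for_worker`, `cleanup_db_container_by_port`), which are not included in this excerpt. A minimal sketch of what such a module could look like, assuming the pytest-xdist `gw<N>` worker naming and the `<db_type>-docker-db-<port>` container/volume naming that `get_connection` uses; the exact offsets and port values here are illustrative, not the project's actual implementation:

```python
# tests/integration_utils.py — hypothetical sketch; the real module is not shown in this diff.
import os

import docker

# Baseline host ports per backend (source vs. destination kept apart),
# consistent with the hard-coded 5433/5434/1434 values this diff replaces.
ports = {"postgres": 5433, "mssql": 1434}
ports_dest = {"postgres": 5434, "mssql": 1435}


def port_for_worker(base_port: int) -> int:
    """Offset a base port per pytest-xdist worker (gw0, gw1, ...) so parallel
    workers do not race for the same host port. The *10 stride is an assumption."""
    worker = os.getenv("PYTEST_XDIST_WORKER", "")
    if worker.startswith("gw"):
        try:
            return base_port + int(worker[2:]) * 10
        except ValueError:
            pass
    return base_port


def cleanup_db_container_by_port(db_type: str, port: int) -> None:
    """Best-effort removal of the container and volume that get_connection
    would create for a (db_type, port) pair, e.g. postgres-docker-db-5434."""
    client = docker.from_env()
    cname = f"{db_type}-docker-db-{port}"
    for action in ("stop", "remove"):
        try:
            getattr(client.containers.get(cname), action)()
        except Exception:
            pass
    try:
        client.volumes.get(f"{cname}_data").remove(force=True)
    except Exception:
        pass
```

Whatever the real implementation looks like, the per-worker stride only prevents clashes if each worker's port range stays disjoint from the fixed source/destination ports above.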
@@ -143,19 +156,8 @@ def test_direct_transfer_sanitizes_nul_postgres(tmp_path): {"t2": "contains\x00nul", "o2": "also\x00bad"}, ) - # 2) Start a clean Postgres destination - cleanup_db_container_by_port("postgres", ports_dest["postgres"]) - username = "sa" - password = "S3cureP@ssw0rd!23243" - dst_engine = get_connection( - db_type="postgres", - db_name="nul_dst_pg3", - user=username, - password=password, - port=ports_dest["postgres"], - force_refresh=True, - print_tables=False, - ) + # 2) Use the shared Postgres destination from fixture + dst_engine = postgres_dest_engine schema = "test_nul_handling_direct" @@ -187,10 +189,6 @@ def test_direct_transfer_sanitizes_nul_postgres(tmp_path): conn.execute(text(f"DROP SCHEMA IF EXISTS {schema} CASCADE")) except Exception: pass - try: - cleanup_db_container_by_port("postgres", ports_dest["postgres"]) - except Exception: - pass @pytest.mark.slow @@ -200,7 +198,9 @@ def test_direct_transfer_sanitizes_nul_postgres(tmp_path): ) @pytest.mark.postgres @pytest.mark.skipif(not docker_running(), reason="Docker/DB required") -def test_upload_parquet_without_sanitizing_nul_postgres_raises(tmp_path): +def test_upload_parquet_without_sanitizing_nul_postgres_raises( + tmp_path, postgres_dest_engine +): """Reproduce the original PostgreSQL NUL-byte error with sanitization disabled.""" df = pl.DataFrame( @@ -215,19 +215,8 @@ def test_upload_parquet_without_sanitizing_nul_postgres_raises(tmp_path): file_path = parquet_dir / "nul_test.parquet" df.write_parquet(file_path) - # Fresh destination for this repro as well - cleanup_db_container_by_port("postgres", ports_dest["postgres"]) - username = "sa" - password = "S3cureP@ssw0rd!23243" - engine = get_connection( - db_type="postgres", - db_name="nul_dst_pg2", - user=username, - password=password, - port=ports_dest["postgres"], - force_refresh=True, - print_tables=False, - ) + # Use the shared engine from fixture + engine = postgres_dest_engine schema = "test_nul_handling_no_sanitize" # Preflight: ensure the destination Postgres is accepting connections to avoid @@ -267,8 +256,3 @@ def test_upload_parquet_without_sanitizing_nul_postgres_raises(tmp_path): conn.execute(text(f"DROP SCHEMA IF EXISTS {schema} CASCADE")) except Exception: pass - - try: - cleanup_db_container_by_port("postgres", ports_dest["postgres"]) - except Exception: - pass diff --git a/sql_to_staging/tests/test_upload_parquet_mssql_datetime2.py b/sql_to_staging/tests/test_upload_parquet_mssql_datetime2.py index 976e90a..9840370 100644 --- a/sql_to_staging/tests/test_upload_parquet_mssql_datetime2.py +++ b/sql_to_staging/tests/test_upload_parquet_mssql_datetime2.py @@ -13,47 +13,26 @@ from dev_sql_server.get_connection import get_connection from utils.parquet.upload_parquet import upload_parquet - - -def _docker_running() -> bool: - import subprocess - - try: - res = subprocess.run( - ["docker", "info"], capture_output=True, text=True, timeout=5 - ) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - import os - - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - -def _mssql_driver_available() -> bool: - try: - import pyodbc # noqa: F401 - - return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers()) - except Exception: - return False +from tests.integration_utils import ( + docker_running, + mssql_driver_available, + port_for_worker, + slow_tests_enabled, +) @pytest.mark.slow @pytest.mark.mssql @pytest.mark.skipif( - not _slow_tests_enabled(), + not 
slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.skipif( - not _mssql_driver_available(), + not mssql_driver_available(), reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.", ) def test_upload_parquet_mssql_uses_datetime2(tmp_path: Path): @@ -63,7 +42,9 @@ def test_upload_parquet_mssql_uses_datetime2(tmp_path: Path): db_name="ggm_upload_parquet_dt2", user="sa", password="S3cureP@ssw0rd!23243", - port=1436, # use a distinct port to avoid clashes with other tests + port=port_for_worker( + 1436 + ), # use a distinct port to avoid clashes with other tests force_refresh=True, sql_folder=None, sql_suffix_filter=True, diff --git a/staging_to_silver/tests/test_integration_case_matching_mssql.py b/staging_to_silver/tests/test_integration_case_matching_mssql.py index 8750925..a0c7b5c 100644 --- a/staging_to_silver/tests/test_integration_case_matching_mssql.py +++ b/staging_to_silver/tests/test_integration_case_matching_mssql.py @@ -1,5 +1,4 @@ import os -import subprocess from pathlib import Path import pytest @@ -9,40 +8,26 @@ from staging_to_silver.functions.queries_setup import prepare_queries from staging_to_silver.functions.schema_qualifier import qualify_schema import configparser - - -def _docker_running() -> bool: - try: - res = subprocess.run(["docker", "info"], capture_output=True, text=True, timeout=5) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - -def _mssql_driver_available() -> bool: - try: - import pyodbc # noqa: F401 - return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers()) - except Exception: - return False +from tests.integration_utils import ( + docker_running, + mssql_driver_available, + port_for_worker, + slow_tests_enabled, +) @pytest.mark.slow @pytest.mark.mssql @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.skipif( - not _mssql_driver_available(), + not mssql_driver_available(), reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.", ) def test_mssql_case_matching_with_crossdb_source(tmp_path: Path): @@ -52,7 +37,7 @@ def test_mssql_case_matching_with_crossdb_source(tmp_path: Path): os.environ.pop("STAGING_COLUMN_NAME_CASE", None) # Start a SQL Server container and create a source database - port = 1436 + port = port_for_worker(1436) src_db = "src_case" engine = get_connection( db_type="mssql", @@ -69,9 +54,14 @@ def test_mssql_case_matching_with_crossdb_source(tmp_path: Path): # Create a minimal destination table (dbo.BESCHIKKING) in the same DB for easy validation with engine.begin() as conn: - conn.execute(text("IF OBJECT_ID(N'dbo.BESCHIKKING', N'U') IS NOT NULL DROP TABLE dbo.BESCHIKKING;")) - conn.execute(text( - """ + conn.execute( + text( + "IF OBJECT_ID(N'dbo.BESCHIKKING', N'U') IS NOT NULL DROP TABLE dbo.BESCHIKKING;" + ) + ) + conn.execute( + text( + """ CREATE TABLE dbo.BESCHIKKING ( BESCHIKKING_ID NVARCHAR(50), CLIENT_ID NVARCHAR(50), @@ -83,11 +73,20 @@ def 
test_mssql_case_matching_with_crossdb_source(tmp_path: Path): WET NVARCHAR(255) ) """ - )) + ) + ) # Prepare uppercase-quoted source table WVBESL with uppercase columns - conn.execute(text("IF OBJECT_ID(N'dbo.WVBESL', N'U') IS NOT NULL DROP TABLE dbo.WVBESL;")) - conn.execute(text("CREATE TABLE dbo.WVBESL (BESLUITNR NVARCHAR(50), CLIENTNR NVARCHAR(50));")) - conn.execute(text("INSERT INTO dbo.WVBESL (BESLUITNR, CLIENTNR) VALUES ('B200','C200');")) + conn.execute( + text("IF OBJECT_ID(N'dbo.WVBESL', N'U') IS NOT NULL DROP TABLE dbo.WVBESL;") + ) + conn.execute( + text( + "CREATE TABLE dbo.WVBESL (BESLUITNR NVARCHAR(50), CLIENTNR NVARCHAR(50));" + ) + ) + conn.execute( + text("INSERT INTO dbo.WVBESL (BESLUITNR, CLIENTNR) VALUES ('B200','C200');") + ) # Load queries cfg = configparser.ConfigParser() @@ -98,7 +97,9 @@ def test_mssql_case_matching_with_crossdb_source(tmp_path: Path): # Helper to insert def _insert_from_select(select_name, select_stmt): md = MetaData() - dest = Table(select_name, md, schema="dbo", autoload_with=engine, extend_existing=True) + dest = Table( + select_name, md, schema="dbo", autoload_with=engine, extend_existing=True + ) select_cols = [c.name for c in select_stmt.selected_columns] dest_map = {c.name.lower(): c for c in dest.columns} ordered = [] @@ -108,7 +109,9 @@ def _insert_from_select(select_name, select_stmt): except KeyError: ci = dest_map.get(c.lower()) if ci is None: - raise KeyError(f"Destination column '{c}' not found in {dest.fullname}") + raise KeyError( + f"Destination column '{c}' not found in {dest.fullname}" + ) ordered.append(ci) with engine.begin() as conn: conn.execute(dest.insert().from_select(ordered, select_stmt)) @@ -123,17 +126,23 @@ def _insert_from_select(select_name, select_stmt): _insert_from_select("BESCHIKKING", stmt_auto) with engine.connect() as conn: - cnt_auto = conn.execute(text("SELECT COUNT(*) FROM dbo.BESCHIKKING")).scalar_one() + cnt_auto = conn.execute( + text("SELECT COUNT(*) FROM dbo.BESCHIKKING") + ).scalar_one() assert cnt_auto >= 1 # Case B: strict mode with UPPER column preference os.environ["STAGING_NAME_MATCHING"] = "strict" os.environ["STAGING_COLUMN_NAME_CASE"] = "upper" with engine.begin() as conn: - conn.execute(text("INSERT INTO dbo.WVBESL (BESLUITNR, CLIENTNR) VALUES ('B201','C201');")) + conn.execute( + text("INSERT INTO dbo.WVBESL (BESLUITNR, CLIENTNR) VALUES ('B201','C201');") + ) stmt_strict = queries["BESCHIKKING"](engine, source_schema=source_schema_3part) _insert_from_select("BESCHIKKING", stmt_strict) with engine.connect() as conn: - cnt_strict = conn.execute(text("SELECT COUNT(*) FROM dbo.BESCHIKKING")).scalar_one() + cnt_strict = conn.execute( + text("SELECT COUNT(*) FROM dbo.BESCHIKKING") + ).scalar_one() assert cnt_strict >= cnt_auto + 1 diff --git a/staging_to_silver/tests/test_integration_case_matching_mssql_lower.py b/staging_to_silver/tests/test_integration_case_matching_mssql_lower.py index 58faced..60cb13d 100644 --- a/staging_to_silver/tests/test_integration_case_matching_mssql_lower.py +++ b/staging_to_silver/tests/test_integration_case_matching_mssql_lower.py @@ -1,5 +1,4 @@ import os -import subprocess import configparser import pytest @@ -7,43 +6,29 @@ from dev_sql_server.get_connection import get_connection from staging_to_silver.functions.queries_setup import prepare_queries - - -def _docker_running() -> bool: - try: - res = subprocess.run(["docker", "info"], capture_output=True, text=True, timeout=5) - return res.returncode == 0 - except Exception: - return False - - -def 
_slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - -def _mssql_driver_available() -> bool: - try: - import pyodbc # noqa: F401 - return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers()) - except Exception: - return False +from tests.integration_utils import ( + docker_running, + mssql_driver_available, + slow_tests_enabled, +) +from tests.integration_utils import port_for_worker @pytest.mark.slow @pytest.mark.mssql @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.skipif( - not _mssql_driver_available(), + not mssql_driver_available(), reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.", ) -def test_mssql_strict_lowercase_columns(tmp_path): +def test_mssql_case_matching_prefers_lowercase_source(tmp_path): # Reset staging matching env for k in [ "STAGING_NAME_MATCHING", @@ -53,7 +38,7 @@ def test_mssql_strict_lowercase_columns(tmp_path): os.environ.pop(k, None) # Start SQL Server and create target table and staging with lowercase columns - port = 1437 + port = port_for_worker(1437) engine = get_connection( db_type="mssql", db_name="src_case_lower", @@ -68,7 +53,11 @@ def test_mssql_strict_lowercase_columns(tmp_path): ) with engine.begin() as conn: - conn.execute(text("IF OBJECT_ID(N'dbo.BESCHIKKING', N'U') IS NOT NULL DROP TABLE dbo.BESCHIKKING;")) + conn.execute( + text( + "IF OBJECT_ID(N'dbo.BESCHIKKING', N'U') IS NOT NULL DROP TABLE dbo.BESCHIKKING;" + ) + ) conn.execute( text( """ @@ -85,9 +74,17 @@ def test_mssql_strict_lowercase_columns(tmp_path): """ ) ) - conn.execute(text("IF OBJECT_ID(N'dbo.wvbesl', N'U') IS NOT NULL DROP TABLE dbo.wvbesl;")) - conn.execute(text("CREATE TABLE dbo.wvbesl (besluitnr NVARCHAR(50), clientnr NVARCHAR(50));")) - conn.execute(text("INSERT INTO dbo.wvbesl (besluitnr, clientnr) VALUES ('B300','c300');")) + conn.execute( + text("IF OBJECT_ID(N'dbo.wvbesl', N'U') IS NOT NULL DROP TABLE dbo.wvbesl;") + ) + conn.execute( + text( + "CREATE TABLE dbo.wvbesl (besluitnr NVARCHAR(50), clientnr NVARCHAR(50));" + ) + ) + conn.execute( + text("INSERT INTO dbo.wvbesl (besluitnr, clientnr) VALUES ('B300','c300');") + ) # Load queries cfg = configparser.ConfigParser() @@ -95,10 +92,15 @@ def test_mssql_strict_lowercase_columns(tmp_path): cfg.set("settings", "SILVER_TABLE_NAME_CASE", "upper") queries = prepare_queries(cfg) - # Helper to insert def _insert_from_select(select_name, select_stmt): md = MetaData() - dest = Table(select_name, md, schema="dbo", autoload_with=engine, extend_existing=True) + dest = Table( + select_name, + md, + schema="dbo", + autoload_with=engine, + extend_existing=True, + ) select_cols = [c.name for c in select_stmt.selected_columns] dest_map = {c.name.lower(): c for c in dest.columns} ordered = [] @@ -108,7 +110,9 @@ def _insert_from_select(select_name, select_stmt): except KeyError: ci = dest_map.get(c.lower()) if ci is None: - raise KeyError(f"Destination column '{c}' not found in {dest.fullname}") + raise KeyError( + f"Destination column '{c}' not found in {dest.fullname}" + ) ordered.append(ci) with engine.begin() as conn: conn.execute(dest.insert().from_select(ordered, select_stmt)) @@ -120,5 +124,7 @@ def _insert_from_select(select_name, select_stmt): 
_insert_from_select("BESCHIKKING", stmt) with engine.connect() as conn: - cnt = conn.execute(text("SELECT COUNT(*) FROM dbo.BESCHIKKING WHERE CLIENT_ID='c300'")) .scalar_one() + cnt = conn.execute( + text("SELECT COUNT(*) FROM dbo.BESCHIKKING WHERE CLIENT_ID='c300'") + ).scalar_one() assert cnt == 1 diff --git a/staging_to_silver/tests/test_integration_case_matching_mssql_mixed.py b/staging_to_silver/tests/test_integration_case_matching_mssql_mixed.py index e97cd9b..9d88486 100644 --- a/staging_to_silver/tests/test_integration_case_matching_mssql_mixed.py +++ b/staging_to_silver/tests/test_integration_case_matching_mssql_mixed.py @@ -1,5 +1,4 @@ import os -import subprocess import configparser import pytest @@ -7,40 +6,26 @@ from dev_sql_server.get_connection import get_connection from staging_to_silver.functions.queries_setup import prepare_queries - - -def _docker_running() -> bool: - try: - res = subprocess.run(["docker", "info"], capture_output=True, text=True, timeout=5) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - -def _mssql_driver_available() -> bool: - try: - import pyodbc # noqa: F401 - return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers()) - except Exception: - return False +from tests.integration_utils import ( + docker_running, + mssql_driver_available, + slow_tests_enabled, +) +from tests.integration_utils import port_for_worker @pytest.mark.slow @pytest.mark.mssql @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.skipif( - not _mssql_driver_available(), + not mssql_driver_available(), reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.", ) def test_mssql_strict_lower_pref_against_uppercase_source(tmp_path): @@ -53,7 +38,7 @@ def test_mssql_strict_lower_pref_against_uppercase_source(tmp_path): os.environ.pop(k, None) # Start SQL Server and create target table and uppercase-named staging table - port = 1438 + port = port_for_worker(1438) engine = get_connection( db_type="mssql", db_name="src_case_mixed", @@ -69,7 +54,11 @@ def test_mssql_strict_lower_pref_against_uppercase_source(tmp_path): with engine.begin() as conn: # Destination table in dbo - conn.execute(text("IF OBJECT_ID(N'dbo.BESCHIKKING', N'U') IS NOT NULL DROP TABLE dbo.BESCHIKKING;")) + conn.execute( + text( + "IF OBJECT_ID(N'dbo.BESCHIKKING', N'U') IS NOT NULL DROP TABLE dbo.BESCHIKKING;" + ) + ) conn.execute( text( """ @@ -87,9 +76,17 @@ def test_mssql_strict_lower_pref_against_uppercase_source(tmp_path): ) ) # Upper-case source table name with UPPER columns - conn.execute(text("IF OBJECT_ID(N'dbo.WVBESL', N'U') IS NOT NULL DROP TABLE dbo.WVBESL;")) - conn.execute(text("CREATE TABLE dbo.WVBESL (BESLUITNR NVARCHAR(50), CLIENTNR NVARCHAR(50));")) - conn.execute(text("INSERT INTO dbo.WVBESL (BESLUITNR, CLIENTNR) VALUES ('B901','C901');")) + conn.execute( + text("IF OBJECT_ID(N'dbo.WVBESL', N'U') IS NOT NULL DROP TABLE dbo.WVBESL;") + ) + conn.execute( + text( + "CREATE TABLE dbo.WVBESL (BESLUITNR NVARCHAR(50), CLIENTNR NVARCHAR(50));" + ) + ) + conn.execute( + text("INSERT INTO dbo.WVBESL (BESLUITNR, CLIENTNR) VALUES ('B901','C901');") + ) # Load queries cfg = 
configparser.ConfigParser() @@ -100,7 +97,9 @@ def test_mssql_strict_lower_pref_against_uppercase_source(tmp_path): # Helper to insert from select while preserving select column order def _insert_from_select(select_name, select_stmt): md = MetaData() - dest = Table(select_name, md, schema="dbo", autoload_with=engine, extend_existing=True) + dest = Table( + select_name, md, schema="dbo", autoload_with=engine, extend_existing=True + ) select_cols = [c.name for c in select_stmt.selected_columns] dest_map = {c.name.lower(): c for c in dest.columns} ordered = [] @@ -110,7 +109,9 @@ def _insert_from_select(select_name, select_stmt): except KeyError: ci = dest_map.get(c.lower()) if ci is None: - raise KeyError(f"Destination column '{c}' not found in {dest.fullname}") + raise KeyError( + f"Destination column '{c}' not found in {dest.fullname}" + ) ordered.append(ci) with engine.begin() as conn: conn.execute(dest.insert().from_select(ordered, select_stmt)) @@ -123,5 +124,7 @@ def _insert_from_select(select_name, select_stmt): _insert_from_select("BESCHIKKING", stmt) with engine.connect() as conn: - cnt = conn.execute(text("SELECT COUNT(*) FROM dbo.BESCHIKKING WHERE CLIENT_ID='C901'")) .scalar_one() + cnt = conn.execute( + text("SELECT COUNT(*) FROM dbo.BESCHIKKING WHERE CLIENT_ID='C901'") + ).scalar_one() assert cnt == 1 diff --git a/staging_to_silver/tests/test_integration_declaratieregel_postgres.py b/staging_to_silver/tests/test_integration_declaratieregel_postgres.py index 7c546ae..14d3d9d 100644 --- a/staging_to_silver/tests/test_integration_declaratieregel_postgres.py +++ b/staging_to_silver/tests/test_integration_declaratieregel_postgres.py @@ -1,72 +1,25 @@ -import os -import subprocess import configparser import pytest -from sqlalchemy import MetaData, Table, text +from sqlalchemy import text from dev_sql_server.get_connection import get_connection from staging_to_silver.functions.queries_setup import prepare_queries - - -def _docker_running() -> bool: - try: - res = subprocess.run( - ["docker", "info"], capture_output=True, text=True, timeout=5 - ) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - -def _insert_from_select(engine, target_schema: str, select_name: str, select_stmt): - md = MetaData() - try: - dest_table = Table( - select_name, - md, - schema=target_schema, - autoload_with=engine, - extend_existing=True, - ) - except Exception: - dest_table = Table( - select_name.lower(), - md, - schema=target_schema, - autoload_with=engine, - extend_existing=True, - ) - - select_col_order = [c.name for c in select_stmt.selected_columns] - dest_cols_map_ci = {c.name.lower(): c for c in dest_table.columns} - dest_cols = [] - for col_name in select_col_order: - try: - dest_cols.append(dest_table.columns[col_name]) - except KeyError: - ci = dest_cols_map_ci.get(col_name.lower()) - if ci is None: - raise KeyError( - f"Destination column '{col_name}' not found in table {dest_table.fullname}." 
- ) - dest_cols.append(ci) - - with engine.begin() as conn: - conn.execute(dest_table.insert().from_select(dest_cols, select_stmt)) +from tests.integration_utils import ( + docker_running, + insert_from_select_case_insensitive, + ports, + slow_tests_enabled, +) @pytest.mark.slow @pytest.mark.postgres @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker not available/running; required for this integration test.", ) def test_declaratieregel_postgres_insertion(tmp_path): @@ -76,7 +29,7 @@ def test_declaratieregel_postgres_insertion(tmp_path): db_name="ggm_declaratieregel", user="sa", password="S3cureP@ssw0rd!23243", - port=5433, + port=ports["postgres"], force_refresh=True, sql_folder="./ggm_selectie/cssd", sql_suffix_filter=True, @@ -117,8 +70,11 @@ def test_declaratieregel_postgres_insertion(tmp_path): queries = prepare_queries(cfg) stmt = queries["DECLARATIEREGEL"](engine, source_schema="staging") - _insert_from_select( - engine, target_schema="silver", select_name="DECLARATIEREGEL", select_stmt=stmt + insert_from_select_case_insensitive( + engine, + target_schema="silver", + dest_table_name="DECLARATIEREGEL", + select_stmt=stmt, ) with engine.connect() as conn: diff --git a/staging_to_silver/tests/test_integration_staging_table_case_preference_postgres.py b/staging_to_silver/tests/test_integration_staging_table_case_preference_postgres.py index ba08080..4949368 100644 --- a/staging_to_silver/tests/test_integration_staging_table_case_preference_postgres.py +++ b/staging_to_silver/tests/test_integration_staging_table_case_preference_postgres.py @@ -1,5 +1,4 @@ import os -import subprocess import configparser import pytest @@ -7,30 +6,17 @@ from dev_sql_server.get_connection import get_connection from staging_to_silver.functions.queries_setup import prepare_queries - - -def _docker_running() -> bool: - try: - res = subprocess.run( - ["docker", "info"], capture_output=True, text=True, timeout=5 - ) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} +from tests.integration_utils import docker_running, ports_dest, slow_tests_enabled @pytest.mark.slow @pytest.mark.postgres @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) def test_postgres_staging_table_name_case_preference(tmp_path): @@ -48,7 +34,7 @@ def test_postgres_staging_table_name_case_preference(tmp_path): db_name="ggm_case_pref", user="sa", password="S3cureP@ssw0rd!23243", - port=5434, + port=ports_dest["postgres"], force_refresh=True, sql_folder="./ggm_selectie/cssd", sql_suffix_filter=True, diff --git a/staging_to_silver/tests/test_integration_staging_to_silver.py b/staging_to_silver/tests/test_integration_staging_to_silver.py index 2781b50..42d63b3 100644 --- a/staging_to_silver/tests/test_integration_staging_to_silver.py +++ b/staging_to_silver/tests/test_integration_staging_to_silver.py @@ -3,90 +3,34 @@ # This ensures end-to-end functionality of staging_to_silver queries against real databases import os -import subprocess import pytest -from sqlalchemy import 
MetaData, Table, text +from sqlalchemy import text from dotenv import load_dotenv from dev_sql_server.get_connection import get_connection import configparser from staging_to_silver.functions.queries_setup import prepare_queries +from tests.integration_utils import ( + docker_running, + insert_from_select_case_insensitive, + mssql_driver_available, + ports, + slow_tests_enabled, +) load_dotenv("tests/.env") -def _docker_running() -> bool: - try: - res = subprocess.run( - ["docker", "info"], capture_output=True, text=True, timeout=5 - ) - return res.returncode == 0 - except Exception: - return False - - -def _slow_tests_enabled() -> bool: - return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} - - -def _mssql_driver_available() -> bool: - try: - import pyodbc # noqa - - return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers()) - except Exception: - return False - - -def _insert_from_select(engine, target_schema: str, select_name: str, select_stmt): - # Reflect destination and perform column-order match in a case-insensitive way - md = MetaData() - try: - dest_table = Table( - select_name, - md, - schema=target_schema, - autoload_with=engine, - extend_existing=True, - ) - except Exception: - # Retry with lowercased name to handle Postgres' unquoted lowercasing - dest_table = Table( - select_name.lower(), - md, - schema=target_schema, - autoload_with=engine, - extend_existing=True, - ) - - select_col_order = [c.name for c in select_stmt.selected_columns] - dest_cols_map_ci = {c.name.lower(): c for c in dest_table.columns} - dest_cols = [] - for col_name in select_col_order: - try: - dest_cols.append(dest_table.columns[col_name]) - except KeyError: - ci = dest_cols_map_ci.get(col_name.lower()) - if ci is None: - raise KeyError( - f"Destination column '{col_name}' not found in table {dest_table.fullname}." 
- ) - dest_cols.append(ci) - - with engine.begin() as conn: - conn.execute(dest_table.insert().from_select(dest_cols, select_stmt)) - - @pytest.mark.slow @pytest.mark.postgres @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) def test_staging_to_silver_postgres(tmp_path): @@ -96,7 +40,7 @@ def test_staging_to_silver_postgres(tmp_path): db_name="ggm_staging_to_silver", user="sa", password="S3cureP@ssw0rd!23243", - port=5433, + port=ports["postgres"], force_refresh=True, sql_folder="./ggm_selectie/cssd", sql_suffix_filter=True, @@ -180,17 +124,20 @@ def test_staging_to_silver_postgres(tmp_path): # Run BESCHIKTE_VOORZIENING (lowercase source tables) → into silver stmt_bv = queries["BESCHIKTE_VOORZIENING"](engine, source_schema="staging") - _insert_from_select( + insert_from_select_case_insensitive( engine, target_schema="silver", - select_name="BESCHIKTE_VOORZIENING", + dest_table_name="BESCHIKTE_VOORZIENING", select_stmt=stmt_bv, ) # Run BESCHIKKING (uppercase source table name) → into silver stmt_b = queries["BESCHIKKING"](engine, source_schema="staging") - _insert_from_select( - engine, target_schema="silver", select_name="BESCHIKKING", select_stmt=stmt_b + insert_from_select_case_insensitive( + engine, + target_schema="silver", + dest_table_name="BESCHIKKING", + select_stmt=stmt_b, ) # Validate inserted rows @@ -208,15 +155,15 @@ def test_staging_to_silver_postgres(tmp_path): @pytest.mark.slow @pytest.mark.mssql @pytest.mark.skipif( - not _slow_tests_enabled(), + not slow_tests_enabled(), reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.", ) @pytest.mark.skipif( - not _docker_running(), + not docker_running(), reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.skipif( - not _mssql_driver_available(), + not mssql_driver_available(), reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.", ) def test_staging_to_silver_mssql(tmp_path): @@ -226,7 +173,7 @@ def test_staging_to_silver_mssql(tmp_path): db_name="ggm_staging_to_silver", user="sa", password="S3cureP@ssw0rd!23243", - port=1434, + port=ports["mssql"], force_refresh=True, sql_folder="./ggm_selectie/cssd", sql_suffix_filter=True, @@ -300,14 +247,14 @@ def test_staging_to_silver_mssql(tmp_path): # BESCHIKKING stmt_b = queries["BESCHIKKING"](engine, source_schema="dbo") - _insert_from_select( - engine, target_schema="dbo", select_name="BESCHIKKING", select_stmt=stmt_b + insert_from_select_case_insensitive( + engine, target_schema="dbo", dest_table_name="BESCHIKKING", select_stmt=stmt_b ) # CLIENT stmt_client = queries["CLIENT"](engine, source_schema="dbo") - _insert_from_select( - engine, target_schema="dbo", select_name="CLIENT", select_stmt=stmt_client + insert_from_select_case_insensitive( + engine, target_schema="dbo", dest_table_name="CLIENT", select_stmt=stmt_client ) # Validate inserted rows diff --git a/staging_to_silver/tests/test_main_staging_to_silver_postgres.py b/staging_to_silver/tests/test_main_staging_to_silver_postgres.py index f899f76..28a4e21 100644 --- a/staging_to_silver/tests/test_main_staging_to_silver_postgres.py +++ b/staging_to_silver/tests/test_main_staging_to_silver_postgres.py @@ -1,5 +1,4 @@ import os -import subprocess import sys import runpy import io @@ -10,43 
diff --git a/staging_to_silver/tests/test_main_staging_to_silver_postgres.py b/staging_to_silver/tests/test_main_staging_to_silver_postgres.py
index f899f76..28a4e21 100644
--- a/staging_to_silver/tests/test_main_staging_to_silver_postgres.py
+++ b/staging_to_silver/tests/test_main_staging_to_silver_postgres.py
@@ -1,5 +1,4 @@
 import os
-import subprocess
 import sys
 import runpy
 import io
@@ -10,43 +9,32 @@
 from dotenv import load_dotenv
 
 from dev_sql_server.get_connection import get_connection
+from tests.integration_utils import docker_running, slow_tests_enabled, ports_dest
 
 load_dotenv("tests/.env")
 
 
-def _docker_running() -> bool:
-    try:
-        res = subprocess.run(
-            ["docker", "info"], capture_output=True, text=True, timeout=5
-        )
-        return res.returncode == 0
-    except Exception:
-        return False
-
-
-def _slow_tests_enabled() -> bool:
-    return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"}
-
-
 @pytest.mark.slow
 @pytest.mark.postgres
 @pytest.mark.skipif(
-    not _slow_tests_enabled(),
+    not slow_tests_enabled(),
     reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.",
 )
 @pytest.mark.skipif(
-    not _docker_running(),
+    not docker_running(),
     reason="Docker is not available/running; required for this integration test.",
 )
 def test_main_staging_to_silver_postgres(tmp_path):
+    dst_port = ports_dest["postgres"]
+
     # Start Postgres with silver schema initialized from GGM DDL
     engine = get_connection(
         db_type="postgres",
         db_name="ggm_main_staging_to_silver",
         user="sa",
         password="S3cureP@ssw0rd!23243",
-        port=5434,
+        port=dst_port,
         force_refresh=True,
         sql_folder="./ggm_selectie/cssd",
         sql_suffix_filter=True,
@@ -123,13 +111,13 @@ def test_main_staging_to_silver_postgres(tmp_path):
     # Create config for staging_to_silver.main
     cfg_path = tmp_path / "staging_to_silver.ini"
     cfg_path.write_text(
-        """
+        f"""
 [database-destination]
 DST_DRIVER=postgresql+psycopg2
 DST_USERNAME=sa
 DST_PASSWORD=S3cureP@ssw0rd!23243
 DST_HOST=localhost
-DST_PORT=5434
+DST_PORT={dst_port}
 DST_DB=ggm_main_staging_to_silver
 DST_SCHEMA=staging
diff --git a/staging_to_silver/tests/test_staging_to_silver_mssql_crossdb.py b/staging_to_silver/tests/test_staging_to_silver_mssql_crossdb.py
index cc6195c..ee8dcbb 100644
--- a/staging_to_silver/tests/test_staging_to_silver_mssql_crossdb.py
+++ b/staging_to_silver/tests/test_staging_to_silver_mssql_crossdb.py
@@ -7,52 +7,31 @@
 from dev_sql_server.get_connection import get_connection
 from staging_to_silver.functions.schema_qualifier import qualify_schema
-
-
-def _docker_running() -> bool:
-    import subprocess
-
-    try:
-        res = subprocess.run(
-            ["docker", "info"], capture_output=True, text=True, timeout=5
-        )
-        return res.returncode == 0
-    except Exception:
-        return False
-
-
-def _slow_tests_enabled() -> bool:
-    import os
-
-    return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"}
-
-
-def _mssql_driver_available() -> bool:
-    try:
-        import pyodbc  # noqa: F401
-
-        return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers())
-    except Exception:
-        return False
+from tests.integration_utils import (
+    docker_running,
+    mssql_driver_available,
+    slow_tests_enabled,
+)
+from tests.integration_utils import port_for_worker
 
 
 @pytest.mark.slow
 @pytest.mark.mssql
 @pytest.mark.skipif(
-    not _slow_tests_enabled(),
+    not slow_tests_enabled(),
     reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.",
 )
 @pytest.mark.skipif(
-    not _docker_running(),
+    not docker_running(),
     reason="Docker is not available/running; required for this integration test.",
 )
 @pytest.mark.skipif(
-    not _mssql_driver_available(),
+    not mssql_driver_available(),
     reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.",
 )
 def test_mssql_crossdb_init_and_load(tmp_path: Path):
     # Use one SQL Server container (single instance) and create two databases in it
-    port = 1437
+    port = port_for_worker(1437)
     user = "sa"
     password = "S3cureP@ssw0rd!23243"
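The cross-database test above asks for `port_for_worker(1437)` instead of a fixed port. Per the helper added to `tests/integration_utils.py` later in this patch, the shift is `base_port + worker_index * stride`; a worked example with the default stride of 50:

```python
# Worked example of the per-worker port shift (default stride of 50; override
# with GGMPILOT_TEST_PORT_STRIDE). Plain arithmetic, no project imports needed.
base_port = 1437
stride = 50

for worker_index in range(3):  # xdist workers gw0, gw1, gw2
    print(f"gw{worker_index} -> {base_port + worker_index * stride}")
# gw0 -> 1437, gw1 -> 1487, gw2 -> 1537
```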
= "S3cureP@ssw0rd!23243" diff --git a/tests/integration_utils.py b/tests/integration_utils.py index b156def..725ff2d 100644 --- a/tests/integration_utils.py +++ b/tests/integration_utils.py @@ -15,12 +15,13 @@ import os import shutil import subprocess +import re from datetime import date, datetime, time from typing import Iterable import pytest import docker -from sqlalchemy import text +from sqlalchemy import MetaData, Table, text from dev_sql_server.get_connection import get_connection @@ -43,11 +44,46 @@ } +def _xdist_worker_index() -> int: + """Return the pytest-xdist worker index (gw0 -> 0), or 0 if not running under xdist.""" + wid = os.getenv("PYTEST_XDIST_WORKER", "").strip() + if not wid: + return 0 + m = re.match(r"^gw(\d+)$", wid) + if not m: + return 0 + try: + return int(m.group(1)) + except Exception: + return 0 + + +def port_for_worker(base_port: int) -> int: + """Shift a base port by worker index to avoid collisions under parallel runs. + + Controlled by env var `GGMPILOT_TEST_PORT_STRIDE` (default 50). + """ + stride = int(os.getenv("GGMPILOT_TEST_PORT_STRIDE", "50")) + return int(base_port) + _xdist_worker_index() * stride + + +# Rebind the exported port maps to worker-specific values. +ports = {k: port_for_worker(v) for k, v in ports.items()} +ports_dest = {k: port_for_worker(v) for k, v in ports_dest.items()} + + +_TEST_CONTAINER_NAME_RE = re.compile( + r"^(mariadb|mysql|postgres|oracle|mssql)-docker-db-\d+$" +) + + def docker_running() -> bool: if not shutil.which("docker"): return False try: - res = subprocess.run(["docker", "info"], capture_output=True, text=True, timeout=5) + res = subprocess.run( + ["docker", "info"], capture_output=True, text=True, timeout=5 + ) return res.returncode == 0 except Exception: return False @@ -57,6 +93,21 @@ def slow_tests_enabled() -> bool: return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"} +def mssql_driver_available( + required_driver: str = "ODBC Driver 18 for SQL Server", +) -> bool: + """Return True if the required SQL Server ODBC driver is installed. + + Kept here so MSSQL integration tests can share the same skip logic. + """ + try: + import pyodbc # type: ignore + + return any(required_driver in d for d in pyodbc.drivers()) + except Exception: + return False + + def cleanup_db_container_by_port(db_type: str, port: int): """Stop and remove a specific container and volume for a db_type and port. @@ -102,6 +153,38 @@ def cleanup_many(db_types: Iterable[str]): cleanup_db_containers(t) +def cleanup_all_test_db_containers() -> None: + """Best-effort cleanup of all DB containers created by test/dev helpers. + + Targets containers named like "{db_type}-docker-db-{port}" and their associated + volumes "{container_name}_data". + + This is primarily to keep CI clean and avoid port allocation conflicts across runs. 
+ """ + client = docker.from_env() + + for c in client.containers.list(all=True): + name = (getattr(c, "name", None) or "").strip() + if not _TEST_CONTAINER_NAME_RE.match(name): + continue + + try: + c.stop(timeout=10) + except Exception: + pass + try: + c.remove(force=True) + except Exception: + pass + + vol_name = f"{name}_data" + try: + v = client.volumes.get(vol_name) + v.remove(force=True) + except Exception: + pass + + def schemas_for(db_type: str): if db_type == "oracle": return None, "sa" # source_schema, dest_schema @@ -140,7 +223,9 @@ def create_table_and_data(conn, db_type: str, table: str): ) """ elif db_type == "mssql": - conn.execute(text(f"IF OBJECT_ID(N'{table}', N'U') IS NOT NULL DROP TABLE {table}")) + conn.execute( + text(f"IF OBJECT_ID(N'{table}', N'U') IS NOT NULL DROP TABLE {table}") + ) ddl = f""" CREATE TABLE {table} ( id INT PRIMARY KEY, @@ -352,6 +437,54 @@ def host_for_docker() -> str: return "host.docker.internal" if os.getenv("IN_DOCKER", "0") == "1" else "localhost" +def insert_from_select_case_insensitive( + engine, + target_schema: str, + dest_table_name: str, + select_stmt, +): + """Insert rows from a SQLAlchemy select into an existing table. + + - Reflects the destination table from the database. + - Matches destination columns to the select's labeled column order. + - Matches destination columns case-insensitively for cross-dialect stability. + """ + md = MetaData() + try: + dest_table = Table( + dest_table_name, + md, + schema=target_schema, + autoload_with=engine, + extend_existing=True, + ) + except Exception: + dest_table = Table( + dest_table_name.lower(), + md, + schema=target_schema, + autoload_with=engine, + extend_existing=True, + ) + + select_col_order = [c.name for c in select_stmt.selected_columns] + dest_cols_map_ci = {c.name.lower(): c for c in dest_table.columns} + dest_cols = [] + for col_name in select_col_order: + try: + dest_cols.append(dest_table.columns[col_name]) + except KeyError: + ci = dest_cols_map_ci.get(col_name.lower()) + if ci is None: + raise KeyError( + f"Destination column '{col_name}' not found in table {dest_table.fullname}." + ) + dest_cols.append(ci) + + with engine.begin() as conn: + conn.execute(dest_table.insert().from_select(dest_cols, select_stmt)) + + @pytest.fixture(scope="session") def oracle_source_engine(): """Session-scoped Oracle source engine (starts container once).""" diff --git a/utils/tests/test_integration_execute_sql_folder.py b/utils/tests/test_integration_execute_sql_folder.py index f5b8371..9bbd978 100644 --- a/utils/tests/test_integration_execute_sql_folder.py +++ b/utils/tests/test_integration_execute_sql_folder.py @@ -7,21 +7,13 @@ from utils.database.execute_sql_folder import execute_sql_folder, drop_schema_objects from tests.integration_utils import ( docker_running, + mssql_driver_available, slow_tests_enabled, cleanup_db_container_by_port, cleanup_db_containers, ) -def _mssql_driver_available() -> bool: - try: - import pyodbc # noqa - - return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers()) - except Exception: - return False - - @pytest.mark.slow @pytest.mark.skipif( not slow_tests_enabled(), @@ -106,7 +98,7 @@ def test_execute_sql_and_delete_schema_postgres(tmp_path: Path): reason="Docker is not available/running; required for this integration test.", ) @pytest.mark.skipif( - not _mssql_driver_available(), + not mssql_driver_available(), reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.", ) @pytest.mark.mssql