diff --git a/config/iceberg.yaml b/config/iceberg.yaml index ae4a4e0..5ada695 100644 --- a/config/iceberg.yaml +++ b/config/iceberg.yaml @@ -1,8 +1,18 @@ iceberg: + catalog_name: agentflow catalog_type: rest - catalog_uri: http://localhost:8181 - warehouse: /tmp/warehouse + catalog_uri: ${AGENTFLOW_ICEBERG_URI:-http://localhost:8181} + # Object-store backed (MinIO/S3), matching docker-compose.iceberg.yml and the + # agentflow-lake bucket used by the Flink stack — not an ephemeral local dir. + warehouse: ${AGENTFLOW_ICEBERG_WAREHOUSE:-s3://agentflow-lake/warehouse} namespace: agentflow + # FileIO credentials for the PyIceberg client. Env-overridable; defaults match + # the local MinIO compose. Production injects real S3/Glue credentials here. + catalog_properties: + s3.endpoint: ${AGENTFLOW_S3_ENDPOINT:-http://localhost:9000} + s3.access-key-id: ${AGENTFLOW_S3_ACCESS_KEY:-minio} + s3.secret-access-key: ${AGENTFLOW_S3_SECRET_KEY:-minio123} + s3.region: ${AGENTFLOW_S3_REGION:-us-east-1} tables: - name: orders partition_by: [days(created_at)] diff --git a/docker-compose.iceberg.yml b/docker-compose.iceberg.yml index 313fbea..9207e6f 100644 --- a/docker-compose.iceberg.yml +++ b/docker-compose.iceberg.yml @@ -1,8 +1,52 @@ +# Focused Iceberg REST catalog stack, backed by MinIO (S3) instead of an +# ephemeral local directory. Mirrors the object-store wiring in +# docker-compose.yml (same image tags, bucket, credentials) so the PyIceberg +# sink writes table data/metadata to the real object store the rest of the +# stack already uses. Consumed by tests/integration/test_iceberg_sink.py. services: + minio: + image: minio/minio:RELEASE.2025-09-07T16-13-09Z + ports: + - "9000:9000" + - "9001:9001" + command: server /data --console-address ":9001" + environment: + MINIO_ROOT_USER: minio + MINIO_ROOT_PASSWORD: minio123 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 5s + timeout: 5s + retries: 5 + + # One-shot bucket provisioner. Retries the alias instead of gating on the + # MinIO healthcheck so the stack comes up regardless of whether the server + # image ships curl. + minio-init: + image: minio/mc:RELEASE.2025-08-13T08-35-41Z + depends_on: + - minio + entrypoint: ["/bin/bash", "-c"] + command: + - | + until mc alias set local http://minio:9000 minio minio123; do + echo "waiting for minio..."; sleep 1; + done + mc mb local/agentflow-lake --ignore-existing + echo "MinIO bucket ready." + iceberg-rest: image: tabulario/iceberg-rest:0.6.0 + depends_on: + minio-init: + condition: service_completed_successfully ports: - "8181:8181" environment: - CATALOG_WAREHOUSE: /tmp/warehouse - CATALOG_IO__IMPL: org.apache.iceberg.hadoop.HadoopFileIO + CATALOG_WAREHOUSE: s3://agentflow-lake/warehouse + CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO + CATALOG_S3_ENDPOINT: http://minio:9000 + CATALOG_S3_PATH__STYLE__ACCESS: "true" + AWS_ACCESS_KEY_ID: minio + AWS_SECRET_ACCESS_KEY: minio123 + AWS_REGION: us-east-1 diff --git a/docs/architecture.md b/docs/architecture.md index 697b197..06e089e 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -57,7 +57,7 @@ Same pipeline logic, no infrastructure dependencies: 3. **Enrich**: Domain enrichment per event type (order sizing, click classification, payment risk) 4. **Store**: Validated events written to DuckDB for serving and to Iceberg via PyIceberg 5. **Serve**: Agent API reads from DuckDB while `/v1/health` reports Iceberg row counts -6. **Catalog**: Development uses the local REST catalog from `docker-compose.iceberg.yml`; production uses AWS Glue +6. **Catalog**: Development uses a MinIO-backed REST catalog from `docker-compose.iceberg.yml` (writing to the same `agentflow-lake` S3 object store as the Flink stack); production uses AWS Glue Both paths use the **same validator and enrichment code** (`src/quality/`, `src/processing/transformations/`). diff --git a/src/processing/iceberg_sink.py b/src/processing/iceberg_sink.py index 06b0c52..9128544 100644 --- a/src/processing/iceberg_sink.py +++ b/src/processing/iceberg_sink.py @@ -2,6 +2,7 @@ import json import os +import re from datetime import UTC, datetime from pathlib import Path from typing import Any @@ -21,6 +22,23 @@ TimestampType, ) +# Matches ${VAR} and ${VAR:-default} so config values can reference the +# environment (e.g. S3 credentials) with a local fallback. A value without +# "${" is returned untouched. +_ENV_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}") + + +def _expand_env(value: Any) -> Any: + if not isinstance(value, str) or "${" not in value: + return value + + def _replace(match: re.Match[str]) -> str: + name, default = match.group(1), match.group(2) + return os.environ.get(name, default if default is not None else "") + + return _ENV_REF.sub(_replace, value) + + ORDERS_SCHEMA = Schema( NestedField(field_id=1, name="event_id", field_type=StringType(), required=True), NestedField(field_id=2, name="event_type", field_type=StringType(), required=True), @@ -103,13 +121,16 @@ def __init__(self, config_path: str | Path = "config/iceberg.yaml"): catalog_type = self._config["catalog_type"] catalog_properties = { "type": catalog_type, - "uri": self._resolve_catalog_uri(self._config["catalog_uri"]), + "uri": self._resolve_catalog_uri(_expand_env(self._config["catalog_uri"])), "warehouse": self._resolve_warehouse( - self._config["warehouse"], + _expand_env(self._config["warehouse"]), catalog_type, ), } - catalog_properties.update(self._config.get("catalog_properties", {})) + extra_properties = self._config.get("catalog_properties", {}) + catalog_properties.update( + {key: _expand_env(value) for key, value in extra_properties.items()} + ) self.catalog = load_catalog( self._config.get("catalog_name", "agentflow"), **catalog_properties, diff --git a/tests/integration/test_iceberg_sink.py b/tests/integration/test_iceberg_sink.py index cdd54ec..c6295e1 100644 --- a/tests/integration/test_iceberg_sink.py +++ b/tests/integration/test_iceberg_sink.py @@ -4,6 +4,7 @@ import sys import time from pathlib import Path +from typing import Any from uuid import uuid4 import duckdb @@ -25,28 +26,30 @@ def _write_iceberg_config( catalog_uri: str | None = None, warehouse: str | None = None, namespace: str = "agentflow", + catalog_properties: dict[str, str] | None = None, ) -> Path: path.parent.mkdir(parents=True, exist_ok=True) catalog_path = path.parent.parent / "catalog.db" resolved_catalog_uri = catalog_uri or f"sqlite:///{catalog_path.as_posix()}" resolved_warehouse = warehouse or "../warehouse" + iceberg_config: dict[str, Any] = { + "catalog_type": catalog_type, + "catalog_uri": resolved_catalog_uri, + "warehouse": resolved_warehouse, + "namespace": namespace, + "tables": [ + {"name": "orders", "partition_by": ["days(created_at)"]}, + {"name": "payments", "partition_by": ["days(created_at)"]}, + {"name": "clickstream", "partition_by": ["hours(created_at)"]}, + {"name": "inventory", "partition_by": ["days(created_at)"]}, + {"name": "dead_letter", "partition_by": ["days(received_at)"]}, + ], + } + if catalog_properties is not None: + iceberg_config["catalog_properties"] = catalog_properties path.write_text( yaml.safe_dump( - { - "iceberg": { - "catalog_type": catalog_type, - "catalog_uri": resolved_catalog_uri, - "warehouse": resolved_warehouse, - "namespace": namespace, - "tables": [ - {"name": "orders", "partition_by": ["days(created_at)"]}, - {"name": "payments", "partition_by": ["days(created_at)"]}, - {"name": "clickstream", "partition_by": ["hours(created_at)"]}, - {"name": "inventory", "partition_by": ["days(created_at)"]}, - {"name": "dead_letter", "partition_by": ["days(received_at)"]}, - ], - } - }, + {"iceberg": iceberg_config}, sort_keys=False, ), encoding="utf-8", @@ -247,8 +250,14 @@ def test_repo_default_config_writes_to_rest_catalog( tmp_path / "config" / "rest.yaml", catalog_type="rest", catalog_uri="http://localhost:8181", - warehouse="/tmp/warehouse", # noqa: S108 + warehouse="s3://agentflow-lake/warehouse", namespace=namespace, + catalog_properties={ + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "minio", + "s3.secret-access-key": "minio123", + "s3.region": "us-east-1", + }, ) _wait_for_catalog(rest_config) diff --git a/tests/unit/test_iceberg_sink.py b/tests/unit/test_iceberg_sink.py index a509e95..274871d 100644 --- a/tests/unit/test_iceberg_sink.py +++ b/tests/unit/test_iceberg_sink.py @@ -5,7 +5,7 @@ import yaml -from src.processing.iceberg_sink import IcebergSink +from src.processing.iceberg_sink import IcebergSink, _expand_env def test_rest_catalog_uses_warehouse_identifier_without_local_mkdir( @@ -111,3 +111,88 @@ def fake_load_catalog(name: str, **kwargs: str) -> FakeCatalog: "warehouse": warehouse_path, } assert (tmp_path / "warehouse").is_dir() + + +def test_expand_env_resolves_default_env_and_plain_values(monkeypatch) -> None: + monkeypatch.delenv("ICEBERG_TEST_VAR", raising=False) + assert _expand_env("${ICEBERG_TEST_VAR:-fallback}") == "fallback" + monkeypatch.setenv("ICEBERG_TEST_VAR", "from-env") + assert _expand_env("${ICEBERG_TEST_VAR:-fallback}") == "from-env" + assert _expand_env("${ICEBERG_TEST_VAR}") == "from-env" + # No reference and non-strings pass through untouched. + assert _expand_env("s3://agentflow-lake/warehouse") == "s3://agentflow-lake/warehouse" + assert _expand_env(True) is True + + +def test_s3_rest_catalog_expands_env_credentials_without_local_mkdir( + tmp_path: Path, + monkeypatch, +) -> None: + config_path = tmp_path / "config" / "iceberg.yaml" + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text( + yaml.safe_dump( + { + "iceberg": { + "catalog_type": "rest", + "catalog_uri": "${AGENTFLOW_ICEBERG_URI:-http://localhost:8181}", + "warehouse": "${AGENTFLOW_ICEBERG_WAREHOUSE:-s3://agentflow-lake/warehouse}", + "namespace": "agentflow", + "catalog_properties": { + "s3.endpoint": "${AGENTFLOW_S3_ENDPOINT:-http://localhost:9000}", + "s3.access-key-id": "${AGENTFLOW_S3_ACCESS_KEY:-minio}", + "s3.secret-access-key": "${AGENTFLOW_S3_SECRET_KEY:-minio123}", + "s3.region": "${AGENTFLOW_S3_REGION:-us-east-1}", + }, + "tables": [], + } + }, + sort_keys=False, + ), + encoding="utf-8", + newline="\n", + ) + + for name in ( + "AGENTFLOW_ICEBERG_URI", + "AGENTFLOW_ICEBERG_WAREHOUSE", + "AGENTFLOW_S3_ENDPOINT", + "AGENTFLOW_S3_ACCESS_KEY", + "AGENTFLOW_S3_SECRET_KEY", + "AGENTFLOW_S3_REGION", + ): + monkeypatch.delenv(name, raising=False) + # Production-style override is honoured. + monkeypatch.setenv("AGENTFLOW_S3_ENDPOINT", "https://s3.example.com") + + captured: dict[str, object] = {} + mkdir_calls: list[Path] = [] + + class FakeCatalog: + def create_namespace_if_not_exists(self, namespace: str) -> None: + captured["namespace"] = namespace + + def fake_load_catalog(name: str, **kwargs: str) -> FakeCatalog: + captured["name"] = name + captured["kwargs"] = kwargs + return FakeCatalog() + + def fake_mkdir(self: Path, parents: bool = False, exist_ok: bool = False) -> None: + mkdir_calls.append(self) + + monkeypatch.setattr("src.processing.iceberg_sink.load_catalog", fake_load_catalog) + monkeypatch.setattr(Path, "mkdir", fake_mkdir) + + IcebergSink(config_path=config_path) + + assert captured["kwargs"] == { + "type": "rest", + "uri": "http://localhost:8181", + "warehouse": "s3://agentflow-lake/warehouse", + "s3.endpoint": "https://s3.example.com", + "s3.access-key-id": "minio", + "s3.secret-access-key": "minio123", + "s3.region": "us-east-1", + } + # An s3:// warehouse must never create a local directory. + assert mkdir_calls == []