Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions config/iceberg.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
iceberg:
catalog_name: agentflow
catalog_type: rest
catalog_uri: http://localhost:8181
warehouse: /tmp/warehouse
catalog_uri: ${AGENTFLOW_ICEBERG_URI:-http://localhost:8181}
# Object-store backed (MinIO/S3), matching docker-compose.iceberg.yml and the
# agentflow-lake bucket used by the Flink stack — not an ephemeral local dir.
warehouse: ${AGENTFLOW_ICEBERG_WAREHOUSE:-s3://agentflow-lake/warehouse}
namespace: agentflow
# FileIO credentials for the PyIceberg client. Env-overridable; defaults match
# the local MinIO compose. Production injects real S3/Glue credentials here.
catalog_properties:
s3.endpoint: ${AGENTFLOW_S3_ENDPOINT:-http://localhost:9000}
s3.access-key-id: ${AGENTFLOW_S3_ACCESS_KEY:-minio}
s3.secret-access-key: ${AGENTFLOW_S3_SECRET_KEY:-minio123}
s3.region: ${AGENTFLOW_S3_REGION:-us-east-1}
tables:
- name: orders
partition_by: [days(created_at)]
Expand Down
48 changes: 46 additions & 2 deletions docker-compose.iceberg.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,52 @@
# Focused Iceberg REST catalog stack, backed by MinIO (S3) instead of an
# ephemeral local directory. Mirrors the object-store wiring in
# docker-compose.yml (same image tags, bucket, credentials) so the PyIceberg
# sink writes table data/metadata to the real object store the rest of the
# stack already uses. Consumed by tests/integration/test_iceberg_sink.py.
services:
minio:
image: minio/minio:RELEASE.2025-09-07T16-13-09Z
ports:
- "9000:9000"
- "9001:9001"
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 5s
timeout: 5s
retries: 5

# One-shot bucket provisioner. Retries the alias instead of gating on the
# MinIO healthcheck so the stack comes up regardless of whether the server
# image ships curl.
minio-init:
image: minio/mc:RELEASE.2025-08-13T08-35-41Z
depends_on:
- minio
entrypoint: ["/bin/bash", "-c"]
command:
- |
until mc alias set local http://minio:9000 minio minio123; do
echo "waiting for minio..."; sleep 1;
done
mc mb local/agentflow-lake --ignore-existing
echo "MinIO bucket ready."

iceberg-rest:
image: tabulario/iceberg-rest:0.6.0
depends_on:
minio-init:
condition: service_completed_successfully
ports:
- "8181:8181"
environment:
CATALOG_WAREHOUSE: /tmp/warehouse
CATALOG_IO__IMPL: org.apache.iceberg.hadoop.HadoopFileIO
CATALOG_WAREHOUSE: s3://agentflow-lake/warehouse
CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
CATALOG_S3_ENDPOINT: http://minio:9000
CATALOG_S3_PATH__STYLE__ACCESS: "true"
AWS_ACCESS_KEY_ID: minio
AWS_SECRET_ACCESS_KEY: minio123
AWS_REGION: us-east-1
2 changes: 1 addition & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Same pipeline logic, no infrastructure dependencies:
3. **Enrich**: Domain enrichment per event type (order sizing, click classification, payment risk)
4. **Store**: Validated events written to DuckDB for serving and to Iceberg via PyIceberg
5. **Serve**: Agent API reads from DuckDB while `/v1/health` reports Iceberg row counts
6. **Catalog**: Development uses the local REST catalog from `docker-compose.iceberg.yml`; production uses AWS Glue
6. **Catalog**: Development uses a MinIO-backed REST catalog from `docker-compose.iceberg.yml` (writing to the same `agentflow-lake` S3 object store as the Flink stack); production uses AWS Glue

Both paths use the **same validator and enrichment code** (`src/quality/`, `src/processing/transformations/`).

Expand Down
27 changes: 24 additions & 3 deletions src/processing/iceberg_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import os
import re
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
Expand All @@ -21,6 +22,23 @@
TimestampType,
)

# Matches ${VAR} and ${VAR:-default} so config values can reference the
# environment (e.g. S3 credentials) with a local fallback. A value without
# "${" is returned untouched.
_ENV_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def _expand_env(value: Any) -> Any:
if not isinstance(value, str) or "${" not in value:
return value

def _replace(match: re.Match[str]) -> str:
name, default = match.group(1), match.group(2)
return os.environ.get(name, default if default is not None else "")

return _ENV_REF.sub(_replace, value)


ORDERS_SCHEMA = Schema(
NestedField(field_id=1, name="event_id", field_type=StringType(), required=True),
NestedField(field_id=2, name="event_type", field_type=StringType(), required=True),
Expand Down Expand Up @@ -103,13 +121,16 @@ def __init__(self, config_path: str | Path = "config/iceberg.yaml"):
catalog_type = self._config["catalog_type"]
catalog_properties = {
"type": catalog_type,
"uri": self._resolve_catalog_uri(self._config["catalog_uri"]),
"uri": self._resolve_catalog_uri(_expand_env(self._config["catalog_uri"])),
"warehouse": self._resolve_warehouse(
self._config["warehouse"],
_expand_env(self._config["warehouse"]),
catalog_type,
),
}
catalog_properties.update(self._config.get("catalog_properties", {}))
extra_properties = self._config.get("catalog_properties", {})
catalog_properties.update(
{key: _expand_env(value) for key, value in extra_properties.items()}
)
self.catalog = load_catalog(
self._config.get("catalog_name", "agentflow"),
**catalog_properties,
Expand Down
41 changes: 25 additions & 16 deletions tests/integration/test_iceberg_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import time
from pathlib import Path
from typing import Any
from uuid import uuid4

import duckdb
Expand All @@ -25,28 +26,30 @@ def _write_iceberg_config(
catalog_uri: str | None = None,
warehouse: str | None = None,
namespace: str = "agentflow",
catalog_properties: dict[str, str] | None = None,
) -> Path:
path.parent.mkdir(parents=True, exist_ok=True)
catalog_path = path.parent.parent / "catalog.db"
resolved_catalog_uri = catalog_uri or f"sqlite:///{catalog_path.as_posix()}"
resolved_warehouse = warehouse or "../warehouse"
iceberg_config: dict[str, Any] = {
"catalog_type": catalog_type,
"catalog_uri": resolved_catalog_uri,
"warehouse": resolved_warehouse,
"namespace": namespace,
"tables": [
{"name": "orders", "partition_by": ["days(created_at)"]},
{"name": "payments", "partition_by": ["days(created_at)"]},
{"name": "clickstream", "partition_by": ["hours(created_at)"]},
{"name": "inventory", "partition_by": ["days(created_at)"]},
{"name": "dead_letter", "partition_by": ["days(received_at)"]},
],
}
if catalog_properties is not None:
iceberg_config["catalog_properties"] = catalog_properties
path.write_text(
yaml.safe_dump(
{
"iceberg": {
"catalog_type": catalog_type,
"catalog_uri": resolved_catalog_uri,
"warehouse": resolved_warehouse,
"namespace": namespace,
"tables": [
{"name": "orders", "partition_by": ["days(created_at)"]},
{"name": "payments", "partition_by": ["days(created_at)"]},
{"name": "clickstream", "partition_by": ["hours(created_at)"]},
{"name": "inventory", "partition_by": ["days(created_at)"]},
{"name": "dead_letter", "partition_by": ["days(received_at)"]},
],
}
},
{"iceberg": iceberg_config},
sort_keys=False,
),
encoding="utf-8",
Expand Down Expand Up @@ -247,8 +250,14 @@ def test_repo_default_config_writes_to_rest_catalog(
tmp_path / "config" / "rest.yaml",
catalog_type="rest",
catalog_uri="http://localhost:8181",
warehouse="/tmp/warehouse", # noqa: S108
warehouse="s3://agentflow-lake/warehouse",
namespace=namespace,
catalog_properties={
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "minio",
"s3.secret-access-key": "minio123",
"s3.region": "us-east-1",
},
)

_wait_for_catalog(rest_config)
Expand Down
87 changes: 86 additions & 1 deletion tests/unit/test_iceberg_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import yaml

from src.processing.iceberg_sink import IcebergSink
from src.processing.iceberg_sink import IcebergSink, _expand_env


def test_rest_catalog_uses_warehouse_identifier_without_local_mkdir(
Expand Down Expand Up @@ -111,3 +111,88 @@ def fake_load_catalog(name: str, **kwargs: str) -> FakeCatalog:
"warehouse": warehouse_path,
}
assert (tmp_path / "warehouse").is_dir()


def test_expand_env_resolves_default_env_and_plain_values(monkeypatch) -> None:
monkeypatch.delenv("ICEBERG_TEST_VAR", raising=False)
assert _expand_env("${ICEBERG_TEST_VAR:-fallback}") == "fallback"
monkeypatch.setenv("ICEBERG_TEST_VAR", "from-env")
assert _expand_env("${ICEBERG_TEST_VAR:-fallback}") == "from-env"
assert _expand_env("${ICEBERG_TEST_VAR}") == "from-env"
# No reference and non-strings pass through untouched.
assert _expand_env("s3://agentflow-lake/warehouse") == "s3://agentflow-lake/warehouse"
assert _expand_env(True) is True


def test_s3_rest_catalog_expands_env_credentials_without_local_mkdir(
tmp_path: Path,
monkeypatch,
) -> None:
config_path = tmp_path / "config" / "iceberg.yaml"
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(
yaml.safe_dump(
{
"iceberg": {
"catalog_type": "rest",
"catalog_uri": "${AGENTFLOW_ICEBERG_URI:-http://localhost:8181}",
"warehouse": "${AGENTFLOW_ICEBERG_WAREHOUSE:-s3://agentflow-lake/warehouse}",
"namespace": "agentflow",
"catalog_properties": {
"s3.endpoint": "${AGENTFLOW_S3_ENDPOINT:-http://localhost:9000}",
"s3.access-key-id": "${AGENTFLOW_S3_ACCESS_KEY:-minio}",
"s3.secret-access-key": "${AGENTFLOW_S3_SECRET_KEY:-minio123}",
"s3.region": "${AGENTFLOW_S3_REGION:-us-east-1}",
},
"tables": [],
}
},
sort_keys=False,
),
encoding="utf-8",
newline="\n",
)

for name in (
"AGENTFLOW_ICEBERG_URI",
"AGENTFLOW_ICEBERG_WAREHOUSE",
"AGENTFLOW_S3_ENDPOINT",
"AGENTFLOW_S3_ACCESS_KEY",
"AGENTFLOW_S3_SECRET_KEY",
"AGENTFLOW_S3_REGION",
):
monkeypatch.delenv(name, raising=False)
# Production-style override is honoured.
monkeypatch.setenv("AGENTFLOW_S3_ENDPOINT", "https://s3.example.com")

captured: dict[str, object] = {}
mkdir_calls: list[Path] = []

class FakeCatalog:
def create_namespace_if_not_exists(self, namespace: str) -> None:
captured["namespace"] = namespace

def fake_load_catalog(name: str, **kwargs: str) -> FakeCatalog:
captured["name"] = name
captured["kwargs"] = kwargs
return FakeCatalog()

def fake_mkdir(self: Path, parents: bool = False, exist_ok: bool = False) -> None:
mkdir_calls.append(self)

monkeypatch.setattr("src.processing.iceberg_sink.load_catalog", fake_load_catalog)
monkeypatch.setattr(Path, "mkdir", fake_mkdir)

IcebergSink(config_path=config_path)

assert captured["kwargs"] == {
"type": "rest",
"uri": "http://localhost:8181",
"warehouse": "s3://agentflow-lake/warehouse",
"s3.endpoint": "https://s3.example.com",
"s3.access-key-id": "minio",
"s3.secret-access-key": "minio123",
"s3.region": "us-east-1",
}
# An s3:// warehouse must never create a local directory.
assert mkdir_calls == []