Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/staging-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: Staging Deploy
on:
push:
branches: [main]
workflow_dispatch:

permissions:
contents: read
Expand Down
5 changes: 5 additions & 0 deletions docker-compose.e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ services:
AGENTFLOW_RATE_LIMIT_RPM: ${AGENTFLOW_RATE_LIMIT_RPM:-120}
AGENTFLOW_USAGE_DB_PATH: /app/data/agentflow_api_usage.duckdb
AGENTFLOW_WEBHOOKS_FILE: /app/data/e2e-webhooks.yaml
# The e2e webhook callback is delivered to the host gateway, which the
# SSRF egress guard rejects as a private address. Trust exactly that
# configured callback host (default host.docker.internal) — the guard
# stays fully active for every other target.
AGENTFLOW_EGRESS_ALLOWED_HOSTS: ${AGENTFLOW_E2E_CALLBACK_HOST:-host.docker.internal}
DUCKDB_PATH: /app/data/agentflow.duckdb
REDIS_URL: redis://redis:6379/0
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
Expand Down
8 changes: 7 additions & 1 deletion scripts/k8s_staging_up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,17 @@ helm upgrade --install "$RELEASE_NAME" "$ROOT_DIR/helm/agentflow" \
--debug

echo "==> Enabling host loopback relay for webhook callbacks..."
# The relay listens on 127.0.0.1 inside the pod and forwards to the host
# gateway, so the E2E webhook callback URL is http://127.0.0.1:<port>/callback.
# The SSRF egress guard rejects 127.0.0.1 as loopback by default; allowlist
# exactly the relay loopback here (ephemeral staging only) so the webhook
# delivery test passes without weakening the guard for real targets.
kubectl set env "deployment/$RELEASE_NAME" \
--namespace "$NAMESPACE" \
HOST_LOOPBACK_PROXY_TARGET="$HOST_LOOPBACK_PROXY_TARGET" \
HOST_LOOPBACK_PROXY_RANGE_START="$HOST_LOOPBACK_PROXY_RANGE_START" \
HOST_LOOPBACK_PROXY_RANGE_END="$HOST_LOOPBACK_PROXY_RANGE_END"
HOST_LOOPBACK_PROXY_RANGE_END="$HOST_LOOPBACK_PROXY_RANGE_END" \
AGENTFLOW_EGRESS_ALLOWED_HOSTS="127.0.0.1"

kubectl patch deployment "$RELEASE_NAME" \
--namespace "$NAMESPACE" \
Expand Down
23 changes: 23 additions & 0 deletions src/serving/api/egress_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,33 @@
registration time (reject early, 4xx) and immediately before each delivery
(narrowing the DNS-rebinding window — a name that resolved public at creation
could later point at an internal IP).

``AGENTFLOW_EGRESS_ALLOWED_HOSTS`` is an opt-in escape hatch (default empty, so
production keeps the full guard): a comma-separated allowlist of exact
hostnames a controlled deployment trusts even though they resolve to a
private/loopback address — e.g. the ``host.docker.internal`` gateway the e2e
compose stack delivers to, or the ``127.0.0.1`` relay the staging kind cluster
stands up. It waives only the public-address check, and only for hosts the
operator explicitly listed; every other target is still fully validated.
"""

from __future__ import annotations

import ipaddress
import os
import socket
from urllib.parse import urlsplit

_ALLOWED_SCHEMES = {"http", "https"}
_ALLOWLIST_ENV = "AGENTFLOW_EGRESS_ALLOWED_HOSTS"


def _allowed_hosts() -> frozenset[str]:
    """Return the operator-supplied egress allowlist as lower-cased hostnames.

    The environment variable is consulted on every call rather than cached at
    import time, so a deployment can configure it purely through the
    environment. Entries are comma-separated; surrounding whitespace is
    dropped and blank entries are ignored. An unset or empty variable yields
    an empty set.
    """
    configured = os.getenv(_ALLOWLIST_ENV, "")
    entries = (piece.strip() for piece in configured.split(","))
    return frozenset(entry.lower() for entry in entries if entry)


class UnsafeEgressURLError(ValueError):
Expand Down Expand Up @@ -53,6 +71,11 @@ def validate_public_url(url: str) -> None:
host = parts.hostname
if not host:
raise UnsafeEgressURLError("missing host")
if host.lower() in _allowed_hosts():
# Operator explicitly trusts this host (controlled test/relay target).
# The scheme check above still applies; only the public-address check
# is waived.
return
port = parts.port or (443 if scheme == "https" else 80)
try:
infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
Expand Down
8 changes: 8 additions & 0 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ def _write_compose_override(path: Path, host_port: int) -> None:
' AGENTFLOW_RATE_LIMIT_RPM: "120"',
" AGENTFLOW_USAGE_DB_PATH: /app/data/agentflow_api_usage.duckdb",
" AGENTFLOW_WEBHOOKS_FILE: /app/data/e2e-webhooks.yaml",
# Trust the configured callback host so the SSRF egress guard permits
# the webhook delivery to the host gateway (it resolves to a private
# address). Tracks _compose_callback_host(): host.docker.internal on
# Linux, host.lima.internal on macOS, or an explicit override.
f' AGENTFLOW_EGRESS_ALLOWED_HOSTS: "{_compose_callback_host()}"',
' OTEL_SDK_DISABLED: "true"',
" ports:",
f' - "127.0.0.1:{host_port}:8000"',
Expand Down Expand Up @@ -220,6 +225,9 @@ def _start_local_api(tmp_path: Path) -> dict[str, object]:
"AGENTFLOW_USAGE_DB_PATH": str(tmp_path / "agentflow_usage.duckdb"),
"AGENTFLOW_WEBHOOKS_FILE": str(tmp_path / "webhooks.yaml"),
"DUCKDB_PATH": str(tmp_path / "agentflow.duckdb"),
# Local mode delivers the webhook callback to 127.0.0.1 (receiver shares
# this host); allowlist it so the SSRF egress guard permits the loopback.
"AGENTFLOW_EGRESS_ALLOWED_HOSTS": "127.0.0.1",
"PYTHONUNBUFFERED": "1",
}

Expand Down
14 changes: 14 additions & 0 deletions tests/e2e/test_ci_compose_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ def test_compose_override_uses_native_host_dns_on_darwin(tmp_path, monkeypatch):
assert "extra_hosts" not in override["services"]["agentflow-api"]


def test_compose_override_allowlists_callback_host_for_egress_guard(tmp_path, monkeypatch):
    """The generated compose override allowlists the Linux callback host."""
    conftest_path = PROJECT_ROOT / "tests" / "e2e" / "conftest.py"
    conftest_module = _load_module(conftest_path, "e2e_conftest")
    # No explicit callback-host override, and pretend we are running on Linux.
    monkeypatch.delenv("AGENTFLOW_E2E_CALLBACK_HOST", raising=False)
    monkeypatch.setattr(conftest_module.platform, "system", lambda: "Linux")

    target = tmp_path / "docker-compose.e2e.override.yml"
    conftest_module._write_compose_override(target, 18080)

    parsed = yaml.safe_load(target.read_text(encoding="utf-8"))
    api_env = parsed["services"]["agentflow-api"]["environment"]

    assert api_env["AGENTFLOW_EGRESS_ALLOWED_HOSTS"] == "host.docker.internal"


def test_compose_callback_host_uses_linux_docker_alias(monkeypatch):
conftest_module = _load_module(PROJECT_ROOT / "tests" / "e2e" / "conftest.py", "e2e_conftest")
monkeypatch.delenv("AGENTFLOW_E2E_CALLBACK_HOST", raising=False)
Expand Down
58 changes: 58 additions & 0 deletions tests/unit/test_egress_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,61 @@ def test_validate_rejects_when_any_resolved_ip_is_private(
)
with pytest.raises(UnsafeEgressURLError):
validate_public_url("http://dual.example.com/")


# --- Opt-in egress allowlist (AGENTFLOW_EGRESS_ALLOWED_HOSTS) -----------------
# A controlled deployment (e2e compose, staging kind relay) must be able to
# deliver to the specific callback host it stands up — which deliberately
# resolves to a private/loopback address — without weakening the guard for
# tenant traffic. The allowlist is opt-in (default empty) and matches exact
# hostnames, case-insensitively, while still enforcing the http(s) scheme.


def test_allowlisted_loopback_host_is_permitted(monkeypatch: pytest.MonkeyPatch) -> None:
    """An allowlisted ``127.0.0.1`` passes validation despite being loopback.

    Mirrors the staging shape: the in-pod relay listens on 127.0.0.1, which is
    normally rejected, and the operator allowlists it so the webhook callback
    can be delivered.
    """
    monkeypatch.setenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", "127.0.0.1")
    relay_callback = "http://127.0.0.1:18080/callback"
    validate_public_url(relay_callback)  # must not raise


def test_allowlisted_gateway_host_skips_resolution(monkeypatch: pytest.MonkeyPatch) -> None:
    """An allowlisted hostname is accepted without any DNS lookup.

    Mirrors the e2e shape, where host.docker.internal resolves to a private
    gateway IP. The resolver is replaced with a stub that fails the test if it
    is ever called.
    """

    def _fail_if_resolved(*_args: object, **_kwargs: object) -> list[object]:
        raise AssertionError("allowlisted host must not be resolved")

    monkeypatch.setenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", "host.docker.internal")
    monkeypatch.setattr(socket, "getaddrinfo", _fail_if_resolved)

    gateway_callback = "http://host.docker.internal:9000/callback"
    validate_public_url(gateway_callback)  # must not raise


def test_allowlist_matches_host_case_insensitively(monkeypatch: pytest.MonkeyPatch) -> None:
    """Allowlist matching ignores case on both the entry and the URL host."""
    monkeypatch.setenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", "Host.Docker.Internal")
    # Mixed-case allowlist entry, lower-case URL host.
    validate_public_url("http://host.docker.internal/cb")  # must not raise
    # Reverse direction: the URL host itself carries upper-case characters.
    validate_public_url("http://HOST.DOCKER.INTERNAL/cb")  # must not raise


def test_allowlist_parses_comma_list_and_ignores_blanks(monkeypatch: pytest.MonkeyPatch) -> None:
    """Every non-blank entry of a messy comma-separated list is honoured."""
    monkeypatch.setenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", " , 127.0.0.1 ,host.docker.internal, ")
    listed_targets = (
        "http://127.0.0.1:18080/callback",
        "http://host.docker.internal/cb",
    )
    for target_url in listed_targets:
        validate_public_url(target_url)  # must not raise


def test_allowlist_still_enforces_scheme(monkeypatch: pytest.MonkeyPatch) -> None:
    """An allowlisted host is still rejected when the URL scheme is not http(s)."""
    monkeypatch.setenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", "127.0.0.1")
    non_http_url = "file://127.0.0.1/etc/passwd"
    with pytest.raises(UnsafeEgressURLError):
        validate_public_url(non_http_url)


def test_allowlist_only_exempts_listed_hosts(monkeypatch: pytest.MonkeyPatch) -> None:
    """A private host that is absent from the allowlist is still rejected."""
    monkeypatch.setenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", "127.0.0.1")
    unlisted_private_url = "http://10.0.0.5:6379/"
    with pytest.raises(UnsafeEgressURLError):
        validate_public_url(unlisted_private_url)


def test_empty_allowlist_preserves_loopback_rejection(monkeypatch: pytest.MonkeyPatch) -> None:
    """Neither an empty nor an unset allowlist weakens the loopback rejection."""
    # Variable present but empty.
    monkeypatch.setenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", "")
    with pytest.raises(UnsafeEgressURLError):
        validate_public_url("http://127.0.0.1/x")

    # Variable absent altogether (the default when nothing is configured).
    monkeypatch.delenv("AGENTFLOW_EGRESS_ALLOWED_HOSTS", raising=False)
    with pytest.raises(UnsafeEgressURLError):
        validate_public_url("http://127.0.0.1/x")