Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions aenv/e2e_arca_negative_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#!/usr/bin/env python3
"""
Contract tests for arca + ``enable_data_plane=False``.

Round 2: every data-plane method on ``Environment`` raises
``EnvironmentError`` mentioning ``enable_data_plane=False`` instead of
silently hitting api-service:8081.

Round 3: api-service:8081 (the MCP gateway port) returns HTTP 501 with an
actionable message for any path when running in arca schedule mode. This
is the server-side guarantee that pairs with Round 2.

Both rounds talk to a real api-service-arca + arca sandbox, so the script
must run against tydd-staging (or another arca-mode deployment).

Env vars:
AENV_SYSTEM_URL api-service base URL on :8080 (required)
ARCA_TEST_ENV_NAME envhub env name (e.g. arca-real@1.0.0) (required)
AENV_API_KEY optional bearer token
"""

from __future__ import annotations

import asyncio
import json
import os
import sys
from urllib.parse import urlparse, urlunparse

import httpx

from aenv import Environment
from aenv.core.exceptions import EnvironmentError

# Control-plane base URL. AENV_SYSTEM_URL takes precedence, then
# ARCA_API_SERVICE_URL; empty when neither is set (main() treats an empty
# value as a missing required variable).
CONTROL_URL = os.environ.get(
"AENV_SYSTEM_URL", os.environ.get("ARCA_API_SERVICE_URL", "")
)
# Env name handed to Environment(env_name=...); empty when unset.
ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "")
# Optional API key; callers pass ``AENV_API_KEY or None`` so empty means no key.
AENV_API_KEY = os.environ.get("AENV_API_KEY", "")

# (method name, positional args) pairs. round2_guards() invokes each one as
# ``getattr(env, name)(*args)`` and expects the call to raise.
DATA_PLANE_METHODS: list[tuple[str, tuple]] = [
("call_tool", ("t", {})),
("list_tools", ()),
("list_functions", ()),
("call_reward", ({},)),
("check_health", ({},)),
("call_function", ("f", {})),
]


def _data_plane_url(control_url: str) -> str:
"""Swap the :8080 port in control_url for :8081 (the MCP gateway)."""
parsed = urlparse(control_url if "://" in control_url else f"http://{control_url}")
host = parsed.hostname or "127.0.0.1"
return urlunparse(parsed._replace(netloc=f"{host}:8081", path="", query=""))


async def round2_guards() -> bool:
    """Check that every data-plane method refuses to run with the data plane off.

    Opens an ``Environment`` with ``enable_data_plane=False`` and calls each
    entry of ``DATA_PLANE_METHODS`` on it.

    Returns:
        True when every call raised ``EnvironmentError`` with a message
        containing ``enable_data_plane=False``; False at the first call that
        either did not raise or raised with a different message.
    """
    print("=== Round 2: Environment guards under enable_data_plane=False ===")
    async with Environment(
        env_name=ENV_NAME,
        aenv_url=CONTROL_URL,
        api_key=AENV_API_KEY or None,
        enable_data_plane=False,
        ttl="5m",
    ) as env:
        for method_name, call_args in DATA_PLANE_METHODS:
            try:
                await getattr(env, method_name)(*call_args)
            except EnvironmentError as exc:
                # The guard must name the flag, otherwise the error is not
                # the one this contract is about.
                if "enable_data_plane=False" in str(exc):
                    print(f" {method_name}: blocked OK")
                    continue
                print(f" {method_name}: raised wrong message: {exc}")
                return False
            else:
                print(f" {method_name}: did NOT raise -- BUG")
                return False
    print("Round 2 PASS")
    return True


async def round3_501() -> bool:
    """Check that the :8081 gateway answers 501 for a sandbox created here.

    Creates a sandbox with ``enable_data_plane=False``, then sends a GET to a
    few paths on the data-plane URL derived from ``CONTROL_URL``, tagging each
    request with the sandbox's instance id.

    Returns:
        True when every probed path returned status 501 with a JSON object
        whose ``message`` contains ``"data plane"``; False at the first path
        that did not.
    """
    print("=== Round 3: api-service:8081 returns 501 for arca data plane ===")
    gateway = _data_plane_url(CONTROL_URL)
    async with Environment(
        env_name=ENV_NAME,
        aenv_url=CONTROL_URL,
        api_key=AENV_API_KEY or None,
        enable_data_plane=False,
        ttl="5m",
    ) as env:
        env_info = await env.get_env_info()
        instance_id = env_info["instance_id"]
        async with httpx.AsyncClient(timeout=10.0) as http:
            for probe_path in ("/health", "/mcp", "/some/random"):
                response = await http.get(
                    f"{gateway}{probe_path}",
                    headers={"AEnvCore-EnvInstance-ID": instance_id},
                )
                # A non-JSON body is still reported, truncated, so the
                # failure line stays readable.
                try:
                    payload = response.json()
                except Exception:
                    payload = {"_raw": response.text[:200]}

                passed = False
                if response.status_code == 501 and isinstance(payload, dict):
                    passed = "data plane" in str(payload.get("message", ""))

                verdict = "OK" if passed else "FAIL"
                print(
                    f" {probe_path}: status={response.status_code} body={json.dumps(payload)} [{verdict}]"
                )
                if not passed:
                    return False
    print("Round 3 PASS")
    return True


async def main() -> int:
    """Run both contract rounds and map the outcome to a process exit code.

    Returns:
        2 when a required environment variable is missing, 0 when both rounds
        pass, 1 otherwise. Round 3 runs even if round 2 failed.
    """
    if not (CONTROL_URL and ENV_NAME):
        print(
            "missing required env vars AENV_SYSTEM_URL and/or ARCA_TEST_ENV_NAME",
            file=sys.stderr,
        )
        return 2
    guards_ok = await round2_guards()
    gateway_ok = await round3_501()
    if guards_ok and gateway_ok:
        return 0
    return 1


if __name__ == "__main__":
sys.exit(asyncio.run(main()))
127 changes: 127 additions & 0 deletions aenv/e2e_arca_presign_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Example: aenv + arca engine — fast create with no data-plane wait, then
access the sandbox via a presigned URL (direct data-plane, bypassing
api-service).

When to use this pattern:
* Engine is arca and the sandbox image runs an arbitrary user app
(no aenv MCP server inside).
* You want minimum latency at create time and prefer to wait on the
in-sandbox app's own readiness path.

High-level flow:
1. create sandbox (Environment.__aenter__, no MCP session)
2. presign URL (Environment.presign_url)
3. caller polls the URL (httpx, business-defined readiness path)
4. caller does real work (whatever the sandbox process listens for)
5. release (Environment.__aexit__)

``enable_data_plane=False`` is what makes this work on arca: the SDK
skips the MCP session and the /health probe entirely, so no traffic
hits api-service:8081 (which on arca returns 501 by design). The
``call_tool`` / ``list_tools`` / ``call_function`` / ``call_reward`` /
``check_health`` methods will raise if invoked under this flag.

Env vars:
AENV_SYSTEM_URL api-service base URL (default http://localhost)
ARCA_TEST_ENV_NAME envhub env name (arcaTemplateId inside) (required)
AENV_API_KEY bearer token if api-service has auth enabled (optional)
ARCA_SERVICE_PORT in-sandbox port to expose (default 18080)
ARCA_SERVICE_PATH path to GET for readiness (default /healthz)
ARCA_PRESIGN_TTL_MIN presign URL ttl (default 5)
ARCA_READINESS_TIMEOUT readiness polling budget (default 45s)
"""

from __future__ import annotations

import asyncio
import os
import sys
import time

import httpx

from aenv import Environment

# Env name handed to Environment(env_name=...); main() refuses to run when empty.
ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "")
# api-service base URL. AENV_SYSTEM_URL takes precedence over
# ARCA_API_SERVICE_URL; empty when neither is set.
AENV_URL = os.environ.get("AENV_SYSTEM_URL", os.environ.get("ARCA_API_SERVICE_URL", ""))
# API key read from the environment; empty when unset.
AENV_API_KEY = os.environ.get("AENV_API_KEY", "")

# Port passed to presign_url(); int() raises ValueError at import time if the
# variable is set to a non-integer.
SERVICE_PORT = int(os.environ.get("ARCA_SERVICE_PORT", "18080"))
# Path appended to the presigned URL for the readiness poll.
SERVICE_PATH = os.environ.get("ARCA_SERVICE_PATH", "/healthz")
# Passed as expiration_time_in_minutes to presign_url().
PRESIGN_TTL_MIN = float(os.environ.get("ARCA_PRESIGN_TTL_MIN", "5"))
# Default polling budget, in seconds, for wait_ready().
READINESS_TIMEOUT_S = float(os.environ.get("ARCA_READINESS_TIMEOUT", "45"))


async def wait_ready(
    target: str, timeout_s: float = READINESS_TIMEOUT_S
) -> httpx.Response:
    """Poll ``GET target`` until it answers with a 2xx status.

    Args:
        target: URL to poll; redirects are followed.
        timeout_s: Total polling budget in seconds.

    Returns:
        The first response whose status code is in the 2xx range.

    Raises:
        TimeoutError: If no 2xx response arrived within ``timeout_s``. The
            message carries the last status and body prefix when at least
            one response was received.
    """
    # Monotonic clock: the deadline must not shift if the wall clock is
    # adjusted while we are polling.
    deadline = time.monotonic() + timeout_s
    last: httpx.Response | None = None
    async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c:
        attempt = 0
        while time.monotonic() < deadline:
            attempt += 1
            try:
                last = await c.get(target)
            except Exception as e:
                # Deliberately best-effort: any request failure is logged and
                # retried until the budget is spent.
                print(f" readiness attempt {attempt}: {type(e).__name__}: {e}")
            else:
                if 200 <= last.status_code < 300:
                    return last
                print(f" readiness attempt {attempt}: status={last.status_code}")
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            await asyncio.sleep(min(2.0, remaining))
    if last is None:
        raise TimeoutError(f"never got a response within {timeout_s}s")
    raise TimeoutError(
        f"never became ready within {timeout_s}s "
        f"(last status={last.status_code}, body={last.text[:200]!r})"
    )


async def main() -> int:
    """Create a sandbox, presign a URL into it, wait for readiness, release.

    Returns:
        1 when ``ARCA_TEST_ENV_NAME`` is unset, 0 after a successful run.
        Failures inside the flow (including ``TimeoutError`` from
        ``wait_ready``) propagate to the caller.
    """
    if not ENV_NAME:
        print("missing required env var ARCA_TEST_ENV_NAME", file=sys.stderr)
        return 1

    print(f"env_name: {ENV_NAME}")
    print(f"aenv_url: {AENV_URL or '(default)'}")

    # Forward the URL and API key that were read from the environment so the
    # sandbox is created against the URL printed above. Unset values are
    # omitted so Environment keeps its own defaults.
    connection_kwargs = {}
    if AENV_URL:
        connection_kwargs["aenv_url"] = AENV_URL
    if AENV_API_KEY:
        connection_kwargs["api_key"] = AENV_API_KEY

    t0 = time.time()
    async with Environment(
        env_name=ENV_NAME,
        ttl="10m",
        enable_data_plane=False,
        **connection_kwargs,
    ) as env:
        info = await env.get_env_info()
        print(
            f"[1] created sandbox in {time.time()-t0:.2f}s "
            f"(id={info['instance_id']}, status={info['status']})"
        )

        url = await env.presign_url(
            port=SERVICE_PORT,
            expiration_time_in_minutes=PRESIGN_TTL_MIN,
        )
        print(f"[2] presigned URL ({SERVICE_PORT}): {url}")

        # Join base and path with exactly one slash between them.
        target = url.rstrip("/") + (
            SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH
        )
        print(f"[3] waiting for {SERVICE_PATH} to return 2xx...")
        resp = await wait_ready(target)
        print(f" ready in {time.time()-t0:.1f}s; status={resp.status_code}")
        print(f" body: {resp.text[:200]!r}")

        # The same ``url`` is a valid base for HTTP / WebSocket / anything
        # the sandbox process listens for on that port. It remains valid
        # until the presign TTL expires.
        print("[4] (business traffic would run here)")

    # Printed after the ``async with`` exits, i.e. once the sandbox is released.
    print(f"[5] released sandbox in {time.time()-t0:.1f}s total")
    return 0


if __name__ == "__main__":
sys.exit(asyncio.run(main()))
87 changes: 87 additions & 0 deletions aenv/e2e_arca_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""
End-to-end smoke for api-service-arca.

Runs against the in-cluster service via kubectl port-forward.

Env:
ARCA_API_SERVICE_URL = http://localhost:<local-port> (default 18080)
ARCA_TEST_ENV_NAME = envhub env name (with @version) to create sandbox
from. If unset, only the proxy liveness probe runs.

The SDK is engine-unaware: this script never asserts on labels or NotImplementedError.
"""
from __future__ import annotations

import asyncio
import os
import sys
import time
import traceback

from aenv.core.environment import Environment

# api-service base URL; defaults to the local port-forward address.
ARCA_URL = os.environ.get("ARCA_API_SERVICE_URL", "http://localhost:18080")
# Env name to create a sandbox from; lifecycle() is skipped when empty.
ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "")
# Optional API key; lifecycle() passes ``API_KEY or None`` to Environment.
API_KEY = os.environ.get("AENV_API_KEY", "")


def _fail(msg: str) -> None:
print(f"[FAIL] {msg}")
sys.exit(1)


def _ok(msg: str) -> None:
print(f"[ OK ] {msg}")


async def probe_health() -> None:
    """Plain HTTP liveness check of ``ARCA_URL`` (proves the port-forward works).

    Exits the process through ``_fail`` when ``/health`` does not return 200.
    """
    import httpx

    health_url = f"{ARCA_URL}/health"
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.get(health_url)
        if response.status_code != 200:
            _fail(f"/health returned {response.status_code}: {response.text}")
        _ok("/health -> 200")


async def lifecycle() -> None:
    """Create a sandbox from ``ENV_NAME`` and release it again.

    Skipped with a ``[SKIP]`` line when ``ENV_NAME`` is empty. Any exception
    from ``initialize()`` is printed with its traceback and ends the process
    through ``_fail``, as does a falsy ``_instance`` after initialization.
    """
    if not ENV_NAME:
        print("[SKIP] ARCA_TEST_ENV_NAME not set; skipping create/release")
        return

    sandbox = Environment(
        env_name=ENV_NAME,
        aenv_url=ARCA_URL,
        ttl="10m",
        startup_timeout=180.0,
        timeout=60.0,
        max_retries=1,
        api_key=API_KEY or None,
    )

    started = time.time()
    try:
        await sandbox.initialize()
    except Exception as exc:
        print("[FAIL] initialize raised")
        traceback.print_exc()
        _fail(f"create failed: {exc!r}")

    elapsed = time.time() - started
    _ok(f"initialize ok in {elapsed:.1f}s, instance={sandbox._instance}")

    if not sandbox._instance:
        _fail("env._instance is None after initialize")

    await sandbox.release()
    _ok("release ok")


async def main() -> None:
    """Run the liveness probe, then the create/release lifecycle, in order."""
    for step in (probe_health, lifecycle):
        await step()


if __name__ == "__main__":
asyncio.run(main())
5 changes: 4 additions & 1 deletion aenv/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ dependencies = [
"typer>=0.9.0",
"tabulate>=0.9.0",
"colorlog>=6.10.1",
"openai-agents>=0.6.3",
"starlette>=0.27.0",
"urllib3>=1.26.0",
"docker>=6.0.0",
]

[project.optional-dependencies]
agents = [
"openai-agents>=0.6.3",
]

dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
Expand Down
Loading
Loading