diff --git a/aenv/e2e_arca_negative_test.py b/aenv/e2e_arca_negative_test.py new file mode 100644 index 0000000..0b55a7f --- /dev/null +++ b/aenv/e2e_arca_negative_test.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +""" +Contract tests for arca + ``enable_data_plane=False``. + +Round 2: every data-plane method on ``Environment`` raises +``EnvironmentError`` mentioning ``enable_data_plane=False`` instead of +silently hitting api-service:8081. + +Round 3: api-service:8081 (the MCP gateway port) returns HTTP 501 with an +actionable message for any path when running in arca schedule mode. This +is the server-side guarantee that pairs with Round 2. + +Both rounds talk to a real api-service-arca + arca sandbox, so the script +must run against tydd-staging (or another arca-mode deployment). + +Env vars: + AENV_SYSTEM_URL api-service base URL on :8080 (required) + ARCA_TEST_ENV_NAME envhub env name (e.g. arca-real@1.0.0) (required) + AENV_API_KEY optional bearer token +""" + +from __future__ import annotations + +import asyncio +import json +import os +import sys +from urllib.parse import urlparse, urlunparse + +import httpx + +from aenv import Environment +from aenv.core.exceptions import EnvironmentError + +CONTROL_URL = os.environ.get( + "AENV_SYSTEM_URL", os.environ.get("ARCA_API_SERVICE_URL", "") +) +ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") +AENV_API_KEY = os.environ.get("AENV_API_KEY", "") + +DATA_PLANE_METHODS: list[tuple[str, tuple]] = [ + ("call_tool", ("t", {})), + ("list_tools", ()), + ("list_functions", ()), + ("call_reward", ({},)), + ("check_health", ({},)), + ("call_function", ("f", {})), +] + + +def _data_plane_url(control_url: str) -> str: + """Swap the :8080 port in control_url for :8081 (the MCP gateway).""" + parsed = urlparse(control_url if "://" in control_url else f"http://{control_url}") + host = parsed.hostname or "127.0.0.1" + return urlunparse(parsed._replace(netloc=f"{host}:8081", path="", query="")) + + +async def round2_guards() -> 
bool: + print("=== Round 2: Environment guards under enable_data_plane=False ===") + async with Environment( + env_name=ENV_NAME, + aenv_url=CONTROL_URL, + api_key=AENV_API_KEY or None, + enable_data_plane=False, + ttl="5m", + ) as env: + for name, args in DATA_PLANE_METHODS: + try: + await getattr(env, name)(*args) + except EnvironmentError as e: + if "enable_data_plane=False" not in str(e): + print(f" {name}: raised wrong message: {e}") + return False + print(f" {name}: blocked OK") + continue + print(f" {name}: did NOT raise -- BUG") + return False + print("Round 2 PASS") + return True + + +async def round3_501() -> bool: + print("=== Round 3: api-service:8081 returns 501 for arca data plane ===") + data_plane_base = _data_plane_url(CONTROL_URL) + async with Environment( + env_name=ENV_NAME, + aenv_url=CONTROL_URL, + api_key=AENV_API_KEY or None, + enable_data_plane=False, + ttl="5m", + ) as env: + info = await env.get_env_info() + sandbox_id = info["instance_id"] + async with httpx.AsyncClient(timeout=10.0) as client: + for path in ["/health", "/mcp", "/some/random"]: + resp = await client.get( + f"{data_plane_base}{path}", + headers={"AEnvCore-EnvInstance-ID": sandbox_id}, + ) + try: + body = resp.json() + except Exception: + body = {"_raw": resp.text[:200]} + + ok = ( + resp.status_code == 501 + and isinstance(body, dict) + and "data plane" in str(body.get("message", "")) + ) + tag = "OK" if ok else "FAIL" + print( + f" {path}: status={resp.status_code} body={json.dumps(body)} [{tag}]" + ) + if not ok: + return False + print("Round 3 PASS") + return True + + +async def main() -> int: + if not CONTROL_URL or not ENV_NAME: + print( + "missing required env vars AENV_SYSTEM_URL and/or ARCA_TEST_ENV_NAME", + file=sys.stderr, + ) + return 2 + r2 = await round2_guards() + r3 = await round3_501() + return 0 if (r2 and r3) else 1 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/aenv/e2e_arca_presign_test.py b/aenv/e2e_arca_presign_test.py 
new file mode 100644 index 0000000..0dd6839 --- /dev/null +++ b/aenv/e2e_arca_presign_test.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Example: aenv + arca engine — fast create with no data-plane wait, then +access the sandbox via a presigned URL (direct data-plane, bypassing +api-service). + +When to use this pattern: + * Engine is arca and the sandbox image runs an arbitrary user app + (no aenv MCP server inside). + * You want minimum latency at create time and prefer to wait on the + in-sandbox app's own readiness path. + +High-level flow: + 1. create sandbox (Environment.__aenter__, no MCP session) + 2. presign URL (Environment.presign_url) + 3. caller polls the URL (httpx, business-defined readiness path) + 4. caller does real work (whatever the sandbox process listens for) + 5. release (Environment.__aexit__) + +``enable_data_plane=False`` is what makes this work on arca: the SDK +skips the MCP session and the /health probe entirely, so no traffic +hits api-service:8081 (which on arca returns 501 by design). The +``call_tool`` / ``list_tools`` / ``list_functions`` / ``call_function`` / ``call_reward`` / +``check_health`` methods will raise if invoked under this flag. 
+ +Env vars: + AENV_SYSTEM_URL api-service base URL (default http://localhost) + ARCA_TEST_ENV_NAME envhub env name (arcaTemplateId inside) (required) + AENV_API_KEY bearer token if api-service has auth enabled (optional) + ARCA_SERVICE_PORT in-sandbox port to expose (default 18080) + ARCA_SERVICE_PATH path to GET for readiness (default /healthz) + ARCA_PRESIGN_TTL_MIN presign URL ttl (default 5) + ARCA_READINESS_TIMEOUT readiness polling budget (default 45s) +""" + +from __future__ import annotations + +import asyncio +import os +import sys +import time + +import httpx + +from aenv import Environment + +ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") +AENV_URL = os.environ.get("AENV_SYSTEM_URL", os.environ.get("ARCA_API_SERVICE_URL", "")) +AENV_API_KEY = os.environ.get("AENV_API_KEY", "") + +SERVICE_PORT = int(os.environ.get("ARCA_SERVICE_PORT", "18080")) +SERVICE_PATH = os.environ.get("ARCA_SERVICE_PATH", "/healthz") +PRESIGN_TTL_MIN = float(os.environ.get("ARCA_PRESIGN_TTL_MIN", "5")) +READINESS_TIMEOUT_S = float(os.environ.get("ARCA_READINESS_TIMEOUT", "45")) + + +async def wait_ready( + target: str, timeout_s: float = READINESS_TIMEOUT_S +) -> httpx.Response: + """Poll GET target until 2xx or timeout. 
Returns the last response.""" + deadline = time.time() + timeout_s + last: httpx.Response | None = None + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c: + attempt = 0 + while time.time() < deadline: + attempt += 1 + try: + last = await c.get(target) + if 200 <= last.status_code < 300: + return last + print(f" readiness attempt {attempt}: status={last.status_code}") + except Exception as e: + print(f" readiness attempt {attempt}: {type(e).__name__}: {e}") + await asyncio.sleep(2.0) + if last is None: + raise TimeoutError(f"never got a response within {timeout_s}s") + raise TimeoutError( + f"never became ready within {timeout_s}s " + f"(last status={last.status_code}, body={last.text[:200]!r})" + ) + + +async def main() -> int: + if not ENV_NAME: + print("missing required env var ARCA_TEST_ENV_NAME", file=sys.stderr) + return 1 + + print(f"env_name: {ENV_NAME}") + print(f"aenv_url: {AENV_URL or '(default)'}") + + t0 = time.time() + async with Environment( + env_name=ENV_NAME, + ttl="10m", + enable_data_plane=False, + ) as env: + info = await env.get_env_info() + print( + f"[1] created sandbox in {time.time()-t0:.2f}s " + f"(id={info['instance_id']}, status={info['status']})" + ) + + url = await env.presign_url( + port=SERVICE_PORT, + expiration_time_in_minutes=PRESIGN_TTL_MIN, + ) + print(f"[2] presigned URL ({SERVICE_PORT}): {url}") + + target = url.rstrip("/") + ( + SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH + ) + print(f"[3] waiting for {SERVICE_PATH} to return 2xx...") + resp = await wait_ready(target) + print(f" ready in {time.time()-t0:.1f}s; status={resp.status_code}") + print(f" body: {resp.text[:200]!r}") + + # The same ``url`` is a valid base for HTTP / WebSocket / anything + # the sandbox process listens for on that port. It remains valid + # until the presign TTL expires. 
+ print("[4] (business traffic would run here)") + + print(f"[5] released sandbox in {time.time()-t0:.1f}s total") + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/aenv/e2e_arca_test.py b/aenv/e2e_arca_test.py new file mode 100644 index 0000000..0029ef2 --- /dev/null +++ b/aenv/e2e_arca_test.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +""" +End-to-end smoke for api-service-arca. + +Runs against the in-cluster service via kubectl port-forward. + +Env: + ARCA_API_SERVICE_URL = http://localhost:PORT (default port 18080) + ARCA_TEST_ENV_NAME = envhub env name (with @version) to create sandbox + from. If unset, only the proxy liveness probe runs. + +The SDK is engine-unaware: this script never asserts on labels or NotImplementedError. +""" +from __future__ import annotations + +import asyncio +import os +import sys +import time +import traceback + +from aenv.core.environment import Environment + +ARCA_URL = os.environ.get("ARCA_API_SERVICE_URL", "http://localhost:18080") +ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") +API_KEY = os.environ.get("AENV_API_KEY", "") + + +def _fail(msg: str) -> None: + print(f"[FAIL] {msg}") + sys.exit(1) + + +def _ok(msg: str) -> None: + print(f"[ OK ] {msg}") + + +async def probe_health() -> None: + """Plain HTTP liveness via httpx (proves port-forward works).""" + import httpx + + async with httpx.AsyncClient(timeout=5.0) as c: + r = await c.get(f"{ARCA_URL}/health") + if r.status_code != 200: + _fail(f"/health returned {r.status_code}: {r.text}") + _ok("/health -> 200") + + +async def lifecycle() -> None: + if not ENV_NAME: + print("[SKIP] ARCA_TEST_ENV_NAME not set; skipping create/release") + return + + env = Environment( + env_name=ENV_NAME, + aenv_url=ARCA_URL, + ttl="10m", + startup_timeout=180.0, + timeout=60.0, + max_retries=1, + api_key=API_KEY or None, + ) + + t0 = time.time() + try: + await env.initialize() + except Exception as e: + print("[FAIL] initialize raised") + traceback.print_exc() + 
_fail(f"create failed: {e!r}") + + _ok(f"initialize ok in {time.time()-t0:.1f}s, instance={env._instance}") + + if not env._instance: + _fail("env._instance is None after initialize") + + await env.release() + _ok("release ok") + + +async def main() -> None: + await probe_health() + await lifecycle() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/aenv/pyproject.toml b/aenv/pyproject.toml index a868733..f919aee 100644 --- a/aenv/pyproject.toml +++ b/aenv/pyproject.toml @@ -37,13 +37,16 @@ dependencies = [ "typer>=0.9.0", "tabulate>=0.9.0", "colorlog>=6.10.1", - "openai-agents>=0.6.3", "starlette>=0.27.0", "urllib3>=1.26.0", "docker>=6.0.0", ] [project.optional-dependencies] +agents = [ + "openai-agents>=0.6.3", +] + dev = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", diff --git a/aenv/src/aenv/client/scheduler_client.py b/aenv/src/aenv/client/scheduler_client.py index 0509491..00dccef 100644 --- a/aenv/src/aenv/client/scheduler_client.py +++ b/aenv/src/aenv/client/scheduler_client.py @@ -18,7 +18,7 @@ """ import asyncio -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional import httpx @@ -112,6 +112,7 @@ async def create_env_instance( arguments: Optional[List[str]] = None, owner: Optional[str] = None, labels: Optional[Dict[str, str]] = None, + mount_points: Optional[List[Dict[str, Any]]] = None, ) -> EnvInstance: """ Create a new environment instance. @@ -124,6 +125,8 @@ async def create_env_instance( ttl: Time to live for instance owner: Optional owner of the instance labels: Optional labels for the instance + mount_points: Optional mount-point dicts forwarded to the sandbox + engine. Supported engines: arca (ignored on k8s/standard/faas). 
Returns: Created EnvInstance @@ -135,7 +138,7 @@ async def create_env_instance( raise NetworkError("Client not connected") logger.info( - f"Creating environment instance: {name}, datasource: {datasource}, ttl: {ttl}, environment_variables: {environment_variables}, arguments: {arguments}, owner: {owner}, labels: {labels}, url: {self.base_url}" + f"Creating environment instance: {name}, datasource: {datasource}, ttl: {ttl}, environment_variables: {environment_variables}, arguments: {arguments}, owner: {owner}, labels: {labels}, mount_points: {mount_points}, url: {self.base_url}" ) request = EnvInstanceCreateRequest( envName=name, @@ -145,6 +148,7 @@ async def create_env_instance( ttl=ttl, owner=owner, labels=labels, + mount_points=mount_points, ) for attempt in range(self.max_retries + 1): @@ -315,6 +319,51 @@ async def delete_env_instance(self, instance_id: str) -> bool: continue raise NetworkError(f"Network error: {str(e)}") from e + async def presign_url( + self, + instance_id: str, + port: int, + expiration_time_in_minutes: float = 5, + ) -> str: + """Get a short-lived URL pointing to a port inside the sandbox. + + The call is engine-unaware at the SDK layer; api-service returns an + error if the active engine does not support presigning. 
+ """ + if not self._client: + raise NetworkError("Client not connected") + payload = { + "port": port, + "expiration_time_in_minutes": expiration_time_in_minutes, + } + for attempt in range(self.max_retries + 1): + try: + response = await self._client.post( + f"/env-instance/{instance_id}/presign-url", json=payload + ) + try: + api_response = APIResponse(**response.json()) + except ValueError as e: + raise EnvironmentError( + f"Invalid server response: {response.status_code} - {response.text[:200]}" + ) from e + if not api_response.success: + raise EnvironmentError( + f"presign_url failed: {api_response.message}" + ) + data = api_response.data or {} + url = data.get("url") if isinstance(data, dict) else None + if not url: + raise EnvironmentError( + f"presign_url: empty url in response ({data!r})" + ) + return url + except httpx.RequestError as e: + if attempt < self.max_retries: + await asyncio.sleep(2**attempt) + continue + raise NetworkError(f"Network error: {str(e)}") from e + async def wait_for_status( self, instance_id: str, diff --git a/aenv/src/aenv/core/environment.py b/aenv/src/aenv/core/environment.py index 24ed841..b9f0915 100644 --- a/aenv/src/aenv/core/environment.py +++ b/aenv/src/aenv/core/environment.py @@ -23,16 +23,16 @@ import random import traceback from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from urllib.parse import urlparse, urlunparse import httpx -from agents.tool import FunctionTool -from agents.tool import Tool as OpenAITool -from agents.tool_context import ToolContext from fastmcp import Client from fastmcp.client.transports import StreamableHttpTransport +if TYPE_CHECKING: + from agents.tool import Tool as OpenAITool + from aenv.client.scheduler_client import AEnvSchedulerClient from aenv.core.exceptions import ( EnvironmentError, @@ -121,8 +121,10 @@ def __init__( max_retries: int = 10, api_key: Optional[str] = None, 
skip_for_healthy: bool = False, + enable_data_plane: bool = True, owner: Optional[str] = None, labels: Optional[Dict[str, str]] = None, + mount_points: Optional[List[Dict[str, Any]]] = None, ): """ Initialize environment. @@ -138,7 +140,22 @@ def __init__( ttl: Time to live in seconds defaults to 10 minutes max_retries: Maximum retry attempts for failed requests api_key: Optional API key for authentication - skip_for_healthy: Skip health check if True (defaults to False) + skip_for_healthy: Skip the data-plane ``/health`` readiness probe + only. Has no effect when ``enable_data_plane=False`` (the + probe is skipped unconditionally in that mode). + enable_data_plane: When True (default), the SDK opens an MCP + session against the sandbox on port 8081 and exposes + ``call_tool`` / ``list_tools`` / ``call_function`` / + ``call_reward`` / ``check_health``. When False, the data + plane is not touched at all — no MCP session, no ``/health`` + probe, and the above methods raise ``EnvironmentError``. + ``presign_url`` still works because it is a control-plane + API. Required for arca-engine sandboxes whose images do not + embed the aenv MCP server. + mount_points: Optional list of mount-point dicts forwarded to the + backend sandbox engine. Each entry: ``{"id": "OSS_xxx", + "remote_dir": "/data", "local_dir": "/workspace"}``. + Supported engines: arca (ignored on k8s/standard/faas). """ self.env_name = env_name self.datasource = datasource @@ -146,8 +163,11 @@ def __init__( self.arguments = arguments or [] self.dummy_instance_ip = os.getenv("DUMMY_INSTANCE_IP") self.skip_for_healthy = skip_for_healthy + self.enable_data_plane = enable_data_plane self.owner = owner self.labels = labels + # Supported engines: arca (ignored on k8s/standard/faas). 
+ self.mount_points = mount_points if not aenv_url: aenv_url = self.dummy_instance_ip or os.getenv( @@ -183,6 +203,14 @@ def _log_prefix(self) -> str: ) return f"[ENV:{instance_id}][sdk:v{__version__}]" + def _require_data_plane(self, op: str) -> None: + if not self.enable_data_plane: + raise EnvironmentError( + f"{op} is unavailable: this Environment was created with " + f"enable_data_plane=False (no MCP session, no /health). Use " + f"presign_url() to expose an in-sandbox port instead." + ) + async def _backoff(self, attempt: int, base: float = 2.0) -> None: """Exponential backoff with jitter.""" wait = base**attempt + random.uniform(0, 1) @@ -204,6 +232,8 @@ async def _rebuild_mcp_client(self) -> None: async def __aenter__(self): """Async context manager entry.""" await self.initialize() + if not self.enable_data_plane: + return self max_attempts = 3 for attempt in range(max_attempts): try: @@ -223,7 +253,8 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" - await self._close_mcp_session() + if self.enable_data_plane: + await self._close_mcp_session() await self.release() async def initialize(self) -> bool: @@ -298,6 +329,27 @@ async def _close_mcp_session(self): finally: self._mcp_session_active = False + async def presign_url( + self, + port: int, + expiration_time_in_minutes: float = 5, + ) -> str: + """Return a short-lived URL pointing to a port inside this sandbox. + + Mirrors arca-sandbox SDK's ``presign_url`` signature. Engine differences + are resolved inside api-service - the SDK does not branch on engine. + Raises ``EnvironmentError`` if the active engine does not support + presigning (api-service returns a 501 with explanation). 
+ """ + await self._ensure_initialized() + if not self._client or not self._instance: + raise EnvironmentError("presign_url: environment not initialized") + return await self._client.presign_url( + self._instance.id, + port=port, + expiration_time_in_minutes=expiration_time_in_minutes, + ) + async def release(self): """Release environment resources.""" logger.info( @@ -350,9 +402,12 @@ async def list_tools(self) -> List[Dict[str, Any]]: """ List all available tools in the environment using MCP client. + Supported engines: all. + Returns: List of tool descriptors in MCP format """ + self._require_data_plane("list_tools") await self._ensure_initialized() try: @@ -384,10 +439,20 @@ async def list_tools(self) -> List[Dict[str, Any]]: f"Failed to list tools for environment '{self.env_name}': {str(e)}" ) - async def list_openai_tools(self) -> List[OpenAITool]: + async def list_openai_tools(self) -> "List[OpenAITool]": + try: + from agents.tool import FunctionTool + from agents.tool import Tool as OpenAITool # noqa: F401 + from agents.tool_context import ToolContext # noqa: F401 + except ImportError as e: + raise ImportError( + "list_openai_tools() requires the 'agents' extra. " + "Install it with: pip install 'aenvironment[agents]'" + ) from e + tools = await self.list_tools() - openai_tools: List[OpenAITool] = [] + openai_tools: List[Any] = [] for tool in tools: name = str(tool.get("name", "")) description = str(tool.get("description", "")) @@ -396,7 +461,7 @@ async def list_openai_tools(self) -> List[OpenAITool]: input_schema = {"type": "object", "properties": {}} async def _on_invoke_tool( - ctx: ToolContext[Any], input: str, *, _name: str = name + ctx: Any, input: str, *, _name: str = name ) -> Any: try: args: Dict[str, Any] = json.loads(input) if input else {} @@ -436,9 +501,12 @@ async def list_functions(self) -> Dict[str, Any]: """ List all registered functions in the environment including reward and health. + Supported engines: all. 
+ Returns: Dictionary containing categorized function lists (functions, reward, health) """ + self._require_data_plane("list_functions") await self._ensure_initialized() try: @@ -495,6 +563,8 @@ async def call_reward( """ Execute the reward function via the /task/reward endpoint. + Supported engines: all. + Args: arguments: Arguments to pass to the reward function timeout: Override default timeout @@ -505,6 +575,8 @@ async def call_reward( Raises: EnvironmentError: If reward execution fails """ + self._require_data_plane("call_reward") + await self._ensure_initialized() return await self._call_function( self.aenv_reward_url, arguments=arguments, timeout=timeout ) @@ -631,11 +703,14 @@ async def check_health( """ Execute the check-health function via the /health endpoint. + Supported engines: all. + Returns: Raises: EnvironmentError: If health check execution fails """ + self._require_data_plane("check_health") await self._ensure_initialized() logger.info( @@ -675,6 +750,8 @@ async def call_tool( """ Execute a tool with given arguments using MCP client. + Supported engines: all. + Retry strategy (idempotent-safe): - Session establishment failures are retried (tool was never sent to server) - Once call_tool_mcp() is invoked, the tool MAY have executed on the server. 
@@ -693,6 +770,7 @@ async def call_tool( ToolError: If tool execution fails after invocation EnvironmentError: If session cannot be established """ + self._require_data_plane("call_tool") await self._ensure_initialized() # Circuit breaker: fail fast if too many consecutive tool errors @@ -817,9 +895,12 @@ async def _ensure_initialized(self): async def _wait_for_healthy(self, timeout: float = 300.0) -> None: """Wait for environment instance to be healthy.""" - if self.skip_for_healthy: + if not self.enable_data_plane or self.skip_for_healthy: logger.info( - f"{self._log_prefix()} Skipping health check for environment {self.env_name}" + f"{self._log_prefix()} Skipping /health probe for environment " + f"{self.env_name} " + f"(enable_data_plane={self.enable_data_plane}, " + f"skip_for_healthy={self.skip_for_healthy})" ) return @@ -891,7 +972,12 @@ async def _wait_for_healthy(self, timeout: float = 300.0) -> None: ) async def wait_for_ready(self, timeout: float = 300.0) -> None: - """Wait for environment instance to be ready.""" + """Wait for environment instance to be ready. + + Readiness is defined by the control-plane status reaching ``RUNNING`` + plus the data-plane MCP health probe returning healthy. Engine + differences are fully resolved inside api-service. + """ if not self._client or not self._instance: await self.initialize() @@ -904,6 +990,7 @@ async def wait_for_ready(self, timeout: float = 300.0) -> None: ) self._instance = instance + await self._wait_for_healthy() except Exception as e: logger.error( @@ -945,6 +1032,7 @@ async def _create_env_instance(self): ttl=self.ttl, owner=self.owner, labels=self.labels, + mount_points=self.mount_points, ) logger.info( f"{self._log_prefix()} Environment instance created with ID: {self._instance.id}" @@ -971,6 +1059,8 @@ async def call_function( """ Execute a registered function via HTTP endpoint. + Supported engines: all. 
+ Args: function_name: name of the registered function arguments: Arguments to pass to the function @@ -982,6 +1072,8 @@ async def call_function( Raises: EnvironmentError: If function execution fails """ + self._require_data_plane("call_function") + await self._ensure_initialized() function_url = f"{self.aenv_functions_base_url}/{function_name}" return await self._call_function( function_url, arguments=arguments, timeout=timeout diff --git a/aenv/src/aenv/core/models.py b/aenv/src/aenv/core/models.py index 7b2fad2..80ada14 100644 --- a/aenv/src/aenv/core/models.py +++ b/aenv/src/aenv/core/models.py @@ -96,6 +96,15 @@ class EnvInstanceCreateRequest(BaseModel): arguments: Optional[List[str]] = Field(None, description="Startup arguments") owner: Optional[str] = Field(None, description="Instance owner") labels: Optional[Dict[str, str]] = Field(None, description="Resource labels") + mount_points: Optional[List[Dict[str, Any]]] = Field( + None, + description=( + "OSS/disk mount points forwarded to the sandbox engine. " + "Each entry is a dict like " + "{'id': 'OSS_xxx', 'remote_dir': '/data', 'local_dir': '/workspace'}. " + "Supported engines: arca (ignored on k8s/standard/faas)." + ), + ) class EnvInstanceListResponse(BaseModel): diff --git a/api-service/controller/env_instance.go b/api-service/controller/env_instance.go index 29d3f8d..d6a0049 100644 --- a/api-service/controller/env_instance.go +++ b/api-service/controller/env_instance.go @@ -57,6 +57,9 @@ type CreateEnvInstanceRequest struct { TTL string `json:"ttl"` Owner string `json:"owner"` Labels map[string]string `json:"labels,omitempty"` + // MountPoints forwards Arca-style mount entries to the backend. + // Supported engines: arca (ignored on k8s/standard/faas). 
+ MountPoints []map[string]interface{} `json:"mount_points,omitempty"` } // CreateEnvInstance creates a new EnvInstance @@ -120,6 +123,10 @@ func (ctrl *EnvInstanceController) CreateEnvInstance(c *gin.Context) { if req.Labels != nil { backendEnv.DeployConfig["labels"] = req.Labels } + // Arca-specific passthrough. Supported engines: arca. + if len(req.MountPoints) > 0 { + backendEnv.DeployConfig["mountPoints"] = req.MountPoints + } // Call ScheduleClient to create Pod envInstance, err := ctrl.envInstanceService.CreateEnvInstance(backendEnv) if err != nil { @@ -196,6 +203,41 @@ func (ctrl *EnvInstanceController) DeleteEnvInstance(c *gin.Context) { } } +// PresignURLRequest is the body for POST /env-instance/:id/presign-url. +type PresignURLRequest struct { + Port int `json:"port" binding:"required"` + ExpirationTimeInMinutes float64 `json:"expiration_time_in_minutes,omitempty"` +} + +// PresignURLResponse is the body returned from PresignURL. +type PresignURLResponse struct { + URL string `json:"url"` +} + +// PresignURL returns a short-lived URL pointing to a port inside the sandbox. +// Engines that don't support presigning surface a clear error from the +// service layer; the SDK is engine-unaware. 
+// +// POST /env-instance/:id/presign-url +func (ctrl *EnvInstanceController) PresignURL(c *gin.Context) { + id := c.Param("id") + if id == "" { + backendmodels.JSONErrorWithMessage(c, 400, "Missing id parameter") + return + } + var req PresignURLRequest + if err := c.ShouldBindJSON(&req); err != nil { + backendmodels.JSONErrorWithMessage(c, 400, "Invalid request: "+err.Error()) + return + } + url, err := ctrl.envInstanceService.PresignURL(id, req.Port, req.ExpirationTimeInMinutes) + if err != nil { + backendmodels.JSONErrorWithMessage(c, 501, err.Error()) + return + } + backendmodels.JSONSuccess(c, PresignURLResponse{URL: url}) +} + func (ctrl *EnvInstanceController) ListEnvInstances(c *gin.Context) { token := util.GetCurrentToken(c) if token == nil { diff --git a/api-service/controller/env_instance_arca_test.go b/api-service/controller/env_instance_arca_test.go new file mode 100644 index 0000000..ec8a287 --- /dev/null +++ b/api-service/controller/env_instance_arca_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package controller + +import ( + "bytes" + "encoding/json" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" +) + +// TestCreateEnvInstanceRequest_ArcaFieldsBinding verifies JSON binding of +// mount_points. +// +// Supported engines: arca. 
+func TestCreateEnvInstanceRequest_ArcaFieldsBinding(t *testing.T) { + body := `{ + "envName": "test@v1", + "ttl": "30m", + "mount_points": [ + {"id": "OSS_bucket_ak", "remote_dir": "/data/oss", "local_dir": "/workspace/oss"} + ] + }` + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/env-instance", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + var req CreateEnvInstanceRequest + if err := c.ShouldBindJSON(&req); err != nil { + t.Fatalf("bind: %v", err) + } + + if len(req.MountPoints) != 1 { + t.Fatalf("mount_points length = %d, want 1", len(req.MountPoints)) + } + mp := req.MountPoints[0] + if mp["id"] != "OSS_bucket_ak" { + t.Errorf("mount_points[0].id = %v", mp["id"]) + } + if mp["remote_dir"] != "/data/oss" { + t.Errorf("mount_points[0].remote_dir = %v", mp["remote_dir"]) + } +} + +// TestCreateEnvInstance_MountPointsForwardedToDeployConfig mirrors the +// controller's forwarding step without invoking the HTTP handler. +// +// Supported engines: arca. +func TestCreateEnvInstance_MountPointsForwardedToDeployConfig(t *testing.T) { + reqBody := CreateEnvInstanceRequest{ + EnvName: "test@v1", + MountPoints: []map[string]interface{}{ + {"id": "OSS_x", "remote_dir": "/a", "local_dir": "/b"}, + }, + } + deployConfig := make(map[string]interface{}) + if len(reqBody.MountPoints) > 0 { + deployConfig["mountPoints"] = reqBody.MountPoints + } + got, ok := deployConfig["mountPoints"].([]map[string]interface{}) + if !ok { + t.Fatalf("DeployConfig[mountPoints] type = %T, want []map[string]interface{}", deployConfig["mountPoints"]) + } + if len(got) != 1 || got[0]["id"] != "OSS_x" { + t.Errorf("unexpected mountPoints: %v", got) + } +} + +// TestCreateEnvInstance_ArcaFieldsOmitted_NoDeployConfigMutation verifies +// backward compatibility: existing callers (k8s/standard/faas) that don't +// supply arca fields see no extra keys injected. 
+// +// Supported engines: all. +func TestCreateEnvInstance_ArcaFieldsOmitted_NoDeployConfigMutation(t *testing.T) { + reqBody := CreateEnvInstanceRequest{EnvName: "test@v1", TTL: "30m"} + deployConfig := make(map[string]interface{}) + if len(reqBody.MountPoints) > 0 { + deployConfig["mountPoints"] = reqBody.MountPoints + } + if _, ok := deployConfig["mountPoints"]; ok { + t.Errorf("mountPoints should not be set when request omits it") + } +} + +// TestCreateEnvInstanceRequest_ArcaFieldsOmitted_UnmarshalSilent verifies +// empty request body does not populate arca fields. +func TestCreateEnvInstanceRequest_ArcaFieldsOmitted_UnmarshalSilent(t *testing.T) { + body := `{"envName": "test@v1", "ttl": "30m"}` + var req CreateEnvInstanceRequest + if err := json.Unmarshal([]byte(body), &req); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if req.MountPoints != nil { + t.Errorf("mount_points = %v, want nil", req.MountPoints) + } +} diff --git a/api-service/controller/mcp_proxy.go b/api-service/controller/mcp_proxy.go index 46041a9..40e4016 100644 --- a/api-service/controller/mcp_proxy.go +++ b/api-service/controller/mcp_proxy.go @@ -48,16 +48,28 @@ const ( // HTTP methods MethodGET = "GET" MethodPOST = "POST" + + // Schedule types (kept in sync with main.go --schedule-type). + scheduleTypeArca = "arca" ) +// MCPGatewayConfig is injected at api-service startup so the gateway never +// has to query per-request engine metadata. 
+type MCPGatewayConfig struct { + ScheduleType string // "k8s" | "standard" | "faas" | "arca" + ArcaBaseURL string + ArcaAPIKey string +} + // MCPGateway MCP gateway struct type MCPGateway struct { router *gin.RouterGroup transport *http.Transport + config MCPGatewayConfig } // NewMCPGateway creates a new MCP gateway instance -func NewMCPGateway(router *gin.RouterGroup) *MCPGateway { +func NewMCPGateway(router *gin.RouterGroup, cfg MCPGatewayConfig) *MCPGateway { gateway := &MCPGateway{ router: router, transport: &http.Transport{ @@ -65,6 +77,7 @@ func NewMCPGateway(router *gin.RouterGroup) *MCPGateway { MaxIdleConnsPerHost: 10, IdleConnTimeout: 90 * time.Second, }, + config: cfg, } gateway.setupRoutes() @@ -77,8 +90,24 @@ func (g *MCPGateway) setupRoutes() { } func (g *MCPGateway) innerRouter(c *gin.Context) { - proxyURL, _ := g.getMCPSeverURL(c) path := c.Param("path") + if g.config.ScheduleType == scheduleTypeArca { + // Arca sandboxes do not embed the aenv MCP server, so the data + // plane (MCP / /health / SSE) is not supported. The SDK is + // expected to opt out via Environment(enable_data_plane=False) + // and use presign_url() instead. We still respond explicitly so + // stray callers get a clear error rather than a hang. + c.JSON(http.StatusNotImplemented, gin.H{ + "success": false, + "code": http.StatusNotImplemented, + "message": "data plane (MCP / /health) is not supported on arca engine; " + + "use presign_url() to expose an in-sandbox port", + "data": nil, + }) + return + } + + proxyURL, _ := g.getMCPSeverURL(c) if proxyURL != "" { switch path { case PathSSE: diff --git a/api-service/main.go b/api-service/main.go index d8862a0..73623f3 100644 --- a/api-service/main.go +++ b/api-service/main.go @@ -20,6 +20,7 @@ package main import ( "math/rand" "net/http" + "os" "runtime" "time" @@ -48,11 +49,15 @@ var ( tokenCacheMaxEntries int tokenCacheTTLMinutes int cleanupInterval string + // Supported engines: arca. 
+ arcaBaseURL string + // Supported engines: arca. + arcaAPIKey string ) func init() { pflag.StringVar(&scheduleAddr, "schedule-addr", "", "Meta service address (host:port)") - pflag.StringVar(&scheduleType, "schedule-type", "k8s", "sandbox service schedule type, currently only 'k8s', 'standard' support") + pflag.StringVar(&scheduleType, "schedule-type", "k8s", "sandbox service schedule type: 'k8s', 'standard', 'faas', or 'arca'") pflag.StringVar(&backendAddr, "backend-addr", "", "backend service address (host:port)") pflag.Int64Var(&qps, "qps", int64(100), "total qps limit") @@ -63,6 +68,10 @@ func init() { pflag.StringVar(&redisAddr, "redis-addr", "", "Redis address (host:port)") pflag.StringVar(&redisPassword, "redis-password", "", "Redis password") pflag.StringVar(&cleanupInterval, "cleanup-interval", "5m", "Cleanup service interval (e.g., 5m, 1h)") + + // Arca sandbox engine flags. Supported engines: arca. + pflag.StringVar(&arcaBaseURL, "arca-base-url", "", "Arca sandbox OpenAPI base URL. Supported engines: arca") + pflag.StringVar(&arcaAPIKey, "arca-api-key", "", "Arca sandbox OpenAPI key; falls back to ARCA_API_KEY env. Supported engines: arca") } func healthChecker(c *gin.Context) { @@ -104,6 +113,19 @@ func main() { scheduleClient = service.NewEnvInstanceClient(scheduleAddr) case "faas": scheduleClient = service.NewFaaSClient(scheduleAddr) + case "arca": + // Supported engines: arca. 
+ key := arcaAPIKey + if key == "" { + key = os.Getenv("ARCA_API_KEY") + } + if arcaBaseURL == "" { + log.Fatalf("--arca-base-url is required when --schedule-type=arca") + } + if key == "" { + log.Fatalf("arca API key missing: set --arca-api-key or ARCA_API_KEY") + } + scheduleClient = service.NewArcaClient(arcaBaseURL, key) default: log.Fatalf("unsupported schedule type: %v", scheduleType) } @@ -119,6 +141,7 @@ func main() { mainRouter.GET("/env-instance/:id/list", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.ListEnvInstances) mainRouter.GET("/env-instance/:id", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.GetEnvInstance) mainRouter.DELETE("/env-instance/:id", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.DeleteEnvInstance) + mainRouter.POST("/env-instance/:id/presign-url", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.PresignURL) // Service routes if envServiceController != nil { @@ -144,7 +167,11 @@ func main() { mcpRouter.Use(middleware.MCPMetricsMiddleware()) mcpRouter.Use(middleware.LoggingMiddleware()) mcpGroup := mcpRouter.Group("/") - controller.NewMCPGateway(mcpGroup) + controller.NewMCPGateway(mcpGroup, controller.MCPGatewayConfig{ + ScheduleType: scheduleType, + ArcaBaseURL: arcaBaseURL, + ArcaAPIKey: arcaAPIKey, + }) // Start two services go func() { diff --git a/api-service/service/arca_client.go b/api-service/service/arca_client.go new file mode 100644 index 0000000..04a3001 --- /dev/null +++ b/api-service/service/arca_client.go @@ -0,0 +1,467 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "strings" + "time" + + log "github.com/sirupsen/logrus" + + "api-service/models" + backend "envhub/models" +) + +// ArcaClient implements EnvInstanceService against the Arca sandbox OpenAPI +// (`/arca/openapi/v1/sandbox/*`). +// +// Unlike ScheduleClient / EnvInstanceClient / FaaSClient, ArcaClient does not +// assemble cpu/memory/disk/image โ€” those are fully determined by the Arca +// template identified by DeployConfig["arcaTemplateId"]. List is not +// supported because Arca OpenAPI has no list endpoint in this iteration. +// +// Supported engines: arca. +type ArcaClient struct { + baseURL string + apiKey string + httpClient *http.Client +} + +// Compile-time interface compliance check. +var _ EnvInstanceService = (*ArcaClient)(nil) + +// Arca OpenAPI paths (relative to baseURL). +const ( + arcaInstancesPath = "/arca/openapi/v1/sandbox/instances" + arcaGatewayPrefix = "/arca/api/v1/sandbox" + arcaAPIKeyHeader = "x-agent-sandbox-api-key" + arcaSandboxIDHdr = "x-agent-sandbox-id" + arcaTemplateIDHdr = "x-agent-sandbox-template-id" + arcaPortHeader = "x-agent-sandbox-port" +) + +// DeployConfig keys consumed by ArcaClient. +const ( + deployKeyArcaTemplateID = "arcaTemplateId" + deployKeyMountPoints = "mountPoints" + deployKeyEnvVars = "environment_variables" + deployKeyOwner = "owner" +) + +// Engine label key/value written onto returned EnvInstance.Labels. 
+const ( + engineLabelKey = "engine" + engineLabelArca = "arca" +) + +// NewArcaClient constructs an ArcaClient targeting the given Arca OpenAPI base +// URL with the supplied tenant API key. The returned client is safe for +// concurrent use from multiple goroutines. +// +// Supported engines: arca. +func NewArcaClient(baseURL, apiKey string) *ArcaClient { + return &ArcaClient{ + baseURL: strings.TrimRight(baseURL, "/"), + apiKey: apiKey, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +// arcaCreateRequest is the outbound body for +// POST /arca/openapi/v1/sandbox/instances. +// +// Supported engines: arca. Fields are chosen to match spec ยง3.2; notably +// `resource` and `image` are intentionally omitted because the Arca template +// fully determines them. +type arcaCreateRequest struct { + TemplateID string `json:"template_id"` + TTLInMinutes int `json:"ttl_in_minutes,omitempty"` + MountPoints []interface{} `json:"mount_points,omitempty"` + Envs map[string]string `json:"envs,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` +} + +// arcaEnvelope matches Arca's uniform response wrapper. Note: presign +// endpoint emits an empty *string* for code on success while OpenAPI emits +// integers, so we keep code as a raw token and only stringify when surfacing +// errors back to the caller. +type arcaEnvelope struct { + Success bool `json:"success"` + Code json.RawMessage `json:"code"` + Message string `json:"message"` + Data json.RawMessage `json:"data"` +} + +// arcaCreatedInstance is Arca's create response payload. +// Arca emits sandbox_id in both snake_case and camelCase; the snake form +// is historically stable and used here. +type arcaCreatedInstance struct { + SandboxID string `json:"sandbox_id"` +} + +// arcaSandboxInfo is Arca's GET /instances/{id} response payload (subset). +// Arca returns `podIp` only as camelCase (verified against stable env 2026-04). +// Supported engines: arca. 
+type arcaSandboxInfo struct { + SandboxID string `json:"sandbox_id"` + Status string `json:"status"` + PodIP string `json:"podIp,omitempty"` +} + +// mapArcaStatus converts Arca OpenAPI status strings into EnvInstance.Status. +// Unknown values fall back to Failed with a log warning. +// +// Supported engines: arca. +func mapArcaStatus(s string) string { + switch strings.ToUpper(strings.TrimSpace(s)) { + case "PENDING": + return models.EnvInstanceStatusPending.String() + case "RUNNING": + return models.EnvInstanceStatusRunning.String() + case "FAILED", "PAUSED": + return models.EnvInstanceStatusFailed.String() + case "DESTROYED": + return models.EnvInstanceStatusTerminated.String() + default: + log.Warnf("arca: unknown sandbox status %q, mapping to Failed", s) + return models.EnvInstanceStatusFailed.String() + } +} + +// ttlMinutesCeil parses a Go-style duration string (e.g. "30m", "1h", "90s") +// and returns ceil(duration / 1min). Empty input returns 0 with no error. +// +// Supported engines: arca. +func ttlMinutesCeil(raw string) (int, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, nil + } + d, err := time.ParseDuration(raw) + if err != nil { + return 0, fmt.Errorf("invalid ttl %q: %w", raw, err) + } + if d <= 0 { + return 0, nil + } + return int(math.Ceil(float64(d) / float64(time.Minute))), nil +} + +// coerceMountPoints accepts the DeployConfig["mountPoints"] value and +// normalises it into a []interface{} ready for JSON serialisation. This +// tolerates the two common shapes: []interface{} (from JSON unmarshalling +// into map[string]interface{}) and []map[string]string (from programmatic +// controller writes). 
+func coerceMountPoints(raw interface{}) []interface{} { + switch v := raw.(type) { + case nil: + return nil + case []interface{}: + return v + case []map[string]interface{}: + out := make([]interface{}, len(v)) + for i, item := range v { + out[i] = item + } + return out + case []map[string]string: + out := make([]interface{}, len(v)) + for i, item := range v { + copyMap := make(map[string]interface{}, len(item)) + for k, val := range item { + copyMap[k] = val + } + out[i] = copyMap + } + return out + default: + log.Warnf("arca: mount_points has unexpected type %T, ignoring", raw) + return nil + } +} + +// coerceEnvs reads DeployConfig["environment_variables"] and returns a +// map[string]string or nil. Non-string values are stringified via fmt.Sprint. +func coerceEnvs(raw interface{}) map[string]string { + if raw == nil { + return nil + } + switch v := raw.(type) { + case map[string]string: + if len(v) == 0 { + return nil + } + return v + case map[string]interface{}: + if len(v) == 0 { + return nil + } + out := make(map[string]string, len(v)) + for k, val := range v { + out[k] = fmt.Sprint(val) + } + return out + default: + return nil + } +} + +// doJSON executes an HTTP request with the given method/path/body and decodes +// the Arca envelope into out. Non-2xx responses return an error carrying +// status code + body excerpt. Envelope-level `success=false` also errors. 
+func (c *ArcaClient) doJSON(method, path string, body interface{}, extraHeaders map[string]string, out interface{}) error { + var reader io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("arca: marshal request: %w", err) + } + reader = bytes.NewReader(data) + } + + url := c.baseURL + path + req, err := http.NewRequest(method, url, reader) + if err != nil { + return fmt.Errorf("arca: build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set(arcaAPIKeyHeader, c.apiKey) + for k, v := range extraHeaders { + req.Header.Set(k, v) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("arca: %s %s: %w", method, path, err) + } + defer func() { + if cerr := resp.Body.Close(); cerr != nil { + log.Warnf("arca: close response body: %v", cerr) + } + }() + + raw, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("arca: read response: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("arca: %s %s returned %d: %s", method, path, resp.StatusCode, truncateBody(raw)) + } + + var envelope arcaEnvelope + if err := json.Unmarshal(raw, &envelope); err != nil { + return fmt.Errorf("arca: decode envelope: %w; body=%s", err, truncateBody(raw)) + } + if !envelope.Success { + return fmt.Errorf("arca: %s %s failed (code %s): %s", method, path, strings.Trim(string(envelope.Code), `"`), envelope.Message) + } + if out != nil && len(envelope.Data) > 0 && !bytes.Equal(envelope.Data, []byte("null")) { + if err := json.Unmarshal(envelope.Data, out); err != nil { + return fmt.Errorf("arca: decode data: %w; body=%s", err, truncateBody(envelope.Data)) + } + } + return nil +} + +// CreateEnvInstance creates a new Arca sandbox from the envhub Env's +// DeployConfig. Required key: `arcaTemplateId`. Returns an EnvInstance with +// `Labels[engine]="arca"`. 
The initial status is usually Pending; callers +// should poll GetEnvInstance until Running. +// +// Supported engines: arca. +func (c *ArcaClient) CreateEnvInstance(req *backend.Env) (*models.EnvInstance, error) { + if req == nil { + return nil, fmt.Errorf("arca: nil env") + } + if req.DeployConfig == nil { + return nil, fmt.Errorf("arcaTemplateId required for arca engine (DeployConfig is nil)") + } + + templateID, _ := req.DeployConfig[deployKeyArcaTemplateID].(string) + if templateID == "" { + return nil, fmt.Errorf("arcaTemplateId required for arca engine") + } + + ttlMin, err := ttlMinutesCeil(req.GetTTL()) + if err != nil { + return nil, fmt.Errorf("arca: %w", err) + } + + body := arcaCreateRequest{ + TemplateID: templateID, + TTLInMinutes: ttlMin, + MountPoints: coerceMountPoints(req.DeployConfig[deployKeyMountPoints]), + Envs: coerceEnvs(req.DeployConfig[deployKeyEnvVars]), + } + + owner, _ := req.DeployConfig[deployKeyOwner].(string) + body.Metadata = map[string]string{ + "aenv_env_name": req.Name, + "aenv_env_version": req.Version, + } + if owner != "" { + body.Metadata["aenv_owner"] = owner + } + + var created arcaCreatedInstance + headers := map[string]string{arcaTemplateIDHdr: templateID} + if err := c.doJSON(http.MethodPost, arcaInstancesPath, body, headers, &created); err != nil { + return nil, err + } + if created.SandboxID == "" { + return nil, fmt.Errorf("arca: empty sandbox_id in create response") + } + + inst := models.NewEnvInstanceWithOwner(created.SandboxID, req, "", owner) + inst.TTL = req.GetTTL() + inst.Labels = mergeLabelsWithEngine(req.DeployConfig) + return inst, nil +} + +// GetEnvInstance fetches sandbox detail by ID and maps it to EnvInstance with +// the arca engine label. +// +// Supported engines: arca. 
+func (c *ArcaClient) GetEnvInstance(id string) (*models.EnvInstance, error) { + if id == "" { + return nil, fmt.Errorf("arca: empty sandbox id") + } + path := arcaInstancesPath + "/" + id + headers := map[string]string{arcaSandboxIDHdr: id} + + var info arcaSandboxInfo + if err := c.doJSON(http.MethodGet, path, nil, headers, &info); err != nil { + return nil, err + } + + now := time.Now().Format("2006-01-02 15:04:05") + return &models.EnvInstance{ + ID: info.SandboxID, + Status: mapArcaStatus(info.Status), + CreatedAt: now, + UpdatedAt: now, + IP: info.PodIP, + Labels: map[string]string{engineLabelKey: engineLabelArca}, + }, nil +} + +// DeleteEnvInstance releases an Arca sandbox. +// +// Supported engines: arca. +func (c *ArcaClient) DeleteEnvInstance(id string) error { + if id == "" { + return fmt.Errorf("arca: empty sandbox id") + } + path := arcaInstancesPath + "/" + id + headers := map[string]string{arcaSandboxIDHdr: id} + return c.doJSON(http.MethodDelete, path, nil, headers, nil) +} + +// ListEnvInstances is intentionally unsupported for arca because Arca OpenAPI +// provides no list endpoint. Callers (e.g. cleanup_service) must tolerate the +// error gracefully. +// +// Supported engines: arca (always returns error). +func (c *ArcaClient) ListEnvInstances(envName string) ([]*models.EnvInstance, error) { + return nil, fmt.Errorf("arca: ListEnvInstances not supported") +} + +// Warmup is permanently unsupported for arca (parity with ScheduleClient). +// +// Supported engines: arca (always returns error). +func (c *ArcaClient) Warmup(req *backend.Env) error { + return fmt.Errorf("arca: Warmup not supported") +} + +// arcaPresignTokenRequest is the body for POST /arca/api/v1/sandbox/{id}/presign/token. +type arcaPresignTokenRequest struct { + ExpirationTime float64 `json:"expiration_time,omitempty"` +} + +// arcaPresignTokenResponse is the unwrapped data of the presign envelope. 
+type arcaPresignTokenResponse struct { + Token string `json:"token"` +} + +// PresignURL acquires a short-lived URL pointing to an in-sandbox port via +// Arca's gateway. The returned URL is fully-qualified and can be used by the +// caller as a base for HTTP/MCP traffic, or by the api-service MCP proxy as +// a reverse-proxy target. +// +// Supported engines: arca. +func (c *ArcaClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + if id == "" { + return "", fmt.Errorf("arca: empty sandbox id") + } + if port <= 0 { + return "", fmt.Errorf("arca: port must be > 0") + } + path := fmt.Sprintf("%s/%s/presign/token", arcaGatewayPrefix, id) + headers := map[string]string{ + arcaSandboxIDHdr: id, + arcaPortHeader: fmt.Sprintf("%d", port), + } + body := arcaPresignTokenRequest{ExpirationTime: expirationMinutes} + + var out arcaPresignTokenResponse + if err := c.doJSON(http.MethodPost, path, body, headers, &out); err != nil { + return "", err + } + if out.Token == "" { + return "", fmt.Errorf("arca: empty presign token in response") + } + return c.baseURL + "/arca/api/v1/session/" + out.Token, nil +} + +// mergeLabelsWithEngine combines user-supplied labels in DeployConfig["labels"] +// with the reserved `engine=arca` label. The engine key always wins. 
+func mergeLabelsWithEngine(deployConfig map[string]interface{}) map[string]string { + out := map[string]string{engineLabelKey: engineLabelArca} + if deployConfig == nil { + return out + } + raw, ok := deployConfig["labels"] + if !ok || raw == nil { + return out + } + switch v := raw.(type) { + case map[string]string: + for k, val := range v { + if k == engineLabelKey { + continue + } + out[k] = val + } + case map[string]interface{}: + for k, val := range v { + if k == engineLabelKey { + continue + } + out[k] = fmt.Sprint(val) + } + } + return out +} diff --git a/api-service/service/arca_client_test.go b/api-service/service/arca_client_test.go new file mode 100644 index 0000000..552341a --- /dev/null +++ b/api-service/service/arca_client_test.go @@ -0,0 +1,699 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package service + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "api-service/models" + backend "envhub/models" +) + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +// arcaMock captures every request the ArcaClient makes, for assertions. 
+type arcaMock struct { + server *httptest.Server + requests []*capturedRequest + nextResponse func(req *http.Request) (status int, body string) +} + +type capturedRequest struct { + Method string + Path string + Headers http.Header + Body []byte +} + +func newArcaMock(t *testing.T, respond func(*http.Request) (int, string)) *arcaMock { + t.Helper() + m := &arcaMock{nextResponse: respond} + m.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + m.requests = append(m.requests, &capturedRequest{ + Method: r.Method, + Path: r.URL.Path, + Headers: r.Header.Clone(), + Body: body, + }) + status, resp := m.nextResponse(r) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = io.WriteString(w, resp) + })) + t.Cleanup(m.server.Close) + return m +} + +func (m *arcaMock) client() *ArcaClient { + return NewArcaClient(m.server.URL, "test-key") +} + +// okResponse wraps data into the standard Arca envelope. +func okResponse(dataJSON string) string { + return fmt.Sprintf(`{"success":true,"code":200,"message":"success","data":%s}`, dataJSON) +} + +// failResponse returns a success=false envelope. +func failResponse(code int, msg string) string { + return fmt.Sprintf(`{"success":false,"code":%d,"message":%q,"data":null}`, code, msg) +} + +// lastRequest returns the most recent captured request or fails the test. +func (m *arcaMock) lastRequest(t *testing.T) *capturedRequest { + t.Helper() + if len(m.requests) == 0 { + t.Fatal("no requests captured") + } + return m.requests[len(m.requests)-1] +} + +// decodeBody unmarshals the captured request body into a generic map. 
+func decodeBody(t *testing.T, raw []byte) map[string]interface{} { + t.Helper() + var out map[string]interface{} + if len(raw) == 0 { + return out + } + if err := json.Unmarshal(raw, &out); err != nil { + t.Fatalf("decode body: %v; raw=%s", err, raw) + } + return out +} + +// sampleEnv constructs a backend.Env with the provided DeployConfig overrides. +func sampleEnv(dc map[string]interface{}) *backend.Env { + if dc == nil { + dc = map[string]interface{}{} + } + // caller must explicitly set arcaTemplateId when needed + return &backend.Env{ + Name: "my-env", + Version: "1.0", + DeployConfig: dc, + } +} + +// --------------------------------------------------------------------------- +// skeleton-level tests (preserved from Task 1) +// --------------------------------------------------------------------------- + +// TestNewArcaClient_Defaults verifies constructor wiring. +// +// Supported engines: arca. +func TestNewArcaClient_Defaults(t *testing.T) { + c := NewArcaClient("http://example:8080", "test-key") + if c == nil { + t.Fatal("NewArcaClient returned nil") + } + if c.baseURL != "http://example:8080" { + t.Errorf("baseURL = %q, want http://example:8080", c.baseURL) + } + if c.apiKey != "test-key" { + t.Errorf("apiKey = %q, want test-key", c.apiKey) + } + if c.httpClient == nil { + t.Fatal("httpClient is nil") + } + if c.httpClient.Timeout != 30*time.Second { + t.Errorf("httpClient.Timeout = %v, want 30s", c.httpClient.Timeout) + } +} + +// TestNewArcaClient_TrimsTrailingSlash ensures we don't double-slash when +// users supply a trailing slash in --arca-base-url. +// +// Supported engines: arca. +func TestNewArcaClient_TrimsTrailingSlash(t *testing.T) { + c := NewArcaClient("http://example:8080/", "k") + if c.baseURL != "http://example:8080" { + t.Errorf("baseURL = %q, want http://example:8080", c.baseURL) + } +} + +// TestArcaClient_SatisfiesEnvInstanceService is a compile-time check via +// `var _ EnvInstanceService = (*ArcaClient)(nil)` in arca_client.go. 
+// +// Supported engines: arca. +func TestArcaClient_SatisfiesEnvInstanceService(t *testing.T) { + var _ EnvInstanceService = (*ArcaClient)(nil) +} + +// --------------------------------------------------------------------------- +// FC-API-01: Create happy path +// --------------------------------------------------------------------------- + +func TestArcaCreate_HappyPath(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-123"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "ttl": "30m", + }) + + inst, err := c.CreateEnvInstance(env) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if inst == nil || inst.ID != "sb-123" { + t.Fatalf("want sandbox id sb-123, got %+v", inst) + } + if inst.Status != models.EnvInstanceStatusPending.String() { + t.Errorf("status = %q, want Pending", inst.Status) + } + if inst.Labels["engine"] != "arca" { + t.Errorf("engine label = %q, want arca", inst.Labels["engine"]) + } + + req := m.lastRequest(t) + if req.Method != http.MethodPost { + t.Errorf("method = %s, want POST", req.Method) + } + if req.Path != "/arca/openapi/v1/sandbox/instances" { + t.Errorf("path = %s", req.Path) + } + if req.Headers.Get("x-agent-sandbox-template-id") != "tpl1" { + t.Errorf("x-agent-sandbox-template-id = %q, want tpl1", req.Headers.Get("x-agent-sandbox-template-id")) + } + + body := decodeBody(t, req.Body) + if body["template_id"] != "tpl1" { + t.Errorf("template_id = %v", body["template_id"]) + } + if fmt.Sprint(body["ttl_in_minutes"]) != "30" { + t.Errorf("ttl_in_minutes = %v", body["ttl_in_minutes"]) + } + if _, ok := body["resource"]; ok { + t.Errorf("body unexpectedly contains resource: %v", body["resource"]) + } + if _, ok := body["image"]; ok { + t.Errorf("body unexpectedly contains image: %v", body["image"]) + } +} + +// --------------------------------------------------------------------------- +// 
FC-API-02: missing arcaTemplateId +// --------------------------------------------------------------------------- + +func TestArcaCreate_MissingTemplateID_Returns400(t *testing.T) { + called := int32(0) + m := newArcaMock(t, func(r *http.Request) (int, string) { + atomic.AddInt32(&called, 1) + return http.StatusOK, okResponse(`{"sandbox_id":"sb-999"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{}) // no arcaTemplateId + inst, err := c.CreateEnvInstance(env) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "arcaTemplateId required for arca engine") { + t.Errorf("error = %q, want substring 'arcaTemplateId required for arca engine'", err) + } + if inst != nil { + t.Errorf("expected nil instance, got %+v", inst) + } + if atomic.LoadInt32(&called) != 0 { + t.Error("expected no HTTP call on validation failure") + } +} + +// --------------------------------------------------------------------------- +// FC-API-03: mount_points passthrough +// --------------------------------------------------------------------------- + +func TestArcaCreate_MountPointsPassthrough(t *testing.T) { + mp := []interface{}{ + map[string]interface{}{ + "id": "OSS_bucket_ak", + "remote_dir": "/data/oss", + "local_dir": "/workspace/oss", + }, + } + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-mp"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "mountPoints": mp, + }) + if _, err := c.CreateEnvInstance(env); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + body := decodeBody(t, m.lastRequest(t).Body) + got, _ := json.Marshal(body["mount_points"]) + want, _ := json.Marshal(mp) + if string(got) != string(want) { + t.Errorf("mount_points = %s, want %s", got, want) + } +} + +// --------------------------------------------------------------------------- +// FC-API-05: TTL conversion (table-driven) +// 
--------------------------------------------------------------------------- + +func TestArcaCreate_TTLConversion(t *testing.T) { + cases := []struct { + raw string + wantMin int + wantOmit bool + wantErrSub string + }{ + {"30m", 30, false, ""}, + {"90s", 2, false, ""}, + {"1h", 60, false, ""}, + {"", 0, true, ""}, + {"bad", 0, false, "invalid ttl"}, + } + for _, tc := range cases { + t.Run("ttl="+tc.raw, func(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-ttl"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "ttl": tc.raw, + }) + _, err := c.CreateEnvInstance(env) + if tc.wantErrSub != "" { + if err == nil || !strings.Contains(err.Error(), tc.wantErrSub) { + t.Fatalf("err = %v, want substring %q", err, tc.wantErrSub) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + body := decodeBody(t, m.lastRequest(t).Body) + if tc.wantOmit { + if _, ok := body["ttl_in_minutes"]; ok { + t.Errorf("ttl_in_minutes should be omitted, got %v", body["ttl_in_minutes"]) + } + return + } + got := fmt.Sprint(body["ttl_in_minutes"]) + want := fmt.Sprint(tc.wantMin) + if got != want { + t.Errorf("ttl_in_minutes = %s, want %s", got, want) + } + }) + } +} + +// --------------------------------------------------------------------------- +// FC-API-06: metadata injection +// --------------------------------------------------------------------------- + +func TestArcaCreate_MetadataInjected(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-md"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "owner": "alice", + }) + env.Name = "my-env" + env.Version = "1.0" + if _, err := c.CreateEnvInstance(env); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + body := decodeBody(t, m.lastRequest(t).Body) + 
md, ok := body["metadata"].(map[string]interface{})
	if !ok {
		t.Fatalf("metadata = %v, want map", body["metadata"])
	}
	if md["aenv_env_name"] != "my-env" {
		t.Errorf("aenv_env_name = %v", md["aenv_env_name"])
	}
	if md["aenv_env_version"] != "1.0" {
		t.Errorf("aenv_env_version = %v", md["aenv_env_version"])
	}
	if md["aenv_owner"] != "alice" {
		t.Errorf("aenv_owner = %v", md["aenv_owner"])
	}
}

// ---------------------------------------------------------------------------
// FC-API-07/08: no resource, no image in body (spec forbids)
// ---------------------------------------------------------------------------

// TestArcaCreate_NoResourceOrImageInBody creates an instance from an env that
// carries cpu/memory/disk settings and a docker-image artifact, then checks
// that the serialized request body contains neither a "resource" nor an
// "image" key.
func TestArcaCreate_NoResourceOrImageInBody(t *testing.T) {
	m := newArcaMock(t, func(r *http.Request) (int, string) {
		return http.StatusOK, okResponse(`{"sandbox_id":"sb-nores"}`)
	})
	c := m.client()

	env := sampleEnv(map[string]interface{}{
		"arcaTemplateId": "tpl1",
		"cpu":            "2",
		"memory":         "4",
		"disk":           "25",
	})
	env.Artifacts = []backend.Artifact{{Type: "docker-image", Content: "irrelevant"}}
	if _, err := c.CreateEnvInstance(env); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Substring checks on the raw body: the keys must not appear at any depth.
	raw := string(m.lastRequest(t).Body)
	if strings.Contains(raw, `"resource"`) {
		t.Errorf("body contains resource field: %s", raw)
	}
	if strings.Contains(raw, `"image"`) {
		t.Errorf("body contains image field: %s", raw)
	}
}

// ---------------------------------------------------------------------------
// FC-API-09: Get status mapping
// ---------------------------------------------------------------------------

// TestArcaGet_StatusMapping feeds each Arca status string through
// GetEnvInstance and checks the resulting EnvInstance status, plus the
// "engine" label that every returned instance must carry.
func TestArcaGet_StatusMapping(t *testing.T) {
	cases := []struct {
		arca string
		want string
	}{
		{"PENDING", models.EnvInstanceStatusPending.String()},
		{"RUNNING", models.EnvInstanceStatusRunning.String()},
		{"FAILED", models.EnvInstanceStatusFailed.String()},
		{"DESTROYED", models.EnvInstanceStatusTerminated.String()},
		{"PAUSED", models.EnvInstanceStatusFailed.String()},
		{"UNKNOWN_VALUE", models.EnvInstanceStatusFailed.String()}, // default branch
	}
	for _, tc := range cases {
		t.Run(tc.arca, func(t *testing.T) {
			m := newArcaMock(t, func(r *http.Request) (int, string) {
				return http.StatusOK, okResponse(fmt.Sprintf(`{"sandbox_id":"sb-1","status":%q}`, tc.arca))
			})
			c := m.client()
			inst, err := c.GetEnvInstance("sb-1")
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if inst.Status != tc.want {
				t.Errorf("status = %q, want %q", inst.Status, tc.want)
			}
			if inst.Labels["engine"] != "arca" {
				t.Errorf("engine label missing")
			}
		})
	}
}

// ---------------------------------------------------------------------------
// FC-API-10: Get populates pod_ip (Arca emits `podIp` in camelCase)
// ---------------------------------------------------------------------------

// TestArcaGet_PodIPPopulated checks that the camelCase `podIp` field of the
// Arca response ends up in EnvInstance.IP.
func TestArcaGet_PodIPPopulated(t *testing.T) {
	m := newArcaMock(t, func(r *http.Request) (int, string) {
		return http.StatusOK, okResponse(`{"sandbox_id":"sb-ip","status":"RUNNING","podIp":"10.1.2.3"}`)
	})
	c := m.client()
	inst, err := c.GetEnvInstance("sb-ip")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if inst.IP != "10.1.2.3" {
		t.Errorf("IP = %q, want 10.1.2.3", inst.IP)
	}
}

// ---------------------------------------------------------------------------
// FC-API-11: Delete success
// ---------------------------------------------------------------------------

// TestArcaDelete_Success checks the shape of a successful delete call: HTTP
// DELETE, a path ending in the sandbox id, and the x-agent-sandbox-id header.
func TestArcaDelete_Success(t *testing.T) {
	m := newArcaMock(t, func(r *http.Request) (int, string) {
		return http.StatusOK, okResponse("1")
	})
	c := m.client()

	if err := c.DeleteEnvInstance("sb-del"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	req := m.lastRequest(t)
	if req.Method != http.MethodDelete {
		t.Errorf("method = %s", req.Method)
	}
	if !strings.HasSuffix(req.Path, "/sb-del") {
		t.Errorf("path = %s", req.Path)
	}
	if req.Headers.Get("x-agent-sandbox-id") != "sb-del" {
		t.Errorf("x-agent-sandbox-id = %q", req.Headers.Get("x-agent-sandbox-id"))
	}
}

// ---------------------------------------------------------------------------
// FC-API-12: Delete 404 returns error, no panic
// ---------------------------------------------------------------------------

// TestArcaDelete_NotFound_ReturnsError checks that a 404 failure envelope from
// Arca is reported to the caller as a non-nil error.
func TestArcaDelete_NotFound_ReturnsError(t *testing.T) {
	m := newArcaMock(t, func(r *http.Request) (int, string) {
		return http.StatusNotFound, `{"success":false,"code":404,"message":"sandbox not found","data":null}`
	})
	c := m.client()

	err := c.DeleteEnvInstance("missing")
	if err == nil {
		t.Fatal("expected error")
	}
}

// ---------------------------------------------------------------------------
// FC-API-13: List returns not-supported
// ---------------------------------------------------------------------------

// TestArcaList_ReturnsNotSupported checks that ListEnvInstances fails with a
// "not supported" error, returns a nil slice, and never contacts Arca.
func TestArcaList_ReturnsNotSupported(t *testing.T) {
	m := newArcaMock(t, func(r *http.Request) (int, string) {
		// Use Errorf, not Fatalf: FailNow may only be called from the
		// goroutine running the test, and this handler is presumably invoked
		// on the mock server's goroutine. A valid 5xx status is returned
		// because this line is now reachable.
		t.Errorf("list should not hit Arca: %s %s", r.Method, r.URL.Path)
		return http.StatusInternalServerError, ""
	})
	c := m.client()

	instances, err := c.ListEnvInstances("")
	if err == nil {
		t.Fatal("expected error, got nil")
	}
	if !strings.Contains(err.Error(), "not supported") {
		t.Errorf("error = %q, want substring 'not supported'", err)
	}
	if instances != nil {
		t.Errorf("expected nil slice, got %v", instances)
	}
}

// ---------------------------------------------------------------------------
// FC-API-15: Warmup not supported
// ---------------------------------------------------------------------------

// TestArcaWarmup_NotSupported checks that Warmup returns a "not supported"
// error. The client points at an address no server listens on, so the test
// needs no mock.
func TestArcaWarmup_NotSupported(t *testing.T) {
	c := NewArcaClient("http://127.0.0.1:1", "k")
	err := c.Warmup(nil)
	if err == nil {
		t.Fatal("expected error")
	}
	if !strings.Contains(err.Error(), "not supported") {
		t.Errorf("error = %q, want 'not supported'", err)
	}
}

// ---------------------------------------------------------------------------
// FC-API-16: Arca 5xx passthrough
--------------------------------------------------------------------------- + +func TestArcaCreate_Arca500_PassthroughError(t *testing.T) { + count := int32(0) + m := newArcaMock(t, func(r *http.Request) (int, string) { + atomic.AddInt32(&count, 1) + return http.StatusInternalServerError, failResponse(500, "boom") + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{"arcaTemplateId": "tpl1"}) + _, err := c.CreateEnvInstance(env) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "500") { + t.Errorf("error = %q, want contains 500", err) + } + if atomic.LoadInt32(&count) != 1 { + t.Errorf("requests = %d, want exactly 1 (no retry)", atomic.LoadInt32(&count)) + } +} + +// --------------------------------------------------------------------------- +// FC-API-17: Create timeout +// --------------------------------------------------------------------------- + +func TestArcaCreate_Timeout(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + time.Sleep(300 * time.Millisecond) + return http.StatusOK, okResponse(`{"sandbox_id":"sb-slow"}`) + }) + c := m.client() + c.httpClient.Timeout = 50 * time.Millisecond + + env := sampleEnv(map[string]interface{}{"arcaTemplateId": "tpl1"}) + _, err := c.CreateEnvInstance(env) + if err == nil { + t.Fatal("expected timeout error") + } + msg := err.Error() + if !strings.Contains(msg, "Timeout") && !strings.Contains(msg, "deadline") { + t.Errorf("error = %q, want timeout/deadline", err) + } +} + +// --------------------------------------------------------------------------- +// FC-API-18: API key header present on every call +// --------------------------------------------------------------------------- + +func TestArca_APIKeyHeader(t *testing.T) { + seenKeys := []string{} + m := newArcaMock(t, func(r *http.Request) (int, string) { + seenKeys = append(seenKeys, r.Header.Get("x-agent-sandbox-api-key")) + switch { + case r.Method == http.MethodPost: + return 
http.StatusOK, okResponse(`{"sandbox_id":"sb-hdr"}`) + case r.Method == http.MethodGet: + return http.StatusOK, okResponse(`{"sandbox_id":"sb-hdr","status":"RUNNING"}`) + case r.Method == http.MethodDelete: + return http.StatusOK, okResponse("1") + } + return http.StatusNotFound, "" + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{"arcaTemplateId": "tpl1"}) + if _, err := c.CreateEnvInstance(env); err != nil { + t.Fatalf("create: %v", err) + } + if _, err := c.GetEnvInstance("sb-hdr"); err != nil { + t.Fatalf("get: %v", err) + } + if err := c.DeleteEnvInstance("sb-hdr"); err != nil { + t.Fatalf("delete: %v", err) + } + if len(seenKeys) != 3 { + t.Fatalf("expected 3 keys captured, got %d", len(seenKeys)) + } + for i, k := range seenKeys { + if k != "test-key" { + t.Errorf("request #%d key = %q, want test-key", i, k) + } + } +} + +// --------------------------------------------------------------------------- +// engine label merge preserves user labels +// --------------------------------------------------------------------------- + +func TestArcaCreate_MergesUserLabels(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-lab"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "labels": map[string]string{ + "owner": "alice", + "engine": "ignored-by-client", // engine must always be arca + }, + }) + inst, err := c.CreateEnvInstance(env) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if inst.Labels["engine"] != "arca" { + t.Errorf("engine label = %q, want arca", inst.Labels["engine"]) + } + if inst.Labels["owner"] != "alice" { + t.Errorf("owner label = %q, want alice", inst.Labels["owner"]) + } +} + +// --------------------------------------------------------------------------- +// FC-API-19: PresignURL happy path + error envelope +// --------------------------------------------------------------------------- 
+ +func TestArcaPresignURL_HappyPath(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + // Verify path / headers / body shape. + if !strings.HasSuffix(r.URL.Path, "/arca/api/v1/sandbox/sb-1/presign/token") { + t.Errorf("path = %q", r.URL.Path) + } + if r.Header.Get("x-agent-sandbox-id") != "sb-1" { + t.Errorf("missing x-agent-sandbox-id header") + } + if r.Header.Get("x-agent-sandbox-port") != "8080" { + t.Errorf("port header = %q, want 8080", r.Header.Get("x-agent-sandbox-port")) + } + return http.StatusOK, okResponse(`{"token":"abc123"}`) + }) + c := m.client() + + url, err := c.PresignURL("sb-1", 8080, 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + wantSuffix := "/arca/api/v1/session/abc123" + if !strings.HasSuffix(url, wantSuffix) { + t.Errorf("url = %q, want suffix %q", url, wantSuffix) + } +} + +func TestArcaPresignURL_EmptyToken(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"token":""}`) + }) + c := m.client() + if _, err := c.PresignURL("sb-1", 8080, 5); err == nil { + t.Errorf("expected error on empty token") + } +} + +func TestArcaPresignURL_RejectsZeroPort(t *testing.T) { + c := (&ArcaClient{}).withDummy() + if _, err := c.PresignURL("sb-1", 0, 5); err == nil { + t.Errorf("expected error on port=0") + } +} + +// withDummy makes the receiver usable without a server (input-validation tests). 
+func (c *ArcaClient) withDummy() *ArcaClient { + c.baseURL = "http://example.invalid" + c.apiKey = "k" + c.httpClient = &http.Client{} + return c +} diff --git a/api-service/service/cleanup_service_test.go b/api-service/service/cleanup_service_test.go index cdd94db..b07acf7 100644 --- a/api-service/service/cleanup_service_test.go +++ b/api-service/service/cleanup_service_test.go @@ -62,6 +62,10 @@ func (m *MockEnvInstanceService) Warmup(req *backend.Env) error { return nil } +func (m *MockEnvInstanceService) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", nil +} + // TestPerformCleanupNoInstances tests cleanup when there are no env instances func TestPerformCleanupNoInstances(t *testing.T) { // Create mock service that returns empty list diff --git a/api-service/service/env_instance.go b/api-service/service/env_instance.go index 7a893b5..9e2e173 100644 --- a/api-service/service/env_instance.go +++ b/api-service/service/env_instance.go @@ -24,6 +24,11 @@ type EnvInstanceService interface { DeleteEnvInstance(id string) error ListEnvInstances(envName string) ([]*models.EnvInstance, error) Warmup(req *backend.Env) error + // PresignURL returns a short-lived URL that gives direct access to a + // port inside the sandbox. Implementations that don't support presigning + // (k8s/standard/faas) must return an error whose message starts with + // "not supported on this engine". + PresignURL(id string, port int, expirationMinutes float64) (string, error) } type EnvInstanceClient struct { @@ -303,6 +308,12 @@ func (c *EnvInstanceClient) Warmup(req *backend.Env) error { return nil } +// PresignURL is unsupported on the standard engine. +// Supported engines: arca (returns error on standard). 
+func (c *EnvInstanceClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", fmt.Errorf("not supported on this engine (standard): presign URL") +} + // truncateBody truncate body for memory protection func truncateBody(body []byte) string { const maxLen = 500 diff --git a/api-service/service/faas_client.go b/api-service/service/faas_client.go index d0acc05..0f657d9 100644 --- a/api-service/service/faas_client.go +++ b/api-service/service/faas_client.go @@ -426,3 +426,9 @@ func convertStatus(s faas_model.InstanceStatus) string { return models.EnvInstanceStatusRunning.String() } } + +// PresignURL is unsupported on the faas engine. +// Supported engines: arca (returns error on faas). +func (c *FaaSClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", fmt.Errorf("not supported on this engine (faas): presign URL") +} diff --git a/api-service/service/schedule_client.go b/api-service/service/schedule_client.go index 63e20b5..42a35cb 100644 --- a/api-service/service/schedule_client.go +++ b/api-service/service/schedule_client.go @@ -607,3 +607,9 @@ func (c *ScheduleClient) ListEnvInstances(envName string) ([]*models.EnvInstance func (c *ScheduleClient) Warmup(req *backend.Env) error { return fmt.Errorf("warmup is not implemented") } + +// PresignURL is unsupported on the k8s engine. +// Supported engines: arca (returns error on k8s). 
+func (c *ScheduleClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", fmt.Errorf("not supported on this engine (k8s): presign URL") +} diff --git a/envhub/service/env_redis.go b/envhub/service/env_redis.go index 878f0f9..1c6ec97 100644 --- a/envhub/service/env_redis.go +++ b/envhub/service/env_redis.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "sort" + "sync" "time" redis "github.com/go-redis/redis/v8" @@ -46,6 +47,13 @@ type RedisEnvStorage struct { client *redis.Client keyPrefix string indexKey string + + keyLocks sync.Map +} + +func (s *RedisEnvStorage) keyLock(key string) *sync.Mutex { + v, _ := s.keyLocks.LoadOrStore(key, &sync.Mutex{}) + return v.(*sync.Mutex) } var _ EnvStorage = (*RedisEnvStorage)(nil) @@ -128,57 +136,60 @@ func (s *RedisEnvStorage) Create(ctx context.Context, key string, env *models.En return nil } -// Update updates Env object +// Update applies an optimistic check-and-set update on the env record. +// Returns ErrEnvNotFound if the key is absent and a version mismatch error +// if the on-disk record was concurrently modified. 
func (s *RedisEnvStorage) Update(ctx context.Context, key string, env *models.Env, resourceVersion int64, labels map[string]string) error { - redisKey := s.dataKey(key) - return s.client.Watch(ctx, func(tx *redis.Tx) error { - payload, err := tx.Get(ctx, redisKey).Bytes() - if errors.Is(err, redis.Nil) { - return fmt.Errorf("%w: %s", ErrEnvNotFound, key) - } - if err != nil { - return fmt.Errorf("failed to read env %s: %w", key, err) - } + mu := s.keyLock(key) + mu.Lock() + defer mu.Unlock() - var record redisEnvRecord - if err := json.Unmarshal(payload, &record); err != nil { - return fmt.Errorf("failed to unmarshal env %s: %w", key, err) - } + redisKey := s.dataKey(key) - if record.ResourceVersion != resourceVersion { - return fmt.Errorf("resource version mismatch for %s: expect %d got %d", key, record.ResourceVersion, resourceVersion) - } + current, err := s.loadRecord(ctx, key) + if err != nil { + return err + } + if current.ResourceVersion != resourceVersion { + return fmt.Errorf("resource version mismatch for %s: expect %d got %d", key, current.ResourceVersion, resourceVersion) + } - record.Env = env - if labels != nil { - record.Labels = copyLabels(labels) - } - record.ResourceVersion++ - record.LastUpdatedEpoch = time.Now().Unix() + updated := redisEnvRecord{ + Env: env, + Labels: current.Labels, + ResourceVersion: current.ResourceVersion + 1, + LastUpdatedEpoch: time.Now().Unix(), + } + if labels != nil { + updated.Labels = copyLabels(labels) + } - updatedPayload, err := json.Marshal(record) - if err != nil { - return fmt.Errorf("failed to marshal updated env %s: %w", key, err) - } + payload, err := json.Marshal(updated) + if err != nil { + return fmt.Errorf("failed to marshal updated env %s: %w", key, err) + } - _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error { - p.Set(ctx, redisKey, updatedPayload, 0) - p.SAdd(ctx, s.indexKey, key) - return nil - }) - return err - }, redisKey) + if err := s.client.Set(ctx, redisKey, payload, 0).Err(); err != 
nil { + return fmt.Errorf("failed to write env %s: %w", key, err) + } + if err := s.client.SAdd(ctx, s.indexKey, key).Err(); err != nil { + return fmt.Errorf("failed to update index for env %s: %w", key, err) + } + return nil } -// Delete deletes Env object +// Delete removes the env record and its index entry. func (s *RedisEnvStorage) Delete(ctx context.Context, key string) error { - redisKey := s.dataKey(key) - pipe := s.client.TxPipeline() - pipe.Del(ctx, redisKey) - pipe.SRem(ctx, s.indexKey, key) - if _, err := pipe.Exec(ctx); err != nil { + mu := s.keyLock(key) + mu.Lock() + defer mu.Unlock() + + if err := s.client.Del(ctx, s.dataKey(key)).Err(); err != nil { return fmt.Errorf("failed to delete env %s: %w", key, err) } + if err := s.client.SRem(ctx, s.indexKey, key).Err(); err != nil { + return fmt.Errorf("failed to remove env %s from index: %w", key, err) + } return nil }