From 13113a9b750379d794ca21c19d342a374b91c9ba Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 29 Apr 2026 14:38:00 +0800 Subject: [PATCH 01/11] feat(api-service): add arca sandbox engine - ArcaClient implements Create/Get/Delete against Arca OpenAPI /arca/openapi/v1/sandbox/*, with DeployConfig keys arcaTemplateId and mountPoints. List/Warmup return clear not-supported errors. - PresignURL method on EnvInstanceService interface; ArcaClient calls /arca/api/v1/sandbox/{id}/presign/token; other engines return a clear "not supported on this engine" error. - POST /env-instance/:id/presign-url endpoint forwards to the service layer so the SDK stays engine-unaware. - MCPGateway gets engine type from --schedule-type at startup. Under arca it resolves the target from the configured gateway base URL plus the AEnvCore-EnvInstance-ID header (SDK-provided MCPProxy-URL is ignored), and injects x-agent-sandbox-api-key / x-agent-sandbox-id on each forwarded request. - Controller CreateEnvInstanceRequest adds mount_points passthrough for arca; no init_command on envhub/api-service surface. Co-Authored-By: Claude Opus 4.7 --- .gitignore | 5 + api-service/controller/env_instance.go | 42 ++ .../controller/env_instance_arca_test.go | 109 +++ api-service/controller/mcp_proxy.go | 198 ++++- api-service/main.go | 31 +- api-service/service/arca_client.go | 464 ++++++++++++ api-service/service/arca_client_test.go | 699 ++++++++++++++++++ api-service/service/cleanup_service_test.go | 4 + api-service/service/env_instance.go | 11 + api-service/service/faas_client.go | 6 + api-service/service/schedule_client.go | 6 + 11 files changed, 1571 insertions(+), 4 deletions(-) create mode 100644 api-service/controller/env_instance_arca_test.go create mode 100644 api-service/service/arca_client.go create mode 100644 api-service/service/arca_client_test.go diff --git a/.gitignore b/.gitignore index 671c46d5..8d20b64a 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,8 @@ output/ ## local build .cache .bash_history + +# arca local-only artifacts +deploy/api-service-arca.yaml +docs/superpowers/specs/2026-04-24-arca-sandbox-engine-design.md +docs/superpowers/plans/2026-04-27-arca-sandbox-engine-impl.md diff --git a/api-service/controller/env_instance.go b/api-service/controller/env_instance.go index 29d3f8da..d6a00498 100644 --- a/api-service/controller/env_instance.go +++ b/api-service/controller/env_instance.go @@ -57,6 +57,9 @@ type CreateEnvInstanceRequest struct { TTL string `json:"ttl"` Owner string `json:"owner"` Labels map[string]string `json:"labels,omitempty"` + // MountPoints forwards Arca-style mount entries to the backend. + // Supported engines: arca (ignored on k8s/standard/faas). + MountPoints []map[string]interface{} `json:"mount_points,omitempty"` } // CreateEnvInstance creates a new EnvInstance @@ -120,6 +123,10 @@ func (ctrl *EnvInstanceController) CreateEnvInstance(c *gin.Context) { if req.Labels != nil { backendEnv.DeployConfig["labels"] = req.Labels } + // Arca-specific passthrough. Supported engines: arca. + if len(req.MountPoints) > 0 { + backendEnv.DeployConfig["mountPoints"] = req.MountPoints + } // Call ScheduleClient to create Pod envInstance, err := ctrl.envInstanceService.CreateEnvInstance(backendEnv) if err != nil { @@ -196,6 +203,41 @@ func (ctrl *EnvInstanceController) DeleteEnvInstance(c *gin.Context) { } } +// PresignURLRequest is the body for POST /env-instance/:id/presign-url. +type PresignURLRequest struct { + Port int `json:"port" binding:"required"` + ExpirationTimeInMinutes float64 `json:"expiration_time_in_minutes,omitempty"` +} + +// PresignURLResponse is the body returned from PresignURL. +type PresignURLResponse struct { + URL string `json:"url"` +} + +// PresignURL returns a short-lived URL pointing to a port inside the sandbox. +// Engines that don't support presigning surface a clear error from the +// service layer; the SDK is engine-unaware. +// +// POST /env-instance/:id/presign-url +func (ctrl *EnvInstanceController) PresignURL(c *gin.Context) { + id := c.Param("id") + if id == "" { + backendmodels.JSONErrorWithMessage(c, 400, "Missing id parameter") + return + } + var req PresignURLRequest + if err := c.ShouldBindJSON(&req); err != nil { + backendmodels.JSONErrorWithMessage(c, 400, "Invalid request: "+err.Error()) + return + } + url, err := ctrl.envInstanceService.PresignURL(id, req.Port, req.ExpirationTimeInMinutes) + if err != nil { + backendmodels.JSONErrorWithMessage(c, 501, err.Error()) + return + } + backendmodels.JSONSuccess(c, PresignURLResponse{URL: url}) +} + func (ctrl *EnvInstanceController) ListEnvInstances(c *gin.Context) { token := util.GetCurrentToken(c) if token == nil { diff --git a/api-service/controller/env_instance_arca_test.go b/api-service/controller/env_instance_arca_test.go new file mode 100644 index 00000000..ec8a2878 --- /dev/null +++ b/api-service/controller/env_instance_arca_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package controller + +import ( + "bytes" + "encoding/json" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" +) + +// TestCreateEnvInstanceRequest_ArcaFieldsBinding verifies JSON binding of +// mount_points. +// +// Supported engines: arca. +func TestCreateEnvInstanceRequest_ArcaFieldsBinding(t *testing.T) { + body := `{ + "envName": "test@v1", + "ttl": "30m", + "mount_points": [ + {"id": "OSS_bucket_ak", "remote_dir": "/data/oss", "local_dir": "/workspace/oss"} + ] + }` + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/env-instance", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + var req CreateEnvInstanceRequest + if err := c.ShouldBindJSON(&req); err != nil { + t.Fatalf("bind: %v", err) + } + + if len(req.MountPoints) != 1 { + t.Fatalf("mount_points length = %d, want 1", len(req.MountPoints)) + } + mp := req.MountPoints[0] + if mp["id"] != "OSS_bucket_ak" { + t.Errorf("mount_points[0].id = %v", mp["id"]) + } + if mp["remote_dir"] != "/data/oss" { + t.Errorf("mount_points[0].remote_dir = %v", mp["remote_dir"]) + } +} + +// TestCreateEnvInstance_MountPointsForwardedToDeployConfig mirrors the +// controller's forwarding step without invoking the HTTP handler. +// +// Supported engines: arca. +func TestCreateEnvInstance_MountPointsForwardedToDeployConfig(t *testing.T) { + reqBody := CreateEnvInstanceRequest{ + EnvName: "test@v1", + MountPoints: []map[string]interface{}{ + {"id": "OSS_x", "remote_dir": "/a", "local_dir": "/b"}, + }, + } + deployConfig := make(map[string]interface{}) + if len(reqBody.MountPoints) > 0 { + deployConfig["mountPoints"] = reqBody.MountPoints + } + got, ok := deployConfig["mountPoints"].([]map[string]interface{}) + if !ok { + t.Fatalf("DeployConfig[mountPoints] type = %T, want []map[string]interface{}", deployConfig["mountPoints"]) + } + if len(got) != 1 || got[0]["id"] != "OSS_x" { + t.Errorf("unexpected mountPoints: %v", got) + } +} + +// TestCreateEnvInstance_ArcaFieldsOmitted_NoDeployConfigMutation verifies +// backward compatibility: existing callers (k8s/standard/faas) that don't +// supply arca fields see no extra keys injected. +// +// Supported engines: all. +func TestCreateEnvInstance_ArcaFieldsOmitted_NoDeployConfigMutation(t *testing.T) { + reqBody := CreateEnvInstanceRequest{EnvName: "test@v1", TTL: "30m"} + deployConfig := make(map[string]interface{}) + if len(reqBody.MountPoints) > 0 { + deployConfig["mountPoints"] = reqBody.MountPoints + } + if _, ok := deployConfig["mountPoints"]; ok { + t.Errorf("mountPoints should not be set when request omits it") + } +} + +// TestCreateEnvInstanceRequest_ArcaFieldsOmitted_UnmarshalSilent verifies +// empty request body does not populate arca fields. +func TestCreateEnvInstanceRequest_ArcaFieldsOmitted_UnmarshalSilent(t *testing.T) { + body := `{"envName": "test@v1", "ttl": "30m"}` + var req CreateEnvInstanceRequest + if err := json.Unmarshal([]byte(body), &req); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if req.MountPoints != nil { + t.Errorf("mount_points = %v, want nil", req.MountPoints) + } +} diff --git a/api-service/controller/mcp_proxy.go b/api-service/controller/mcp_proxy.go index 46041a90..dc1dc4ab 100644 --- a/api-service/controller/mcp_proxy.go +++ b/api-service/controller/mcp_proxy.go @@ -18,10 +18,12 @@ package controller import ( "api-service/constants" + "fmt" "io" "net/http" "net/http/httputil" "net/url" + "strings" "time" "github.com/gin-gonic/gin" @@ -48,16 +50,33 @@ const ( // HTTP methods MethodGET = "GET" MethodPOST = "POST" + + // Schedule types (kept in sync with main.go --schedule-type). + scheduleTypeArca = "arca" + + // Arca gateway conventions (kept in sync with service/arca_client.go). + arcaGatewaySandboxPrefix = "/arca/api/v1/sandbox" + arcaHeaderAPIKey = "x-agent-sandbox-api-key" + arcaHeaderSandboxID = "x-agent-sandbox-id" ) +// MCPGatewayConfig is injected at api-service startup so the gateway never +// has to query per-request engine metadata. +type MCPGatewayConfig struct { + ScheduleType string // "k8s" | "standard" | "faas" | "arca" + ArcaBaseURL string + ArcaAPIKey string +} + // MCPGateway MCP gateway struct type MCPGateway struct { router *gin.RouterGroup transport *http.Transport + config MCPGatewayConfig } // NewMCPGateway creates a new MCP gateway instance -func NewMCPGateway(router *gin.RouterGroup) *MCPGateway { +func NewMCPGateway(router *gin.RouterGroup, cfg MCPGatewayConfig) *MCPGateway { gateway := &MCPGateway{ router: router, transport: &http.Transport{ @@ -65,6 +84,7 @@ func NewMCPGateway(router *gin.RouterGroup) *MCPGateway { MaxIdleConnsPerHost: 10, IdleConnTimeout: 90 * time.Second, }, + config: cfg, } gateway.setupRoutes() @@ -77,8 +97,24 @@ func (g *MCPGateway) setupRoutes() { } func (g *MCPGateway) innerRouter(c *gin.Context) { - proxyURL, _ := g.getMCPSeverURL(c) path := c.Param("path") + if g.config.ScheduleType == scheduleTypeArca { + // Arca engine: the gateway doesn't trust SDK-provided proxy URL. + // Target is derived from startup config + sandbox id header. + if path == PathHealth { + g.healthCheck(c) + return + } + switch path { + case PathSSE: + g.handleArcaSSE(c) + default: + g.handleArcaHTTP(c) + } + return + } + + proxyURL, _ := g.getMCPSeverURL(c) if proxyURL != "" { switch path { case PathSSE: @@ -309,6 +345,164 @@ func (g *MCPGateway) handleMCPHTTPWithHeader(c *gin.Context) { proxy.ServeHTTP(c.Writer, c.Request) } +// arcaTargetURL resolves the target URL for an arca request. The caller's +// path is preserved after the {arca gateway prefix}/{sandbox_id} base, which +// matches what arca-sandbox SDK speaks natively. +// +// Supported engines: arca. +func (g *MCPGateway) arcaTargetURL(c *gin.Context) (*url.URL, string, error) { + sandboxID := c.GetHeader(constants.HeaderEnvInstanceID) + if sandboxID == "" { + return nil, "", &MCPError{ + Code: http.StatusBadRequest, + Message: constants.HeaderEnvInstanceID + " header is required for arca engine", + } + } + if g.config.ArcaBaseURL == "" { + return nil, "", &MCPError{ + Code: http.StatusInternalServerError, + Message: "arca base URL is not configured on api-service", + } + } + base, err := url.Parse(g.config.ArcaBaseURL) + if err != nil { + return nil, "", &MCPError{ + Code: http.StatusInternalServerError, + Message: "invalid arca base URL", + Details: err.Error(), + } + } + tail := strings.TrimPrefix(c.Request.URL.Path, "/") + base.Path = fmt.Sprintf("%s/%s", arcaGatewaySandboxPrefix, sandboxID) + if tail != "" { + base.Path = base.Path + "/" + tail + } + base.RawQuery = c.Request.URL.RawQuery + return base, sandboxID, nil +} + +// handleArcaHTTP reverse-proxies non-SSE MCP traffic to the arca gateway. +// Target is derived from startup config and X-Instance-ID, not any SDK-provided URL. +// +// Supported engines: arca. +func (g *MCPGateway) handleArcaHTTP(c *gin.Context) { + targetURL, sandboxID, err := g.arcaTargetURL(c) + if err != nil { + if mcpErr, ok := err.(*MCPError); ok { + c.JSON(mcpErr.Code, gin.H{"error": mcpErr.Message}) + return + } + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + proxy := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL.Scheme = targetURL.Scheme + req.URL.Host = targetURL.Host + req.URL.Path = targetURL.Path + req.URL.RawQuery = targetURL.RawQuery + req.Host = targetURL.Host + req.Header.Del(constants.HeaderMCPServerURL) + req.Header.Set(arcaHeaderAPIKey, g.config.ArcaAPIKey) + req.Header.Set(arcaHeaderSandboxID, sandboxID) + }, + Transport: g.transport, + ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { + log.Errorf("arca proxy error for sandbox %s: %v", sandboxID, err) + c.JSON(http.StatusBadGateway, gin.H{ + "error": "Failed to forward request to arca gateway", + "details": err.Error(), + "sandbox": sandboxID, + }) + }, + } + proxy.ServeHTTP(c.Writer, c.Request) +} + +// handleArcaSSE streams SSE responses from the arca gateway back to the caller. +// +// Supported engines: arca. +func (g *MCPGateway) handleArcaSSE(c *gin.Context) { + targetURL, sandboxID, err := g.arcaTargetURL(c) + if err != nil { + if mcpErr, ok := err.(*MCPError); ok { + c.JSON(mcpErr.Code, gin.H{"error": mcpErr.Message}) + return + } + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + c.Header(HeaderContentType, ContentTypeSSE) + c.Header(HeaderCacheControl, "no-cache") + c.Header(HeaderConnection, "keep-alive") + c.Header("Access-Control-Allow-Origin", "*") + + req, err := http.NewRequest(MethodGET, targetURL.String(), nil) + if err != nil { + log.Errorf("arca SSE: failed to create request: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create request"}) + return + } + req.Header.Set(HeaderAccept, ContentTypeSSE) + req.Header.Set(HeaderCacheControl, "no-cache") + g.copyHeadersExcept(c.Request.Header, req.Header, + constants.HeaderMCPServerURL, + arcaHeaderAPIKey, + arcaHeaderSandboxID, + ) + req.Header.Set(arcaHeaderAPIKey, g.config.ArcaAPIKey) + req.Header.Set(arcaHeaderSandboxID, sandboxID) + + client := &http.Client{Transport: g.transport} + resp, err := client.Do(req) + if err != nil { + log.Errorf("arca SSE: connect failed sandbox=%s: %v", sandboxID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Failed to connect to arca gateway"}) + return + } + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + log.Warnf("failed to close arca SSE response body: %v", closeErr) + } + }() + + if resp.StatusCode != http.StatusOK { + log.Warnf("arca SSE: upstream status %d sandbox=%s", resp.StatusCode, sandboxID) + c.JSON(resp.StatusCode, gin.H{"error": "arca gateway error"}) + return + } + + for name, values := range resp.Header { + if name != HeaderContentType { + for _, value := range values { + c.Header(name, value) + } + } + } + c.Header(HeaderContentType, ContentTypeSSE) + + buf := make([]byte, 4096) + for { + n, err := resp.Body.Read(buf) + if err != nil { + if err != io.EOF { + log.Errorf("arca SSE read error sandbox=%s: %v", sandboxID, err) + } + break + } + if n > 0 { + if _, writeErr := c.Writer.Write(buf[:n]); writeErr != nil { + log.Errorf("arca SSE write error: %v", writeErr) + break + } + if flusher, ok := c.Writer.(http.Flusher); ok { + flusher.Flush() + } + } + } +} + // GetRouter gets router instance func (g *MCPGateway) GetRouter() *gin.RouterGroup { return g.router diff --git a/api-service/main.go b/api-service/main.go index d8862a0a..73623f35 100644 --- a/api-service/main.go +++ b/api-service/main.go @@ -20,6 +20,7 @@ package main import ( "math/rand" "net/http" + "os" "runtime" "time" @@ -48,11 +49,15 @@ var ( tokenCacheMaxEntries int tokenCacheTTLMinutes int cleanupInterval string + // Supported engines: arca. + arcaBaseURL string + // Supported engines: arca. + arcaAPIKey string ) func init() { pflag.StringVar(&scheduleAddr, "schedule-addr", "", "Meta service address (host:port)") - pflag.StringVar(&scheduleType, "schedule-type", "k8s", "sandbox service schedule type, currently only 'k8s', 'standard' support") + pflag.StringVar(&scheduleType, "schedule-type", "k8s", "sandbox service schedule type: 'k8s', 'standard', 'faas', or 'arca'") pflag.StringVar(&backendAddr, "backend-addr", "", "backend service address (host:port)") pflag.Int64Var(&qps, "qps", int64(100), "total qps limit") @@ -63,6 +68,10 @@ func init() { pflag.StringVar(&redisAddr, "redis-addr", "", "Redis address (host:port)") pflag.StringVar(&redisPassword, "redis-password", "", "Redis password") pflag.StringVar(&cleanupInterval, "cleanup-interval", "5m", "Cleanup service interval (e.g., 5m, 1h)") + + // Arca sandbox engine flags. Supported engines: arca. + pflag.StringVar(&arcaBaseURL, "arca-base-url", "", "Arca sandbox OpenAPI base URL. Supported engines: arca") + pflag.StringVar(&arcaAPIKey, "arca-api-key", "", "Arca sandbox OpenAPI key; falls back to ARCA_API_KEY env. Supported engines: arca") } func healthChecker(c *gin.Context) { @@ -104,6 +113,19 @@ func main() { scheduleClient = service.NewEnvInstanceClient(scheduleAddr) case "faas": scheduleClient = service.NewFaaSClient(scheduleAddr) + case "arca": + // Supported engines: arca. + key := arcaAPIKey + if key == "" { + key = os.Getenv("ARCA_API_KEY") + } + if arcaBaseURL == "" { + log.Fatalf("--arca-base-url is required when --schedule-type=arca") + } + if key == "" { + log.Fatalf("arca API key missing: set --arca-api-key or ARCA_API_KEY") + } + scheduleClient = service.NewArcaClient(arcaBaseURL, key) default: log.Fatalf("unsupported schedule type: %v", scheduleType) } @@ -119,6 +141,7 @@ func main() { mainRouter.GET("/env-instance/:id/list", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.ListEnvInstances) mainRouter.GET("/env-instance/:id", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.GetEnvInstance) mainRouter.DELETE("/env-instance/:id", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.DeleteEnvInstance) + mainRouter.POST("/env-instance/:id/presign-url", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.PresignURL) // Service routes if envServiceController != nil { @@ -144,7 +167,11 @@ func main() { mcpRouter.Use(middleware.MCPMetricsMiddleware()) mcpRouter.Use(middleware.LoggingMiddleware()) mcpGroup := mcpRouter.Group("/") - controller.NewMCPGateway(mcpGroup) + controller.NewMCPGateway(mcpGroup, controller.MCPGatewayConfig{ + ScheduleType: scheduleType, + ArcaBaseURL: arcaBaseURL, + ArcaAPIKey: arcaAPIKey, + }) // Start two services go func() { diff --git a/api-service/service/arca_client.go b/api-service/service/arca_client.go new file mode 100644 index 00000000..c2cffa4d --- /dev/null +++ b/api-service/service/arca_client.go @@ -0,0 +1,464 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "strings" + "time" + + log "github.com/sirupsen/logrus" + + "api-service/models" + backend "envhub/models" +) + +// ArcaClient implements EnvInstanceService against the Arca sandbox OpenAPI +// (`/arca/openapi/v1/sandbox/*`). +// +// Unlike ScheduleClient / EnvInstanceClient / FaaSClient, ArcaClient does not +// assemble cpu/memory/disk/image — those are fully determined by the Arca +// template identified by DeployConfig["arcaTemplateId"]. List is not +// supported because Arca OpenAPI has no list endpoint in this iteration. +// +// Supported engines: arca. +type ArcaClient struct { + baseURL string + apiKey string + httpClient *http.Client +} + +// Compile-time interface compliance check. +var _ EnvInstanceService = (*ArcaClient)(nil) + +// Arca OpenAPI paths (relative to baseURL). +const ( + arcaInstancesPath = "/arca/openapi/v1/sandbox/instances" + arcaGatewayPrefix = "/arca/api/v1/sandbox" + arcaAPIKeyHeader = "x-agent-sandbox-api-key" + arcaSandboxIDHdr = "x-agent-sandbox-id" + arcaTemplateIDHdr = "x-agent-sandbox-template-id" + arcaPortHeader = "x-agent-sandbox-port" +) + +// DeployConfig keys consumed by ArcaClient. +const ( + deployKeyArcaTemplateID = "arcaTemplateId" + deployKeyMountPoints = "mountPoints" + deployKeyEnvVars = "environment_variables" + deployKeyOwner = "owner" +) + +// Engine label key/value written onto returned EnvInstance.Labels. +const ( + engineLabelKey = "engine" + engineLabelArca = "arca" +) + +// NewArcaClient constructs an ArcaClient targeting the given Arca OpenAPI base +// URL with the supplied tenant API key. The returned client is safe for +// concurrent use from multiple goroutines. +// +// Supported engines: arca. +func NewArcaClient(baseURL, apiKey string) *ArcaClient { + return &ArcaClient{ + baseURL: strings.TrimRight(baseURL, "/"), + apiKey: apiKey, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +// arcaCreateRequest is the outbound body for +// POST /arca/openapi/v1/sandbox/instances. +// +// Supported engines: arca. Fields are chosen to match spec §3.2; notably +// `resource` and `image` are intentionally omitted because the Arca template +// fully determines them. +type arcaCreateRequest struct { + TemplateID string `json:"template_id"` + TTLInMinutes int `json:"ttl_in_minutes,omitempty"` + MountPoints []interface{} `json:"mount_points,omitempty"` + Envs map[string]string `json:"envs,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` +} + +// arcaEnvelope matches Arca's uniform response wrapper. +type arcaEnvelope struct { + Success bool `json:"success"` + Code int `json:"code"` + Message string `json:"message"` + Data json.RawMessage `json:"data"` +} + +// arcaCreatedInstance is Arca's create response payload. +// Arca emits sandbox_id in both snake_case and camelCase; the snake form +// is historically stable and used here. +type arcaCreatedInstance struct { + SandboxID string `json:"sandbox_id"` +} + +// arcaSandboxInfo is Arca's GET /instances/{id} response payload (subset). +// Arca returns `podIp` only as camelCase (verified against stable env 2026-04). +// Supported engines: arca. +type arcaSandboxInfo struct { + SandboxID string `json:"sandbox_id"` + Status string `json:"status"` + PodIP string `json:"podIp,omitempty"` +} + +// mapArcaStatus converts Arca OpenAPI status strings into EnvInstance.Status. +// Unknown values fall back to Failed with a log warning. +// +// Supported engines: arca. +func mapArcaStatus(s string) string { + switch strings.ToUpper(strings.TrimSpace(s)) { + case "PENDING": + return models.EnvInstanceStatusPending.String() + case "RUNNING": + return models.EnvInstanceStatusRunning.String() + case "FAILED", "PAUSED": + return models.EnvInstanceStatusFailed.String() + case "DESTROYED": + return models.EnvInstanceStatusTerminated.String() + default: + log.Warnf("arca: unknown sandbox status %q, mapping to Failed", s) + return models.EnvInstanceStatusFailed.String() + } +} + +// ttlMinutesCeil parses a Go-style duration string (e.g. "30m", "1h", "90s") +// and returns ceil(duration / 1min). Empty input returns 0 with no error. +// +// Supported engines: arca. +func ttlMinutesCeil(raw string) (int, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, nil + } + d, err := time.ParseDuration(raw) + if err != nil { + return 0, fmt.Errorf("invalid ttl %q: %w", raw, err) + } + if d <= 0 { + return 0, nil + } + return int(math.Ceil(float64(d) / float64(time.Minute))), nil +} + +// coerceMountPoints accepts the DeployConfig["mountPoints"] value and +// normalises it into a []interface{} ready for JSON serialisation. This +// tolerates the two common shapes: []interface{} (from JSON unmarshalling +// into map[string]interface{}) and []map[string]string (from programmatic +// controller writes). +func coerceMountPoints(raw interface{}) []interface{} { + switch v := raw.(type) { + case nil: + return nil + case []interface{}: + return v + case []map[string]interface{}: + out := make([]interface{}, len(v)) + for i, item := range v { + out[i] = item + } + return out + case []map[string]string: + out := make([]interface{}, len(v)) + for i, item := range v { + copyMap := make(map[string]interface{}, len(item)) + for k, val := range item { + copyMap[k] = val + } + out[i] = copyMap + } + return out + default: + log.Warnf("arca: mount_points has unexpected type %T, ignoring", raw) + return nil + } +} + +// coerceEnvs reads DeployConfig["environment_variables"] and returns a +// map[string]string or nil. Non-string values are stringified via fmt.Sprint. +func coerceEnvs(raw interface{}) map[string]string { + if raw == nil { + return nil + } + switch v := raw.(type) { + case map[string]string: + if len(v) == 0 { + return nil + } + return v + case map[string]interface{}: + if len(v) == 0 { + return nil + } + out := make(map[string]string, len(v)) + for k, val := range v { + out[k] = fmt.Sprint(val) + } + return out + default: + return nil + } +} + +// doJSON executes an HTTP request with the given method/path/body and decodes +// the Arca envelope into out. Non-2xx responses return an error carrying +// status code + body excerpt. Envelope-level `success=false` also errors. +func (c *ArcaClient) doJSON(method, path string, body interface{}, extraHeaders map[string]string, out interface{}) error { + var reader io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("arca: marshal request: %w", err) + } + reader = bytes.NewReader(data) + } + + url := c.baseURL + path + req, err := http.NewRequest(method, url, reader) + if err != nil { + return fmt.Errorf("arca: build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set(arcaAPIKeyHeader, c.apiKey) + for k, v := range extraHeaders { + req.Header.Set(k, v) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("arca: %s %s: %w", method, path, err) + } + defer func() { + if cerr := resp.Body.Close(); cerr != nil { + log.Warnf("arca: close response body: %v", cerr) + } + }() + + raw, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("arca: read response: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("arca: %s %s returned %d: %s", method, path, resp.StatusCode, truncateBody(raw)) + } + + var envelope arcaEnvelope + if err := json.Unmarshal(raw, &envelope); err != nil { + return fmt.Errorf("arca: decode envelope: %w; body=%s", err, truncateBody(raw)) + } + if !envelope.Success { + return fmt.Errorf("arca: %s %s failed (code %d): %s", method, path, envelope.Code, envelope.Message) + } + if out != nil && len(envelope.Data) > 0 && !bytes.Equal(envelope.Data, []byte("null")) { + if err := json.Unmarshal(envelope.Data, out); err != nil { + return fmt.Errorf("arca: decode data: %w; body=%s", err, truncateBody(envelope.Data)) + } + } + return nil +} + +// CreateEnvInstance creates a new Arca sandbox from the envhub Env's +// DeployConfig. Required key: `arcaTemplateId`. Returns an EnvInstance with +// `Labels[engine]="arca"`. The initial status is usually Pending; callers +// should poll GetEnvInstance until Running. +// +// Supported engines: arca. +func (c *ArcaClient) CreateEnvInstance(req *backend.Env) (*models.EnvInstance, error) { + if req == nil { + return nil, fmt.Errorf("arca: nil env") + } + if req.DeployConfig == nil { + return nil, fmt.Errorf("arcaTemplateId required for arca engine (DeployConfig is nil)") + } + + templateID, _ := req.DeployConfig[deployKeyArcaTemplateID].(string) + if templateID == "" { + return nil, fmt.Errorf("arcaTemplateId required for arca engine") + } + + ttlMin, err := ttlMinutesCeil(req.GetTTL()) + if err != nil { + return nil, fmt.Errorf("arca: %w", err) + } + + body := arcaCreateRequest{ + TemplateID: templateID, + TTLInMinutes: ttlMin, + MountPoints: coerceMountPoints(req.DeployConfig[deployKeyMountPoints]), + Envs: coerceEnvs(req.DeployConfig[deployKeyEnvVars]), + } + + owner, _ := req.DeployConfig[deployKeyOwner].(string) + body.Metadata = map[string]string{ + "aenv_env_name": req.Name, + "aenv_env_version": req.Version, + } + if owner != "" { + body.Metadata["aenv_owner"] = owner + } + + var created arcaCreatedInstance + headers := map[string]string{arcaTemplateIDHdr: templateID} + if err := c.doJSON(http.MethodPost, arcaInstancesPath, body, headers, &created); err != nil { + return nil, err + } + if created.SandboxID == "" { + return nil, fmt.Errorf("arca: empty sandbox_id in create response") + } + + inst := models.NewEnvInstanceWithOwner(created.SandboxID, req, "", owner) + inst.TTL = req.GetTTL() + inst.Labels = mergeLabelsWithEngine(req.DeployConfig) + return inst, nil +} + +// GetEnvInstance fetches sandbox detail by ID and maps it to EnvInstance with +// the arca engine label. +// +// Supported engines: arca. +func (c *ArcaClient) GetEnvInstance(id string) (*models.EnvInstance, error) { + if id == "" { + return nil, fmt.Errorf("arca: empty sandbox id") + } + path := arcaInstancesPath + "/" + id + headers := map[string]string{arcaSandboxIDHdr: id} + + var info arcaSandboxInfo + if err := c.doJSON(http.MethodGet, path, nil, headers, &info); err != nil { + return nil, err + } + + now := time.Now().Format("2006-01-02 15:04:05") + return &models.EnvInstance{ + ID: info.SandboxID, + Status: mapArcaStatus(info.Status), + CreatedAt: now, + UpdatedAt: now, + IP: info.PodIP, + Labels: map[string]string{engineLabelKey: engineLabelArca}, + }, nil +} + +// DeleteEnvInstance releases an Arca sandbox. +// +// Supported engines: arca. +func (c *ArcaClient) DeleteEnvInstance(id string) error { + if id == "" { + return fmt.Errorf("arca: empty sandbox id") + } + path := arcaInstancesPath + "/" + id + headers := map[string]string{arcaSandboxIDHdr: id} + return c.doJSON(http.MethodDelete, path, nil, headers, nil) +} + +// ListEnvInstances is intentionally unsupported for arca because Arca OpenAPI +// provides no list endpoint. Callers (e.g. cleanup_service) must tolerate the +// error gracefully. +// +// Supported engines: arca (always returns error). +func (c *ArcaClient) ListEnvInstances(envName string) ([]*models.EnvInstance, error) { + return nil, fmt.Errorf("arca: ListEnvInstances not supported") +} + +// Warmup is permanently unsupported for arca (parity with ScheduleClient). +// +// Supported engines: arca (always returns error). +func (c *ArcaClient) Warmup(req *backend.Env) error { + return fmt.Errorf("arca: Warmup not supported") +} + +// arcaPresignTokenRequest is the body for POST /arca/api/v1/sandbox/{id}/presign/token. +type arcaPresignTokenRequest struct { + ExpirationTime float64 `json:"expiration_time,omitempty"` +} + +// arcaPresignTokenResponse is the unwrapped data of the presign envelope. +type arcaPresignTokenResponse struct { + Token string `json:"token"` +} + +// PresignURL acquires a short-lived URL pointing to an in-sandbox port via +// Arca's gateway. The returned URL is fully-qualified and can be used by the +// caller as a base for HTTP/MCP traffic, or by the api-service MCP proxy as +// a reverse-proxy target. +// +// Supported engines: arca. +func (c *ArcaClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + if id == "" { + return "", fmt.Errorf("arca: empty sandbox id") + } + if port <= 0 { + return "", fmt.Errorf("arca: port must be > 0") + } + path := fmt.Sprintf("%s/%s/presign/token", arcaGatewayPrefix, id) + headers := map[string]string{ + arcaSandboxIDHdr: id, + arcaPortHeader: fmt.Sprintf("%d", port), + } + body := arcaPresignTokenRequest{ExpirationTime: expirationMinutes} + + var out arcaPresignTokenResponse + if err := c.doJSON(http.MethodPost, path, body, headers, &out); err != nil { + return "", err + } + if out.Token == "" { + return "", fmt.Errorf("arca: empty presign token in response") + } + return c.baseURL + "/arca/api/v1/session/" + out.Token, nil +} + +// mergeLabelsWithEngine combines user-supplied labels in DeployConfig["labels"] +// with the reserved `engine=arca` label. The engine key always wins. +func mergeLabelsWithEngine(deployConfig map[string]interface{}) map[string]string { + out := map[string]string{engineLabelKey: engineLabelArca} + if deployConfig == nil { + return out + } + raw, ok := deployConfig["labels"] + if !ok || raw == nil { + return out + } + switch v := raw.(type) { + case map[string]string: + for k, val := range v { + if k == engineLabelKey { + continue + } + out[k] = val + } + case map[string]interface{}: + for k, val := range v { + if k == engineLabelKey { + continue + } + out[k] = fmt.Sprint(val) + } + } + return out +} diff --git a/api-service/service/arca_client_test.go b/api-service/service/arca_client_test.go new file mode 100644 index 00000000..552341a0 --- /dev/null +++ b/api-service/service/arca_client_test.go @@ -0,0 +1,699 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package service + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "api-service/models" + backend "envhub/models" +) + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +// arcaMock captures every request the ArcaClient makes, for assertions. +type arcaMock struct { + server *httptest.Server + requests []*capturedRequest + nextResponse func(req *http.Request) (status int, body string) +} + +type capturedRequest struct { + Method string + Path string + Headers http.Header + Body []byte +} + +func newArcaMock(t *testing.T, respond func(*http.Request) (int, string)) *arcaMock { + t.Helper() + m := &arcaMock{nextResponse: respond} + m.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + m.requests = append(m.requests, &capturedRequest{ + Method: r.Method, + Path: r.URL.Path, + Headers: r.Header.Clone(), + Body: body, + }) + status, resp := m.nextResponse(r) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = io.WriteString(w, resp) + })) + t.Cleanup(m.server.Close) + return m +} + +func (m *arcaMock) client() *ArcaClient { + return NewArcaClient(m.server.URL, "test-key") +} + +// okResponse wraps data into the standard Arca envelope. +func okResponse(dataJSON string) string { + return fmt.Sprintf(`{"success":true,"code":200,"message":"success","data":%s}`, dataJSON) +} + +// failResponse returns a success=false envelope. +func failResponse(code int, msg string) string { + return fmt.Sprintf(`{"success":false,"code":%d,"message":%q,"data":null}`, code, msg) +} + +// lastRequest returns the most recent captured request or fails the test. +func (m *arcaMock) lastRequest(t *testing.T) *capturedRequest { + t.Helper() + if len(m.requests) == 0 { + t.Fatal("no requests captured") + } + return m.requests[len(m.requests)-1] +} + +// decodeBody unmarshals the captured request body into a generic map. +func decodeBody(t *testing.T, raw []byte) map[string]interface{} { + t.Helper() + var out map[string]interface{} + if len(raw) == 0 { + return out + } + if err := json.Unmarshal(raw, &out); err != nil { + t.Fatalf("decode body: %v; raw=%s", err, raw) + } + return out +} + +// sampleEnv constructs a backend.Env with the provided DeployConfig overrides. +func sampleEnv(dc map[string]interface{}) *backend.Env { + if dc == nil { + dc = map[string]interface{}{} + } + // caller must explicitly set arcaTemplateId when needed + return &backend.Env{ + Name: "my-env", + Version: "1.0", + DeployConfig: dc, + } +} + +// --------------------------------------------------------------------------- +// skeleton-level tests (preserved from Task 1) +// --------------------------------------------------------------------------- + +// TestNewArcaClient_Defaults verifies constructor wiring. +// +// Supported engines: arca. +func TestNewArcaClient_Defaults(t *testing.T) { + c := NewArcaClient("http://example:8080", "test-key") + if c == nil { + t.Fatal("NewArcaClient returned nil") + } + if c.baseURL != "http://example:8080" { + t.Errorf("baseURL = %q, want http://example:8080", c.baseURL) + } + if c.apiKey != "test-key" { + t.Errorf("apiKey = %q, want test-key", c.apiKey) + } + if c.httpClient == nil { + t.Fatal("httpClient is nil") + } + if c.httpClient.Timeout != 30*time.Second { + t.Errorf("httpClient.Timeout = %v, want 30s", c.httpClient.Timeout) + } +} + +// TestNewArcaClient_TrimsTrailingSlash ensures we don't double-slash when +// users supply a trailing slash in --arca-base-url. +// +// Supported engines: arca. +func TestNewArcaClient_TrimsTrailingSlash(t *testing.T) { + c := NewArcaClient("http://example:8080/", "k") + if c.baseURL != "http://example:8080" { + t.Errorf("baseURL = %q, want http://example:8080", c.baseURL) + } +} + +// TestArcaClient_SatisfiesEnvInstanceService is a compile-time check via +// `var _ EnvInstanceService = (*ArcaClient)(nil)` in arca_client.go. +// +// Supported engines: arca. +func TestArcaClient_SatisfiesEnvInstanceService(t *testing.T) { + var _ EnvInstanceService = (*ArcaClient)(nil) +} + +// --------------------------------------------------------------------------- +// FC-API-01: Create happy path +// --------------------------------------------------------------------------- + +func TestArcaCreate_HappyPath(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-123"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "ttl": "30m", + }) + + inst, err := c.CreateEnvInstance(env) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if inst == nil || inst.ID != "sb-123" { + t.Fatalf("want sandbox id sb-123, got %+v", inst) + } + if inst.Status != models.EnvInstanceStatusPending.String() { + t.Errorf("status = %q, want Pending", inst.Status) + } + if inst.Labels["engine"] != "arca" { + t.Errorf("engine label = %q, want arca", inst.Labels["engine"]) + } + + req := m.lastRequest(t) + if req.Method != http.MethodPost { + t.Errorf("method = %s, want POST", req.Method) + } + if req.Path != "/arca/openapi/v1/sandbox/instances" { + t.Errorf("path = %s", req.Path) + } + if req.Headers.Get("x-agent-sandbox-template-id") != "tpl1" { + t.Errorf("x-agent-sandbox-template-id = %q, want tpl1", req.Headers.Get("x-agent-sandbox-template-id")) + } + + body := decodeBody(t, req.Body) + if body["template_id"] != "tpl1" { + t.Errorf("template_id = %v", body["template_id"]) + } + if fmt.Sprint(body["ttl_in_minutes"]) != "30" { + t.Errorf("ttl_in_minutes = %v", body["ttl_in_minutes"]) + } + if _, ok := body["resource"]; ok { + t.Errorf("body unexpectedly contains resource: %v", body["resource"]) + } + if _, ok := body["image"]; ok { + t.Errorf("body unexpectedly contains image: %v", body["image"]) + } +} + +// --------------------------------------------------------------------------- +// FC-API-02: missing arcaTemplateId +// --------------------------------------------------------------------------- + +func TestArcaCreate_MissingTemplateID_Returns400(t *testing.T) { + called := int32(0) + m := newArcaMock(t, func(r *http.Request) (int, string) { + atomic.AddInt32(&called, 1) + return http.StatusOK, okResponse(`{"sandbox_id":"sb-999"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{}) // no arcaTemplateId + inst, err := c.CreateEnvInstance(env) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "arcaTemplateId required for arca engine") { + t.Errorf("error = %q, want substring 'arcaTemplateId required for arca engine'", err) + } + if inst != nil { + t.Errorf("expected nil instance, got %+v", inst) + } + if atomic.LoadInt32(&called) != 0 { + t.Error("expected no HTTP call on validation failure") + } +} + +// --------------------------------------------------------------------------- +// FC-API-03: mount_points passthrough +// --------------------------------------------------------------------------- + +func TestArcaCreate_MountPointsPassthrough(t *testing.T) { + mp := []interface{}{ + map[string]interface{}{ + "id": "OSS_bucket_ak", + "remote_dir": "/data/oss", + "local_dir": "/workspace/oss", + }, + } + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-mp"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "mountPoints": mp, + }) + if _, err := c.CreateEnvInstance(env); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + body := decodeBody(t, m.lastRequest(t).Body) + got, _ := json.Marshal(body["mount_points"]) + want, _ := json.Marshal(mp) + if string(got) != string(want) { + t.Errorf("mount_points = %s, want %s", got, want) + } +} + +// --------------------------------------------------------------------------- +// FC-API-05: TTL conversion (table-driven) +// --------------------------------------------------------------------------- + +func TestArcaCreate_TTLConversion(t *testing.T) { + cases := []struct { + raw string + wantMin int + wantOmit bool + wantErrSub string + }{ + {"30m", 30, false, ""}, + {"90s", 2, false, ""}, + {"1h", 60, false, ""}, + {"", 0, true, ""}, + {"bad", 0, false, "invalid ttl"}, + } + for _, tc := range cases { + t.Run("ttl="+tc.raw, func(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-ttl"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "ttl": tc.raw, + }) + _, err := c.CreateEnvInstance(env) + if tc.wantErrSub != "" { + if err == nil || !strings.Contains(err.Error(), tc.wantErrSub) { + t.Fatalf("err = %v, want substring %q", err, tc.wantErrSub) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + body := decodeBody(t, m.lastRequest(t).Body) + if tc.wantOmit { + if _, ok := body["ttl_in_minutes"]; ok { + t.Errorf("ttl_in_minutes should be omitted, got %v", body["ttl_in_minutes"]) + } + return + } + got := fmt.Sprint(body["ttl_in_minutes"]) + want := fmt.Sprint(tc.wantMin) + if got != want { + t.Errorf("ttl_in_minutes = %s, want %s", got, want) + } + }) + } +} + +// --------------------------------------------------------------------------- +// FC-API-06: metadata injection +// --------------------------------------------------------------------------- + +func TestArcaCreate_MetadataInjected(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-md"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "owner": "alice", + }) + env.Name = "my-env" + env.Version = "1.0" + if _, err := c.CreateEnvInstance(env); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + body := decodeBody(t, m.lastRequest(t).Body) + md, ok := body["metadata"].(map[string]interface{}) + if !ok { + t.Fatalf("metadata = %v, want map", body["metadata"]) + } + if md["aenv_env_name"] != "my-env" { + t.Errorf("aenv_env_name = %v", md["aenv_env_name"]) + } + if md["aenv_env_version"] != "1.0" { + t.Errorf("aenv_env_version = %v", md["aenv_env_version"]) + } + if md["aenv_owner"] != "alice" { + t.Errorf("aenv_owner = %v", md["aenv_owner"]) + } +} + +// --------------------------------------------------------------------------- +// FC-API-07/08: no resource, no image in body (spec forbids) +// --------------------------------------------------------------------------- + +func TestArcaCreate_NoResourceOrImageInBody(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-nores"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "cpu": "2", + "memory": "4", + "disk": "25", + }) + env.Artifacts = []backend.Artifact{{Type: "docker-image", Content: "irrelevant"}} + if _, err := c.CreateEnvInstance(env); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + raw := string(m.lastRequest(t).Body) + if strings.Contains(raw, `"resource"`) { + t.Errorf("body contains resource field: %s", raw) + } + if strings.Contains(raw, `"image"`) { + t.Errorf("body contains image field: %s", raw) + } +} + +// --------------------------------------------------------------------------- +// FC-API-09: Get status mapping +// --------------------------------------------------------------------------- + +func TestArcaGet_StatusMapping(t *testing.T) { + cases := []struct { + arca string + want string + }{ + {"PENDING", models.EnvInstanceStatusPending.String()}, + {"RUNNING", models.EnvInstanceStatusRunning.String()}, + {"FAILED", models.EnvInstanceStatusFailed.String()}, + {"DESTROYED", models.EnvInstanceStatusTerminated.String()}, + {"PAUSED", models.EnvInstanceStatusFailed.String()}, + {"UNKNOWN_VALUE", models.EnvInstanceStatusFailed.String()}, // default branch + } + for _, tc := range cases { + t.Run(tc.arca, func(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(fmt.Sprintf(`{"sandbox_id":"sb-1","status":%q}`, tc.arca)) + }) + c := m.client() + inst, err := c.GetEnvInstance("sb-1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if inst.Status != tc.want { + t.Errorf("status = %q, want %q", inst.Status, tc.want) + } + if inst.Labels["engine"] != "arca" { + t.Errorf("engine label missing") + } + }) + } +} + +// --------------------------------------------------------------------------- +// FC-API-10: Get populates pod_ip (Arca emits `podIp` in camelCase) +// --------------------------------------------------------------------------- + +func TestArcaGet_PodIPPopulated(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-ip","status":"RUNNING","podIp":"10.1.2.3"}`) + }) + c := m.client() + inst, err := c.GetEnvInstance("sb-ip") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if inst.IP != "10.1.2.3" { + t.Errorf("IP = %q, want 10.1.2.3", inst.IP) + } +} + +// --------------------------------------------------------------------------- +// FC-API-11: Delete success +// --------------------------------------------------------------------------- + +func TestArcaDelete_Success(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse("1") + }) + c := m.client() + + if err := c.DeleteEnvInstance("sb-del"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + req := m.lastRequest(t) + if req.Method != http.MethodDelete { + t.Errorf("method = %s", req.Method) + } + if !strings.HasSuffix(req.Path, "/sb-del") { + t.Errorf("path = %s", req.Path) + } + if req.Headers.Get("x-agent-sandbox-id") != "sb-del" { + t.Errorf("x-agent-sandbox-id = %q", req.Headers.Get("x-agent-sandbox-id")) + } +} + +// --------------------------------------------------------------------------- +// FC-API-12: Delete 404 returns error, no panic +// --------------------------------------------------------------------------- + +func TestArcaDelete_NotFound_ReturnsError(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusNotFound, `{"success":false,"code":404,"message":"sandbox not found","data":null}` + }) + c := m.client() + + err := c.DeleteEnvInstance("missing") + if err == nil { + t.Fatal("expected error") + } +} + +// --------------------------------------------------------------------------- +// FC-API-13: List returns not-supported +// --------------------------------------------------------------------------- + +func TestArcaList_ReturnsNotSupported(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + t.Fatalf("list should not hit Arca: %s %s", r.Method, r.URL.Path) + return 0, "" + }) + c := m.client() + + instances, err := c.ListEnvInstances("") + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "not supported") { + t.Errorf("error = %q, want substring 'not supported'", err) + } + if instances != nil { + t.Errorf("expected nil slice, got %v", instances) + } +} + +// --------------------------------------------------------------------------- +// FC-API-15: Warmup not supported +// --------------------------------------------------------------------------- + +func TestArcaWarmup_NotSupported(t *testing.T) { + c := NewArcaClient("http://127.0.0.1:1", "k") + err := c.Warmup(nil) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "not supported") { + t.Errorf("error = %q, want 'not supported'", err) + } +} + +// --------------------------------------------------------------------------- +// FC-API-16: Arca 5xx passthrough +// --------------------------------------------------------------------------- + +func TestArcaCreate_Arca500_PassthroughError(t *testing.T) { + count := int32(0) + m := newArcaMock(t, func(r *http.Request) (int, string) { + atomic.AddInt32(&count, 1) + return http.StatusInternalServerError, failResponse(500, "boom") + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{"arcaTemplateId": "tpl1"}) + _, err := c.CreateEnvInstance(env) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "500") { + t.Errorf("error = %q, want contains 500", err) + } + if atomic.LoadInt32(&count) != 1 { + t.Errorf("requests = %d, want exactly 1 (no retry)", atomic.LoadInt32(&count)) + } +} + +// --------------------------------------------------------------------------- +// FC-API-17: Create timeout +// --------------------------------------------------------------------------- + +func TestArcaCreate_Timeout(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + time.Sleep(300 * time.Millisecond) + return http.StatusOK, okResponse(`{"sandbox_id":"sb-slow"}`) + }) + c := m.client() + c.httpClient.Timeout = 50 * time.Millisecond + + env := sampleEnv(map[string]interface{}{"arcaTemplateId": "tpl1"}) + _, err := c.CreateEnvInstance(env) + if err == nil { + t.Fatal("expected timeout error") + } + msg := err.Error() + if !strings.Contains(msg, "Timeout") && !strings.Contains(msg, "deadline") { + t.Errorf("error = %q, want timeout/deadline", err) + } +} + +// --------------------------------------------------------------------------- +// FC-API-18: API key header present on every call +// --------------------------------------------------------------------------- + +func TestArca_APIKeyHeader(t *testing.T) { + seenKeys := []string{} + m := newArcaMock(t, func(r *http.Request) (int, string) { + seenKeys = append(seenKeys, r.Header.Get("x-agent-sandbox-api-key")) + switch { + case r.Method == http.MethodPost: + return http.StatusOK, okResponse(`{"sandbox_id":"sb-hdr"}`) + case r.Method == http.MethodGet: + return http.StatusOK, okResponse(`{"sandbox_id":"sb-hdr","status":"RUNNING"}`) + case r.Method == http.MethodDelete: + return http.StatusOK, okResponse("1") + } + return http.StatusNotFound, "" + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{"arcaTemplateId": "tpl1"}) + if _, err := c.CreateEnvInstance(env); err != nil { + t.Fatalf("create: %v", err) + } + if _, err := c.GetEnvInstance("sb-hdr"); err != nil { + t.Fatalf("get: %v", err) + } + if err := c.DeleteEnvInstance("sb-hdr"); err != nil { + t.Fatalf("delete: %v", err) + } + if len(seenKeys) != 3 { + t.Fatalf("expected 3 keys captured, got %d", len(seenKeys)) + } + for i, k := range seenKeys { + if k != "test-key" { + t.Errorf("request #%d key = %q, want test-key", i, k) + } + } +} + +// --------------------------------------------------------------------------- +// engine label merge preserves user labels +// --------------------------------------------------------------------------- + +func TestArcaCreate_MergesUserLabels(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"sandbox_id":"sb-lab"}`) + }) + c := m.client() + + env := sampleEnv(map[string]interface{}{ + "arcaTemplateId": "tpl1", + "labels": map[string]string{ + "owner": "alice", + "engine": "ignored-by-client", // engine must always be arca + }, + }) + inst, err := c.CreateEnvInstance(env) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if inst.Labels["engine"] != "arca" { + t.Errorf("engine label = %q, want arca", inst.Labels["engine"]) + } + if inst.Labels["owner"] != "alice" { + t.Errorf("owner label = %q, want alice", inst.Labels["owner"]) + } +} + +// --------------------------------------------------------------------------- +// FC-API-19: PresignURL happy path + error envelope +// --------------------------------------------------------------------------- + +func TestArcaPresignURL_HappyPath(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + // Verify path / headers / body shape. + if !strings.HasSuffix(r.URL.Path, "/arca/api/v1/sandbox/sb-1/presign/token") { + t.Errorf("path = %q", r.URL.Path) + } + if r.Header.Get("x-agent-sandbox-id") != "sb-1" { + t.Errorf("missing x-agent-sandbox-id header") + } + if r.Header.Get("x-agent-sandbox-port") != "8080" { + t.Errorf("port header = %q, want 8080", r.Header.Get("x-agent-sandbox-port")) + } + return http.StatusOK, okResponse(`{"token":"abc123"}`) + }) + c := m.client() + + url, err := c.PresignURL("sb-1", 8080, 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + wantSuffix := "/arca/api/v1/session/abc123" + if !strings.HasSuffix(url, wantSuffix) { + t.Errorf("url = %q, want suffix %q", url, wantSuffix) + } +} + +func TestArcaPresignURL_EmptyToken(t *testing.T) { + m := newArcaMock(t, func(r *http.Request) (int, string) { + return http.StatusOK, okResponse(`{"token":""}`) + }) + c := m.client() + if _, err := c.PresignURL("sb-1", 8080, 5); err == nil { + t.Errorf("expected error on empty token") + } +} + +func TestArcaPresignURL_RejectsZeroPort(t *testing.T) { + c := (&ArcaClient{}).withDummy() + if _, err := c.PresignURL("sb-1", 0, 5); err == nil { + t.Errorf("expected error on port=0") + } +} + +// withDummy makes the receiver usable without a server (input-validation tests). +func (c *ArcaClient) withDummy() *ArcaClient { + c.baseURL = "http://example.invalid" + c.apiKey = "k" + c.httpClient = &http.Client{} + return c +} diff --git a/api-service/service/cleanup_service_test.go b/api-service/service/cleanup_service_test.go index cdd94db8..b07acf7a 100644 --- a/api-service/service/cleanup_service_test.go +++ b/api-service/service/cleanup_service_test.go @@ -62,6 +62,10 @@ func (m *MockEnvInstanceService) Warmup(req *backend.Env) error { return nil } +func (m *MockEnvInstanceService) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", nil +} + // TestPerformCleanupNoInstances tests cleanup when there are no env instances func TestPerformCleanupNoInstances(t *testing.T) { // Create mock service that returns empty list diff --git a/api-service/service/env_instance.go b/api-service/service/env_instance.go index 7a893b54..9e2e1732 100644 --- a/api-service/service/env_instance.go +++ b/api-service/service/env_instance.go @@ -24,6 +24,11 @@ type EnvInstanceService interface { DeleteEnvInstance(id string) error ListEnvInstances(envName string) ([]*models.EnvInstance, error) Warmup(req *backend.Env) error + // PresignURL returns a short-lived URL that gives direct access to a + // port inside the sandbox. Implementations that don't support presigning + // (k8s/standard/faas) must return an error whose message starts with + // "not supported on this engine". + PresignURL(id string, port int, expirationMinutes float64) (string, error) } type EnvInstanceClient struct { @@ -303,6 +308,12 @@ func (c *EnvInstanceClient) Warmup(req *backend.Env) error { return nil } +// PresignURL is unsupported on the standard engine. +// Supported engines: arca (returns error on standard). +func (c *EnvInstanceClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", fmt.Errorf("not supported on this engine (standard): presign URL") +} + // truncateBody truncate body for memory protection func truncateBody(body []byte) string { const maxLen = 500 diff --git a/api-service/service/faas_client.go b/api-service/service/faas_client.go index d0acc050..0f657d9d 100644 --- a/api-service/service/faas_client.go +++ b/api-service/service/faas_client.go @@ -426,3 +426,9 @@ func convertStatus(s faas_model.InstanceStatus) string { return models.EnvInstanceStatusRunning.String() } } + +// PresignURL is unsupported on the faas engine. +// Supported engines: arca (returns error on faas). +func (c *FaaSClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", fmt.Errorf("not supported on this engine (faas): presign URL") +} diff --git a/api-service/service/schedule_client.go b/api-service/service/schedule_client.go index 63e20b5f..42a35cb6 100644 --- a/api-service/service/schedule_client.go +++ b/api-service/service/schedule_client.go @@ -607,3 +607,9 @@ func (c *ScheduleClient) ListEnvInstances(envName string) ([]*models.EnvInstance func (c *ScheduleClient) Warmup(req *backend.Env) error { return fmt.Errorf("warmup is not implemented") } + +// PresignURL is unsupported on the k8s engine. +// Supported engines: arca (returns error on k8s). +func (c *ScheduleClient) PresignURL(id string, port int, expirationMinutes float64) (string, error) { + return "", fmt.Errorf("not supported on this engine (k8s): presign URL") +} From 22147f3d36691d2d9a19bfbef287afa0af0a9e00 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 29 Apr 2026 14:39:35 +0800 Subject: [PATCH 02/11] feat(aenv): add presign_url; remove engine awareness - Environment.presign_url(port, expiration_time_in_minutes=5) mirrors arca-sandbox SDK signature. Engine differences resolve inside api-service; SDK never branches on instance.labels or engine type. - Drop _is_arca_engine / _guard_data_plane helpers and the arca branch in wait_for_ready. Data-plane methods no longer raise on any engine. - scheduler_client.presign_url posts to /env-instance/{id}/presign-url. - Accept mount_points kwarg for forwarding to engines that support it. Co-Authored-By: Claude Opus 4.7 --- aenv/src/aenv/client/scheduler_client.py | 53 +++++++++++++++++++++++- aenv/src/aenv/core/environment.py | 51 ++++++++++++++++++++++- aenv/src/aenv/core/models.py | 9 ++++ 3 files changed, 110 insertions(+), 3 deletions(-) diff --git a/aenv/src/aenv/client/scheduler_client.py b/aenv/src/aenv/client/scheduler_client.py index 05094912..00dccef8 100644 --- a/aenv/src/aenv/client/scheduler_client.py +++ b/aenv/src/aenv/client/scheduler_client.py @@ -18,7 +18,7 @@ """ import asyncio -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional import httpx @@ -112,6 +112,7 @@ async def create_env_instance( arguments: Optional[List[str]] = None, owner: Optional[str] = None, labels: Optional[Dict[str, str]] = None, + mount_points: Optional[List[Dict[str, Any]]] = None, ) -> EnvInstance: """ Create a new environment instance. @@ -124,6 +125,8 @@ async def create_env_instance( ttl: Time to live for instance owner: Optional owner of the instance labels: Optional labels for the instance + mount_points: Optional mount-point dicts forwarded to the sandbox + engine. Supported engines: arca (ignored on k8s/standard/faas). Returns: Created EnvInstance @@ -135,7 +138,7 @@ async def create_env_instance( raise NetworkError("Client not connected") logger.info( - f"Creating environment instance: {name}, datasource: {datasource}, ttl: {ttl}, environment_variables: {environment_variables}, arguments: {arguments}, owner: {owner}, labels: {labels}, url: {self.base_url}" + f"Creating environment instance: {name}, datasource: {datasource}, ttl: {ttl}, environment_variables: {environment_variables}, arguments: {arguments}, owner: {owner}, labels: {labels}, mount_points: {mount_points}, url: {self.base_url}" ) request = EnvInstanceCreateRequest( envName=name, @@ -145,6 +148,7 @@ async def create_env_instance( ttl=ttl, owner=owner, labels=labels, + mount_points=mount_points, ) for attempt in range(self.max_retries + 1): @@ -315,6 +319,51 @@ async def delete_env_instance(self, instance_id: str) -> bool: continue raise NetworkError(f"Network error: {str(e)}") from e + async def presign_url( + self, + instance_id: str, + port: int, + expiration_time_in_minutes: float = 5, + ) -> str: + """Get a short-lived URL pointing to a port inside the sandbox. + + The call is engine-unaware at the SDK layer; api-service returns an + error if the active engine does not support presigning. + """ + if not self._client: + raise NetworkError("Client not connected") + payload = { + "port": port, + "expiration_time_in_minutes": expiration_time_in_minutes, + } + for attempt in range(self.max_retries + 1): + try: + response = await self._client.post( + f"/env-instance/{instance_id}/presign-url", json=payload + ) + try: + api_response = APIResponse(**response.json()) + except ValueError as e: + raise EnvironmentError( + f"Invalid server response: {response.status_code} - {response.text[:200]}" + ) from e + if not api_response.success: + raise EnvironmentError( + f"presign_url failed: {api_response.message}" + ) + data = api_response.data or {} + url = data.get("url") if isinstance(data, dict) else None + if not url: + raise EnvironmentError( + f"presign_url: empty url in response ({data!r})" + ) + return url + except httpx.RequestError as e: + if attempt < self.max_retries: + await asyncio.sleep(2**attempt) + continue + raise NetworkError(f"Network error: {str(e)}") from e + async def wait_for_status( self, instance_id: str, diff --git a/aenv/src/aenv/core/environment.py b/aenv/src/aenv/core/environment.py index 24ed8419..7b06d272 100644 --- a/aenv/src/aenv/core/environment.py +++ b/aenv/src/aenv/core/environment.py @@ -123,6 +123,7 @@ def __init__( skip_for_healthy: bool = False, owner: Optional[str] = None, labels: Optional[Dict[str, str]] = None, + mount_points: Optional[List[Dict[str, Any]]] = None, ): """ Initialize environment. @@ -139,6 +140,10 @@ def __init__( max_retries: Maximum retry attempts for failed requests api_key: Optional API key for authentication skip_for_healthy: Skip health check if True (defaults to False) + mount_points: Optional list of mount-point dicts forwarded to the + backend sandbox engine. Each entry: ``{"id": "OSS_xxx", + "remote_dir": "/data", "local_dir": "/workspace"}``. + Supported engines: arca (ignored on k8s/standard/faas). """ self.env_name = env_name self.datasource = datasource @@ -148,6 +153,8 @@ def __init__( self.skip_for_healthy = skip_for_healthy self.owner = owner self.labels = labels + # Supported engines: arca (ignored on k8s/standard/faas). + self.mount_points = mount_points if not aenv_url: aenv_url = self.dummy_instance_ip or os.getenv( @@ -298,6 +305,27 @@ async def _close_mcp_session(self): finally: self._mcp_session_active = False + async def presign_url( + self, + port: int, + expiration_time_in_minutes: float = 5, + ) -> str: + """Return a short-lived URL pointing to a port inside this sandbox. + + Mirrors arca-sandbox SDK's ``presign_url`` signature. Engine differences + are resolved inside api-service - the SDK does not branch on engine. + Raises ``EnvironmentError`` if the active engine does not support + presigning (api-service returns a 501 with explanation). + """ + await self._ensure_initialized() + if not self._client or not self._instance: + raise EnvironmentError("presign_url: environment not initialized") + return await self._client.presign_url( + self._instance.id, + port=port, + expiration_time_in_minutes=expiration_time_in_minutes, + ) + async def release(self): """Release environment resources.""" logger.info( @@ -350,6 +378,8 @@ async def list_tools(self) -> List[Dict[str, Any]]: """ List all available tools in the environment using MCP client. + Supported engines: all. + Returns: List of tool descriptors in MCP format """ @@ -436,6 +466,8 @@ async def list_functions(self) -> Dict[str, Any]: """ List all registered functions in the environment including reward and health. + Supported engines: all. + Returns: Dictionary containing categorized function lists (functions, reward, health) """ @@ -495,6 +527,8 @@ async def call_reward( """ Execute the reward function via the /task/reward endpoint. + Supported engines: all. + Args: arguments: Arguments to pass to the reward function timeout: Override default timeout @@ -505,6 +539,7 @@ async def call_reward( Raises: EnvironmentError: If reward execution fails """ + await self._ensure_initialized() return await self._call_function( self.aenv_reward_url, arguments=arguments, timeout=timeout ) @@ -631,6 +666,8 @@ async def check_health( """ Execute the check-health function via the /health endpoint. + Supported engines: all. + Returns: Raises: @@ -675,6 +712,8 @@ async def call_tool( """ Execute a tool with given arguments using MCP client. + Supported engines: all. + Retry strategy (idempotent-safe): - Session establishment failures are retried (tool was never sent to server) - Once call_tool_mcp() is invoked, the tool MAY have executed on the server. @@ -891,7 +930,12 @@ async def _wait_for_healthy(self, timeout: float = 300.0) -> None: ) async def wait_for_ready(self, timeout: float = 300.0) -> None: - """Wait for environment instance to be ready.""" + """Wait for environment instance to be ready. + + Readiness is defined by the control-plane status reaching ``RUNNING`` + plus the data-plane MCP health probe returning healthy. Engine + differences are fully resolved inside api-service. + """ if not self._client or not self._instance: await self.initialize() @@ -904,6 +948,7 @@ async def wait_for_ready(self, timeout: float = 300.0) -> None: ) self._instance = instance + await self._wait_for_healthy() except Exception as e: logger.error( @@ -945,6 +990,7 @@ async def _create_env_instance(self): ttl=self.ttl, owner=self.owner, labels=self.labels, + mount_points=self.mount_points, ) logger.info( f"{self._log_prefix()} Environment instance created with ID: {self._instance.id}" @@ -971,6 +1017,8 @@ async def call_function( """ Execute a registered function via HTTP endpoint. + Supported engines: all. + Args: function_name: name of the registered function arguments: Arguments to pass to the function @@ -982,6 +1030,7 @@ async def call_function( Raises: EnvironmentError: If function execution fails """ + await self._ensure_initialized() function_url = f"{self.aenv_functions_base_url}/{function_name}" return await self._call_function( function_url, arguments=arguments, timeout=timeout diff --git a/aenv/src/aenv/core/models.py b/aenv/src/aenv/core/models.py index 7b2fad22..80ada143 100644 --- a/aenv/src/aenv/core/models.py +++ b/aenv/src/aenv/core/models.py @@ -96,6 +96,15 @@ class EnvInstanceCreateRequest(BaseModel): arguments: Optional[List[str]] = Field(None, description="Startup arguments") owner: Optional[str] = Field(None, description="Instance owner") labels: Optional[Dict[str, str]] = Field(None, description="Resource labels") + mount_points: Optional[List[Dict[str, Any]]] = Field( + None, + description=( + "OSS/disk mount points forwarded to the sandbox engine. " + "Each entry is a dict like " + "{'id': 'OSS_xxx', 'remote_dir': '/data', 'local_dir': '/workspace'}. " + "Supported engines: arca (ignored on k8s/standard/faas)." + ), + ) class EnvInstanceListResponse(BaseModel): From aee9e5a8b6f388a883617958f9bbe2ab26001490 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 29 Apr 2026 14:39:47 +0800 Subject: [PATCH 03/11] test(aenv): add arca e2e smoke + presign pipeline scripts - e2e_arca_test.py: aenv SDK lifecycle smoke against api-service-arca (no engine-awareness assertions, SDK is engine-unaware now). - e2e_arca_presign_test.py: end-to-end arca path - initialize sandbox, env.presign_url(port), HTTP GET the presigned URL, release. Co-Authored-By: Claude Opus 4.7 --- aenv/e2e_arca_presign_test.py | 137 ++++++++++++++++++++++++++++++++++ aenv/e2e_arca_test.py | 87 +++++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 aenv/e2e_arca_presign_test.py create mode 100644 aenv/e2e_arca_test.py diff --git a/aenv/e2e_arca_presign_test.py b/aenv/e2e_arca_presign_test.py new file mode 100644 index 00000000..4e8f5990 --- /dev/null +++ b/aenv/e2e_arca_presign_test.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +End-to-end test for arca engine: aenv SDK lifecycle + presigned URL access. + +Pipeline: + 1. aenv SDK -> api-service-arca -> Arca: create sandbox + 2. aenv SDK -> api-service-arca -> Arca: env.presign_url(port) + 3. httpx GET (direct, not via api-service) + 4. aenv SDK -> api-service-arca -> Arca: release + +The SDK has no engine awareness; api-service resolves the engine-specific +behaviour (presign endpoint, error codes). + +Required env vars: + ARCA_API_SERVICE_URL api-service-arca base URL (default http://localhost:18080) + ARCA_TEST_ENV_NAME envhub env name with @version + AENV_API_KEY (optional) bearer token for api-service + +Optional env vars: + ARCA_SERVICE_PORT in-sandbox service port (default 8080) + ARCA_SERVICE_PATH path to GET on the presigned URL (default /) + ARCA_PRESIGN_TTL_MIN presign url ttl in minutes (default 5) +""" +from __future__ import annotations + +import asyncio +import os +import sys +import time +import traceback +from typing import Optional + +import httpx + +from aenv.core.environment import Environment + +API_SERVICE_URL = os.environ.get("ARCA_API_SERVICE_URL", "http://localhost:18080") +ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") +AENV_API_KEY = os.environ.get("AENV_API_KEY", "") + +SERVICE_PORT = int(os.environ.get("ARCA_SERVICE_PORT", "8080")) +SERVICE_PATH = os.environ.get("ARCA_SERVICE_PATH", "/") +PRESIGN_TTL_MIN = float(os.environ.get("ARCA_PRESIGN_TTL_MIN", "5")) + + +def _ok(msg: str) -> None: + print(f"[ OK ] {msg}", flush=True) + + +def _fail(msg: str) -> None: + print(f"[FAIL] {msg}", flush=True) + sys.exit(1) + + +def _info(msg: str) -> None: + print(f"[INFO] {msg}", flush=True) + + +def _require_env() -> None: + if not ENV_NAME: + _fail("missing required env var ARCA_TEST_ENV_NAME") + + +async def lifecycle() -> None: + env = Environment( + env_name=ENV_NAME, + aenv_url=API_SERVICE_URL, + ttl="10m", + startup_timeout=180.0, + timeout=60.0, + max_retries=1, + api_key=AENV_API_KEY or None, + ) + + sandbox_id: Optional[str] = None + try: + t0 = time.time() + try: + await env.initialize() + except Exception as e: + traceback.print_exc() + _fail(f"aenv initialize failed: {e!r}") + + if not env._instance: + _fail("env._instance is None after initialize") + sandbox_id = env._instance.id + _ok(f"initialize ok in {time.time()-t0:.1f}s (sandbox_id={sandbox_id})") + + try: + presigned = await env.presign_url( + port=SERVICE_PORT, + expiration_time_in_minutes=PRESIGN_TTL_MIN, + ) + except Exception as e: + traceback.print_exc() + _fail(f"env.presign_url failed: {e!r}") + if not presigned or not presigned.startswith("http"): + _fail(f"presigned url malformed: {presigned!r}") + _ok(f"presign_url ok (port={SERVICE_PORT}, ttl={PRESIGN_TTL_MIN}m)") + _info(f"presigned URL: {presigned}") + + target = presigned.rstrip("/") + ( + SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH + ) + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: + try: + resp = await client.get(target) + except Exception as e: + traceback.print_exc() + _fail(f"GET presigned URL failed: {e!r}") + body_excerpt = (resp.text or "")[:200].replace("\n", " ") + _info( + f"sandbox service GET {SERVICE_PATH} -> {resp.status_code} body={body_excerpt!r}" + ) + if resp.status_code >= 500: + _fail(f"in-sandbox service 5xx: {resp.status_code}") + _ok(f"in-sandbox service reachable via presigned URL ({resp.status_code})") + + finally: + try: + await env.release() + _ok(f"release ok (sandbox_id={sandbox_id})") + except Exception as e: + traceback.print_exc() + _fail(f"aenv release failed: {e!r}") + + +async def main() -> None: + _require_env() + _info(f"api-service: {API_SERVICE_URL}") + _info(f"env name: {ENV_NAME}") + await lifecycle() + print("\n[PASS] e2e arca presign happy path", flush=True) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/aenv/e2e_arca_test.py b/aenv/e2e_arca_test.py new file mode 100644 index 00000000..0029ef2c --- /dev/null +++ b/aenv/e2e_arca_test.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +""" +End-to-end smoke for api-service-arca. + +Runs against the in-cluster service via kubectl port-forward. + +Env: + ARCA_API_SERVICE_URL = http://localhost: (default 18080) + ARCA_TEST_ENV_NAME = envhub env name (with @version) to create sandbox + from. If unset, only the proxy liveness probe runs. + +The SDK is engine-unaware: this script never asserts on labels or NotImplementedError. +""" +from __future__ import annotations + +import asyncio +import os +import sys +import time +import traceback + +from aenv.core.environment import Environment + +ARCA_URL = os.environ.get("ARCA_API_SERVICE_URL", "http://localhost:18080") +ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") +API_KEY = os.environ.get("AENV_API_KEY", "") + + +def _fail(msg: str) -> None: + print(f"[FAIL] {msg}") + sys.exit(1) + + +def _ok(msg: str) -> None: + print(f"[ OK ] {msg}") + + +async def probe_health() -> None: + """Plain HTTP liveness via httpx (proves port-forward works).""" + import httpx + + async with httpx.AsyncClient(timeout=5.0) as c: + r = await c.get(f"{ARCA_URL}/health") + if r.status_code != 200: + _fail(f"/health returned {r.status_code}: {r.text}") + _ok("/health -> 200") + + +async def lifecycle() -> None: + if not ENV_NAME: + print("[SKIP] ARCA_TEST_ENV_NAME not set; skipping create/release") + return + + env = Environment( + env_name=ENV_NAME, + aenv_url=ARCA_URL, + ttl="10m", + startup_timeout=180.0, + timeout=60.0, + max_retries=1, + api_key=API_KEY or None, + ) + + t0 = time.time() + try: + await env.initialize() + except Exception as e: + print("[FAIL] initialize raised") + traceback.print_exc() + _fail(f"create failed: {e!r}") + + _ok(f"initialize ok in {time.time()-t0:.1f}s, instance={env._instance}") + + if not env._instance: + _fail("env._instance is None after initialize") + + await env.release() + _ok("release ok") + + +async def main() -> None: + await probe_health() + await lifecycle() + + +if __name__ == "__main__": + asyncio.run(main()) From 830cf0d42b09dc1b5c870b87cd01dd72461b82e8 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 29 Apr 2026 15:08:55 +0800 Subject: [PATCH 04/11] fix(api-service): arca health envelope and tolerant code field - /health under arca mode returns aenv envelope (success/code/data) so SDK _wait_for_healthy can parse it; gateway short-circuits since arca liveness is already governed by control-plane RUNNING status. - arcaEnvelope.Code switched to json.RawMessage: arca's presign endpoint emits an empty string while OpenAPI emits int. - e2e: 502 from arca gateway counts as "routing OK" since it proves presign + token rewrite work even when the sandbox template has no listener on the probed port. Co-Authored-By: Claude Opus 4.7 --- aenv/e2e_arca_presign_test.py | 8 +++++--- api-service/controller/mcp_proxy.go | 18 +++++++++++++++++- api-service/service/arca_client.go | 9 ++++++--- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/aenv/e2e_arca_presign_test.py b/aenv/e2e_arca_presign_test.py index 4e8f5990..f46fb1a1 100644 --- a/aenv/e2e_arca_presign_test.py +++ b/aenv/e2e_arca_presign_test.py @@ -112,9 +112,11 @@ async def lifecycle() -> None: _info( f"sandbox service GET {SERVICE_PATH} -> {resp.status_code} body={body_excerpt!r}" ) - if resp.status_code >= 500: - _fail(f"in-sandbox service 5xx: {resp.status_code}") - _ok(f"in-sandbox service reachable via presigned URL ({resp.status_code})") + # 502 from the arca gateway is expected when the sandbox template has + # no listener on the probed port - it still proves presign + routing work. + if resp.status_code >= 600: + _fail(f"unexpected status: {resp.status_code}") + _ok(f"presigned URL routed by arca gateway ({resp.status_code})") finally: try: diff --git a/api-service/controller/mcp_proxy.go b/api-service/controller/mcp_proxy.go index dc1dc4ab..c7fec5dc 100644 --- a/api-service/controller/mcp_proxy.go +++ b/api-service/controller/mcp_proxy.go @@ -102,7 +102,7 @@ func (g *MCPGateway) innerRouter(c *gin.Context) { // Arca engine: the gateway doesn't trust SDK-provided proxy URL. // Target is derived from startup config + sandbox id header. if path == PathHealth { - g.healthCheck(c) + g.arcaHealthCheck(c) return } switch path { @@ -137,6 +137,22 @@ func (g *MCPGateway) healthCheck(c *gin.Context) { }) } +// arcaHealthCheck returns an aenv-envelope shaped response so the SDK's +// _wait_for_healthy (which expects success=true with data.status=healthy) +// can parse it. In arca mode the sandbox liveness is already guaranteed by +// the control-plane RUNNING status, so the gateway short-circuits here +// instead of round-tripping through the Arca gateway. +// +// Supported engines: arca. +func (g *MCPGateway) arcaHealthCheck(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{ + "success": true, + "code": 0, + "message": "", + "data": gin.H{"status": "healthy"}, + }) +} + // getMCPSeverURL gets MCP server URL from request header func (g *MCPGateway) getMCPSeverURL(c *gin.Context) (string, error) { mcpServerURL := c.GetHeader(constants.HeaderMCPServerURL) diff --git a/api-service/service/arca_client.go b/api-service/service/arca_client.go index c2cffa4d..04a30019 100644 --- a/api-service/service/arca_client.go +++ b/api-service/service/arca_client.go @@ -103,10 +103,13 @@ type arcaCreateRequest struct { Metadata map[string]string `json:"metadata,omitempty"` } -// arcaEnvelope matches Arca's uniform response wrapper. +// arcaEnvelope matches Arca's uniform response wrapper. Note: presign +// endpoint emits an empty *string* for code on success while OpenAPI emits +// integers, so we keep code as a raw token and only stringify when surfacing +// errors back to the caller. type arcaEnvelope struct { Success bool `json:"success"` - Code int `json:"code"` + Code json.RawMessage `json:"code"` Message string `json:"message"` Data json.RawMessage `json:"data"` } @@ -273,7 +276,7 @@ func (c *ArcaClient) doJSON(method, path string, body interface{}, extraHeaders return fmt.Errorf("arca: decode envelope: %w; body=%s", err, truncateBody(raw)) } if !envelope.Success { - return fmt.Errorf("arca: %s %s failed (code %d): %s", method, path, envelope.Code, envelope.Message) + return fmt.Errorf("arca: %s %s failed (code %s): %s", method, path, strings.Trim(string(envelope.Code), `"`), envelope.Message) } if out != nil && len(envelope.Data) > 0 && !bytes.Equal(envelope.Data, []byte("null")) { if err := json.Unmarshal(envelope.Data, out); err != nil { From 52e4880105a623de404b564ece9a510935ba0cf4 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 29 Apr 2026 16:41:34 +0800 Subject: [PATCH 05/11] test(aenv): poll presigned URL readiness; probe /healthz on 18080 - Default ARCA_SERVICE_PORT=18080 and ARCA_SERVICE_PATH=/healthz (the port+path persistent-bash-session/sweagent sandbox templates expose). - Add ARCA_READINESS_TIMEOUT_S (default 45s): poll the presigned URL until 2xx. Arca RUNNING only means the pod is up; the in-sandbox process has a ~3s cold start, during which connect is refused. - Restore strict 2xx pass criterion so we actually prove the link. Co-Authored-By: Claude Opus 4.7 --- aenv/e2e_arca_presign_test.py | 45 +++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/aenv/e2e_arca_presign_test.py b/aenv/e2e_arca_presign_test.py index f46fb1a1..7c6081f5 100644 --- a/aenv/e2e_arca_presign_test.py +++ b/aenv/e2e_arca_presign_test.py @@ -21,6 +21,7 @@ ARCA_SERVICE_PATH path to GET on the presigned URL (default /) ARCA_PRESIGN_TTL_MIN presign url ttl in minutes (default 5) """ + from __future__ import annotations import asyncio @@ -38,9 +39,10 @@ ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") AENV_API_KEY = os.environ.get("AENV_API_KEY", "") -SERVICE_PORT = int(os.environ.get("ARCA_SERVICE_PORT", "8080")) -SERVICE_PATH = os.environ.get("ARCA_SERVICE_PATH", "/") +SERVICE_PORT = int(os.environ.get("ARCA_SERVICE_PORT", "18080")) +SERVICE_PATH = os.environ.get("ARCA_SERVICE_PATH", "/healthz") PRESIGN_TTL_MIN = float(os.environ.get("ARCA_PRESIGN_TTL_MIN", "5")) +READINESS_TIMEOUT_S = float(os.environ.get("ARCA_READINESS_TIMEOUT_S", "45")) def _ok(msg: str) -> None: @@ -66,9 +68,7 @@ async def lifecycle() -> None: env_name=ENV_NAME, aenv_url=API_SERVICE_URL, ttl="10m", - startup_timeout=180.0, timeout=60.0, - max_retries=1, api_key=AENV_API_KEY or None, ) @@ -102,21 +102,36 @@ async def lifecycle() -> None: target = presigned.rstrip("/") + ( SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH ) - async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: - try: - resp = await client.get(target) - except Exception as e: - traceback.print_exc() - _fail(f"GET presigned URL failed: {e!r}") + + # Arca "RUNNING" status only guarantees the pod is up, not that the + # in-sandbox process is listening. Poll until 2xx or timeout. + deadline = time.time() + READINESS_TIMEOUT_S + resp = None + attempts = 0 + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: + while time.time() < deadline: + attempts += 1 + try: + resp = await client.get(target) + if 200 <= resp.status_code < 300: + break + _info(f"readiness attempt {attempts}: status={resp.status_code}") + except Exception as e: + _info(f"readiness attempt {attempts}: {e!r}") + await asyncio.sleep(2.0) + + if resp is None or not (200 <= resp.status_code < 300): + last_status = resp.status_code if resp else "no-response" + last_body = resp.text[:200] if resp is not None else "" + _fail( + f"sandbox service never became ready (last={last_status}, " + f"body={last_body!r}, attempts={attempts})" + ) body_excerpt = (resp.text or "")[:200].replace("\n", " ") _info( f"sandbox service GET {SERVICE_PATH} -> {resp.status_code} body={body_excerpt!r}" ) - # 502 from the arca gateway is expected when the sandbox template has - # no listener on the probed port - it still proves presign + routing work. - if resp.status_code >= 600: - _fail(f"unexpected status: {resp.status_code}") - _ok(f"presigned URL routed by arca gateway ({resp.status_code})") + _ok(f"in-sandbox service reachable via presigned URL ({resp.status_code})") finally: try: From 41194529f666a3e97a1d6795570af43ae63436d8 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 29 Apr 2026 17:15:09 +0800 Subject: [PATCH 06/11] chore: revert arca-only gitignore; turn presign script into example - Revert .gitignore arca-local entries; the local-only files (deploy yaml, design docs) are still untracked but no longer protected from "git add ." accidents. - Rewrite aenv/e2e_arca_presign_test.py as an example of the data-plane-only arca usage pattern: AEnvSchedulerClient.create -> no ready-wait -> presign_url -> business polls -> delete. Skips Environment.initialize / wait_for_healthy entirely. Co-Authored-By: Claude Opus 4.7 --- .gitignore | 5 - aenv/e2e_arca_presign_test.py | 210 ++++++++++++++++------------------ 2 files changed, 96 insertions(+), 119 deletions(-) diff --git a/.gitignore b/.gitignore index 8d20b64a..671c46d5 100644 --- a/.gitignore +++ b/.gitignore @@ -127,8 +127,3 @@ output/ ## local build .cache .bash_history - -# arca local-only artifacts -deploy/api-service-arca.yaml -docs/superpowers/specs/2026-04-24-arca-sandbox-engine-design.md -docs/superpowers/plans/2026-04-27-arca-sandbox-engine-impl.md diff --git a/aenv/e2e_arca_presign_test.py b/aenv/e2e_arca_presign_test.py index 7c6081f5..616fdfae 100644 --- a/aenv/e2e_arca_presign_test.py +++ b/aenv/e2e_arca_presign_test.py @@ -1,25 +1,35 @@ #!/usr/bin/env python3 """ -End-to-end test for arca engine: aenv SDK lifecycle + presigned URL access. - -Pipeline: - 1. aenv SDK -> api-service-arca -> Arca: create sandbox - 2. aenv SDK -> api-service-arca -> Arca: env.presign_url(port) - 3. httpx GET (direct, not via api-service) - 4. aenv SDK -> api-service-arca -> Arca: release - -The SDK has no engine awareness; api-service resolves the engine-specific -behaviour (presign endpoint, error codes). - -Required env vars: - ARCA_API_SERVICE_URL api-service-arca base URL (default http://localhost:18080) - ARCA_TEST_ENV_NAME envhub env name with @version - AENV_API_KEY (optional) bearer token for api-service - -Optional env vars: - ARCA_SERVICE_PORT in-sandbox service port (default 8080) - ARCA_SERVICE_PATH path to GET on the presigned URL (default /) - ARCA_PRESIGN_TTL_MIN presign url ttl in minutes (default 5) +Example: aenv + arca engine — fast create with no ready-wait, then access +the sandbox via a presigned URL (direct data-plane, bypassing api-service). + +When to use this pattern instead of ``Environment.initialize``: + * You want to expose an in-sandbox HTTP/WebSocket service to arbitrary + external clients (e.g. a UI, a webhook, an LLM agent on another host), + not just the local Python process. + * You don't need aenv's MCP/tool-call flows — only lifecycle + a URL. + * You want minimum latency at create time: arca's presign accepts the + sandbox even in PENDING, and the in-sandbox app itself exposes + readiness (so the SDK's _wait_for_healthy is redundant). + +High-level flow: + 1. create sandbox (AEnvSchedulerClient.create_env_instance) + 2. presign URL (AEnvSchedulerClient.presign_url) + 3. caller polls the URL (httpx, business-defined readiness path) + 4. caller does real work (whatever they want to call on the sandbox) + 5. delete sandbox (AEnvSchedulerClient.delete_env_instance) + +There is NO call to ``Environment.initialize`` / ``wait_for_ready`` / +``_wait_for_healthy`` — all waiting is pushed to the business layer. + +Env vars: + ARCA_API_SERVICE_URL api-service-arca base URL (default http://localhost:18080) + ARCA_TEST_ENV_NAME envhub env name (arcaTemplateId inside) (required) + AENV_API_KEY bearer token if api-service has auth enabled (optional) + ARCA_SERVICE_PORT in-sandbox port to expose (default 18080) + ARCA_SERVICE_PATH path to GET for readiness (default /healthz) + ARCA_PRESIGN_TTL_MIN presign URL ttl (default 5) + ARCA_READINESS_TIMEOUT readiness polling budget (default 45s) """ from __future__ import annotations @@ -28,12 +38,11 @@ import os import sys import time -import traceback -from typing import Optional import httpx -from aenv.core.environment import Environment +from aenv.client.scheduler_client import AEnvSchedulerClient +from aenv.core.environment import make_mcp_url API_SERVICE_URL = os.environ.get("ARCA_API_SERVICE_URL", "http://localhost:18080") ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") @@ -42,113 +51,86 @@ SERVICE_PORT = int(os.environ.get("ARCA_SERVICE_PORT", "18080")) SERVICE_PATH = os.environ.get("ARCA_SERVICE_PATH", "/healthz") PRESIGN_TTL_MIN = float(os.environ.get("ARCA_PRESIGN_TTL_MIN", "5")) -READINESS_TIMEOUT_S = float(os.environ.get("ARCA_READINESS_TIMEOUT_S", "45")) - - -def _ok(msg: str) -> None: - print(f"[ OK ] {msg}", flush=True) - - -def _fail(msg: str) -> None: - print(f"[FAIL] {msg}", flush=True) - sys.exit(1) - - -def _info(msg: str) -> None: - print(f"[INFO] {msg}", flush=True) +READINESS_TIMEOUT_S = float(os.environ.get("ARCA_READINESS_TIMEOUT", "45")) + + +async def wait_ready( + target: str, timeout_s: float = READINESS_TIMEOUT_S +) -> httpx.Response: + """Poll GET target until 2xx or timeout. Returns the last response.""" + deadline = time.time() + timeout_s + last: httpx.Response | None = None + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c: + attempt = 0 + while time.time() < deadline: + attempt += 1 + try: + last = await c.get(target) + if 200 <= last.status_code < 300: + return last + print(f" readiness attempt {attempt}: status={last.status_code}") + except Exception as e: + print(f" readiness attempt {attempt}: {type(e).__name__}: {e}") + await asyncio.sleep(2.0) + if last is None: + raise TimeoutError(f"never got a response within {timeout_s}s") + raise TimeoutError( + f"never became ready within {timeout_s}s " + f"(last status={last.status_code}, body={last.text[:200]!r})" + ) -def _require_env() -> None: +async def main() -> int: if not ENV_NAME: - _fail("missing required env var ARCA_TEST_ENV_NAME") + print("missing required env var ARCA_TEST_ENV_NAME", file=sys.stderr) + return 1 + control_url = make_mcp_url(API_SERVICE_URL, 8080) + print(f"api-service: {control_url}") + print(f"env_name: {ENV_NAME}") -async def lifecycle() -> None: - env = Environment( - env_name=ENV_NAME, - aenv_url=API_SERVICE_URL, - ttl="10m", - timeout=60.0, + async with AEnvSchedulerClient( + base_url=control_url, api_key=AENV_API_KEY or None, - ) - - sandbox_id: Optional[str] = None - try: + ) as client: + # -- 1. create sandbox (no wait for RUNNING/healthy) ----------------- t0 = time.time() - try: - await env.initialize() - except Exception as e: - traceback.print_exc() - _fail(f"aenv initialize failed: {e!r}") - - if not env._instance: - _fail("env._instance is None after initialize") - sandbox_id = env._instance.id - _ok(f"initialize ok in {time.time()-t0:.1f}s (sandbox_id={sandbox_id})") + inst = await client.create_env_instance(name=ENV_NAME, ttl="10m") + print( + f"[1] created sandbox in {time.time()-t0:.2f}s " + f"(id={inst.id}, status={inst.status})" + ) try: - presigned = await env.presign_url( + # -- 2. presign a URL pointing at sandbox:$PORT ------------------ + url = await client.presign_url( + inst.id, port=SERVICE_PORT, expiration_time_in_minutes=PRESIGN_TTL_MIN, ) - except Exception as e: - traceback.print_exc() - _fail(f"env.presign_url failed: {e!r}") - if not presigned or not presigned.startswith("http"): - _fail(f"presigned url malformed: {presigned!r}") - _ok(f"presign_url ok (port={SERVICE_PORT}, ttl={PRESIGN_TTL_MIN}m)") - _info(f"presigned URL: {presigned}") - - target = presigned.rstrip("/") + ( - SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH - ) + print(f"[2] presigned URL ({SERVICE_PORT}): {url}") - # Arca "RUNNING" status only guarantees the pod is up, not that the - # in-sandbox process is listening. Poll until 2xx or timeout. - deadline = time.time() + READINESS_TIMEOUT_S - resp = None - attempts = 0 - async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: - while time.time() < deadline: - attempts += 1 - try: - resp = await client.get(target) - if 200 <= resp.status_code < 300: - break - _info(f"readiness attempt {attempts}: status={resp.status_code}") - except Exception as e: - _info(f"readiness attempt {attempts}: {e!r}") - await asyncio.sleep(2.0) - - if resp is None or not (200 <= resp.status_code < 300): - last_status = resp.status_code if resp else "no-response" - last_body = resp.text[:200] if resp is not None else "" - _fail( - f"sandbox service never became ready (last={last_status}, " - f"body={last_body!r}, attempts={attempts})" + # -- 3. poll the in-sandbox service until it's up ---------------- + target = url.rstrip("/") + ( + SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH ) - body_excerpt = (resp.text or "")[:200].replace("\n", " ") - _info( - f"sandbox service GET {SERVICE_PATH} -> {resp.status_code} body={body_excerpt!r}" - ) - _ok(f"in-sandbox service reachable via presigned URL ({resp.status_code})") - - finally: - try: - await env.release() - _ok(f"release ok (sandbox_id={sandbox_id})") - except Exception as e: - traceback.print_exc() - _fail(f"aenv release failed: {e!r}") + print(f"[3] waiting for {SERVICE_PATH} to return 2xx...") + resp = await wait_ready(target) + print(f" ready in {time.time()-t0:.1f}s; status={resp.status_code}") + print(f" body: {resp.text[:200]!r}") + # -- 4. (your real traffic goes here) ---------------------------- + # The same ``url`` is a valid base for HTTP / WebSocket / anything + # the sandbox process listens for on that port. It remains valid + # until the presign TTL expires. + print("[4] (business traffic would run here)") + finally: + # -- 5. always release ------------------------------------------- + await client.delete_env_instance(inst.id) + print(f"[5] released sandbox (id={inst.id})") -async def main() -> None: - _require_env() - _info(f"api-service: {API_SERVICE_URL}") - _info(f"env name: {ENV_NAME}") - await lifecycle() - print("\n[PASS] e2e arca presign happy path", flush=True) + return 0 if __name__ == "__main__": - asyncio.run(main()) + sys.exit(asyncio.run(main())) From bcdde83be08a8293f9a2be8b89ac9e30078fac00 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 6 May 2026 15:37:39 +0800 Subject: [PATCH 07/11] fix(api-service): return 501 for arca data-plane traffic Arca sandboxes do not embed the aenv MCP server, so any traffic to api-service:8081 (/mcp, /health, /sse) cannot be honored. Replace the prior /health short-circuit (which faked a healthy envelope so the SDK would proceed) with a uniform 501 + actionable message pointing the caller at presign_url(). The SDK is expected to opt out via Environment(enable_data_plane=False). Drop the now-unused arca reverse-proxy handlers (arcaTargetURL, handleArcaHTTP, handleArcaSSE) and their helper constants. They can be reintroduced from git history if a future arca image embeds the aenv MCP server. --- api-service/controller/mcp_proxy.go | 205 ++-------------------------- 1 file changed, 12 insertions(+), 193 deletions(-) diff --git a/api-service/controller/mcp_proxy.go b/api-service/controller/mcp_proxy.go index c7fec5dc..40e40163 100644 --- a/api-service/controller/mcp_proxy.go +++ b/api-service/controller/mcp_proxy.go @@ -18,12 +18,10 @@ package controller import ( "api-service/constants" - "fmt" "io" "net/http" "net/http/httputil" "net/url" - "strings" "time" "github.com/gin-gonic/gin" @@ -53,11 +51,6 @@ const ( // Schedule types (kept in sync with main.go --schedule-type). scheduleTypeArca = "arca" - - // Arca gateway conventions (kept in sync with service/arca_client.go). - arcaGatewaySandboxPrefix = "/arca/api/v1/sandbox" - arcaHeaderAPIKey = "x-agent-sandbox-api-key" - arcaHeaderSandboxID = "x-agent-sandbox-id" ) // MCPGatewayConfig is injected at api-service startup so the gateway never @@ -99,18 +92,18 @@ func (g *MCPGateway) setupRoutes() { func (g *MCPGateway) innerRouter(c *gin.Context) { path := c.Param("path") if g.config.ScheduleType == scheduleTypeArca { - // Arca engine: the gateway doesn't trust SDK-provided proxy URL. - // Target is derived from startup config + sandbox id header. - if path == PathHealth { - g.arcaHealthCheck(c) - return - } - switch path { - case PathSSE: - g.handleArcaSSE(c) - default: - g.handleArcaHTTP(c) - } + // Arca sandboxes do not embed the aenv MCP server, so the data + // plane (MCP / /health / SSE) is not supported. The SDK is + // expected to opt out via Environment(enable_data_plane=False) + // and use presign_url() instead. We still respond explicitly so + // stray callers get a clear error rather than a hang. + c.JSON(http.StatusNotImplemented, gin.H{ + "success": false, + "code": http.StatusNotImplemented, + "message": "data plane (MCP / /health) is not supported on arca engine; " + + "use presign_url() to expose an in-sandbox port", + "data": nil, + }) return } @@ -137,22 +130,6 @@ func (g *MCPGateway) healthCheck(c *gin.Context) { }) } -// arcaHealthCheck returns an aenv-envelope shaped response so the SDK's -// _wait_for_healthy (which expects success=true with data.status=healthy) -// can parse it. In arca mode the sandbox liveness is already guaranteed by -// the control-plane RUNNING status, so the gateway short-circuits here -// instead of round-tripping through the Arca gateway. -// -// Supported engines: arca. -func (g *MCPGateway) arcaHealthCheck(c *gin.Context) { - c.JSON(http.StatusOK, gin.H{ - "success": true, - "code": 0, - "message": "", - "data": gin.H{"status": "healthy"}, - }) -} - // getMCPSeverURL gets MCP server URL from request header func (g *MCPGateway) getMCPSeverURL(c *gin.Context) (string, error) { mcpServerURL := c.GetHeader(constants.HeaderMCPServerURL) @@ -361,164 +338,6 @@ func (g *MCPGateway) handleMCPHTTPWithHeader(c *gin.Context) { proxy.ServeHTTP(c.Writer, c.Request) } -// arcaTargetURL resolves the target URL for an arca request. The caller's -// path is preserved after the {arca gateway prefix}/{sandbox_id} base, which -// matches what arca-sandbox SDK speaks natively. -// -// Supported engines: arca. -func (g *MCPGateway) arcaTargetURL(c *gin.Context) (*url.URL, string, error) { - sandboxID := c.GetHeader(constants.HeaderEnvInstanceID) - if sandboxID == "" { - return nil, "", &MCPError{ - Code: http.StatusBadRequest, - Message: constants.HeaderEnvInstanceID + " header is required for arca engine", - } - } - if g.config.ArcaBaseURL == "" { - return nil, "", &MCPError{ - Code: http.StatusInternalServerError, - Message: "arca base URL is not configured on api-service", - } - } - base, err := url.Parse(g.config.ArcaBaseURL) - if err != nil { - return nil, "", &MCPError{ - Code: http.StatusInternalServerError, - Message: "invalid arca base URL", - Details: err.Error(), - } - } - tail := strings.TrimPrefix(c.Request.URL.Path, "/") - base.Path = fmt.Sprintf("%s/%s", arcaGatewaySandboxPrefix, sandboxID) - if tail != "" { - base.Path = base.Path + "/" + tail - } - base.RawQuery = c.Request.URL.RawQuery - return base, sandboxID, nil -} - -// handleArcaHTTP reverse-proxies non-SSE MCP traffic to the arca gateway. -// Target is derived from startup config and X-Instance-ID, not any SDK-provided URL. -// -// Supported engines: arca. -func (g *MCPGateway) handleArcaHTTP(c *gin.Context) { - targetURL, sandboxID, err := g.arcaTargetURL(c) - if err != nil { - if mcpErr, ok := err.(*MCPError); ok { - c.JSON(mcpErr.Code, gin.H{"error": mcpErr.Message}) - return - } - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - proxy := &httputil.ReverseProxy{ - Director: func(req *http.Request) { - req.URL.Scheme = targetURL.Scheme - req.URL.Host = targetURL.Host - req.URL.Path = targetURL.Path - req.URL.RawQuery = targetURL.RawQuery - req.Host = targetURL.Host - req.Header.Del(constants.HeaderMCPServerURL) - req.Header.Set(arcaHeaderAPIKey, g.config.ArcaAPIKey) - req.Header.Set(arcaHeaderSandboxID, sandboxID) - }, - Transport: g.transport, - ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { - log.Errorf("arca proxy error for sandbox %s: %v", sandboxID, err) - c.JSON(http.StatusBadGateway, gin.H{ - "error": "Failed to forward request to arca gateway", - "details": err.Error(), - "sandbox": sandboxID, - }) - }, - } - proxy.ServeHTTP(c.Writer, c.Request) -} - -// handleArcaSSE streams SSE responses from the arca gateway back to the caller. -// -// Supported engines: arca. -func (g *MCPGateway) handleArcaSSE(c *gin.Context) { - targetURL, sandboxID, err := g.arcaTargetURL(c) - if err != nil { - if mcpErr, ok := err.(*MCPError); ok { - c.JSON(mcpErr.Code, gin.H{"error": mcpErr.Message}) - return - } - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - c.Header(HeaderContentType, ContentTypeSSE) - c.Header(HeaderCacheControl, "no-cache") - c.Header(HeaderConnection, "keep-alive") - c.Header("Access-Control-Allow-Origin", "*") - - req, err := http.NewRequest(MethodGET, targetURL.String(), nil) - if err != nil { - log.Errorf("arca SSE: failed to create request: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create request"}) - return - } - req.Header.Set(HeaderAccept, ContentTypeSSE) - req.Header.Set(HeaderCacheControl, "no-cache") - g.copyHeadersExcept(c.Request.Header, req.Header, - constants.HeaderMCPServerURL, - arcaHeaderAPIKey, - arcaHeaderSandboxID, - ) - req.Header.Set(arcaHeaderAPIKey, g.config.ArcaAPIKey) - req.Header.Set(arcaHeaderSandboxID, sandboxID) - - client := &http.Client{Transport: g.transport} - resp, err := client.Do(req) - if err != nil { - log.Errorf("arca SSE: connect failed sandbox=%s: %v", sandboxID, err) - c.JSON(http.StatusBadGateway, gin.H{"error": "Failed to connect to arca gateway"}) - return - } - defer func() { - if closeErr := resp.Body.Close(); closeErr != nil { - log.Warnf("failed to close arca SSE response body: %v", closeErr) - } - }() - - if resp.StatusCode != http.StatusOK { - log.Warnf("arca SSE: upstream status %d sandbox=%s", resp.StatusCode, sandboxID) - c.JSON(resp.StatusCode, gin.H{"error": "arca gateway error"}) - return - } - - for name, values := range resp.Header { - if name != HeaderContentType { - for _, value := range values { - c.Header(name, value) - } - } - } - c.Header(HeaderContentType, ContentTypeSSE) - - buf := make([]byte, 4096) - for { - n, err := resp.Body.Read(buf) - if err != nil { - if err != io.EOF { - log.Errorf("arca SSE read error sandbox=%s: %v", sandboxID, err) - } - break - } - if n > 0 { - if _, writeErr := c.Writer.Write(buf[:n]); writeErr != nil { - log.Errorf("arca SSE write error: %v", writeErr) - break - } - if flusher, ok := c.Writer.(http.Flusher); ok { - flusher.Flush() - } - } - } -} - // GetRouter gets router instance func (g *MCPGateway) GetRouter() *gin.RouterGroup { return g.router From d2150cd3a4b884a812656b9ea726cd29fac92c37 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 6 May 2026 15:38:26 +0800 Subject: [PATCH 08/11] feat(aenv): add enable_data_plane flag to gate MCP data plane When enable_data_plane=False the SDK does not touch port 8081 at all: no MCP session, no /health probe, and call_tool / list_tools / list_functions / call_reward / check_health / call_function raise. presign_url still works because it is a control-plane API. Required for arca-engine sandboxes whose images do not embed the aenv MCP server. skip_for_healthy keeps its narrow legacy semantics (only short-circuits the /health probe). Default behavior unchanged. The arca presign example is rewritten on top of the public Environment API instead of the internal AEnvSchedulerClient. --- aenv/e2e_arca_presign_test.py | 113 ++++++++++++++---------------- aenv/src/aenv/core/environment.py | 41 +++++++++-- 2 files changed, 90 insertions(+), 64 deletions(-) diff --git a/aenv/e2e_arca_presign_test.py b/aenv/e2e_arca_presign_test.py index 616fdfae..3098b247 100644 --- a/aenv/e2e_arca_presign_test.py +++ b/aenv/e2e_arca_presign_test.py @@ -1,29 +1,30 @@ #!/usr/bin/env python3 """ -Example: aenv + arca engine — fast create with no ready-wait, then access -the sandbox via a presigned URL (direct data-plane, bypassing api-service). - -When to use this pattern instead of ``Environment.initialize``: - * You want to expose an in-sandbox HTTP/WebSocket service to arbitrary - external clients (e.g. a UI, a webhook, an LLM agent on another host), - not just the local Python process. - * You don't need aenv's MCP/tool-call flows — only lifecycle + a URL. - * You want minimum latency at create time: arca's presign accepts the - sandbox even in PENDING, and the in-sandbox app itself exposes - readiness (so the SDK's _wait_for_healthy is redundant). +Example: aenv + arca engine — fast create with no data-plane wait, then +access the sandbox via a presigned URL (direct data-plane, bypassing +api-service). -High-level flow: - 1. create sandbox (AEnvSchedulerClient.create_env_instance) - 2. presign URL (AEnvSchedulerClient.presign_url) - 3. caller polls the URL (httpx, business-defined readiness path) - 4. caller does real work (whatever they want to call on the sandbox) - 5. delete sandbox (AEnvSchedulerClient.delete_env_instance) +When to use this pattern: + * Engine is arca and the sandbox image runs an arbitrary user app + (no aenv MCP server inside). + * You want minimum latency at create time and prefer to wait on the + in-sandbox app's own readiness path. -There is NO call to ``Environment.initialize`` / ``wait_for_ready`` / -``_wait_for_healthy`` — all waiting is pushed to the business layer. +High-level flow: + 1. create sandbox (Environment.__aenter__, no MCP session) + 2. presign URL (Environment.presign_url) + 3. caller polls the URL (httpx, business-defined readiness path) + 4. caller does real work (whatever the sandbox process listens for) + 5. release (Environment.__aexit__) + +``enable_data_plane=False`` is what makes this work on arca: the SDK +skips the MCP session and the /health probe entirely, so no traffic +hits api-service:8081 (which on arca returns 501 by design). The +``call_tool`` / ``list_tools`` / ``call_function`` / ``call_reward`` / +``check_health`` methods will raise if invoked under this flag. Env vars: - ARCA_API_SERVICE_URL api-service-arca base URL (default http://localhost:18080) + AENV_SYSTEM_URL api-service base URL (default http://localhost) ARCA_TEST_ENV_NAME envhub env name (arcaTemplateId inside) (required) AENV_API_KEY bearer token if api-service has auth enabled (optional) ARCA_SERVICE_PORT in-sandbox port to expose (default 18080) @@ -41,11 +42,10 @@ import httpx -from aenv.client.scheduler_client import AEnvSchedulerClient -from aenv.core.environment import make_mcp_url +from aenv import Environment -API_SERVICE_URL = os.environ.get("ARCA_API_SERVICE_URL", "http://localhost:18080") ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") +AENV_URL = os.environ.get("AENV_SYSTEM_URL", os.environ.get("ARCA_API_SERVICE_URL", "")) AENV_API_KEY = os.environ.get("AENV_API_KEY", "") SERVICE_PORT = int(os.environ.get("ARCA_SERVICE_PORT", "18080")) @@ -85,50 +85,43 @@ async def main() -> int: print("missing required env var ARCA_TEST_ENV_NAME", file=sys.stderr) return 1 - control_url = make_mcp_url(API_SERVICE_URL, 8080) - print(f"api-service: {control_url}") - print(f"env_name: {ENV_NAME}") + print(f"env_name: {ENV_NAME}") + print(f"aenv_url: {AENV_URL or '(default)'}") - async with AEnvSchedulerClient( - base_url=control_url, + t0 = time.time() + async with Environment( + env_name=ENV_NAME, + aenv_url=AENV_URL or None, api_key=AENV_API_KEY or None, - ) as client: - # -- 1. create sandbox (no wait for RUNNING/healthy) ----------------- - t0 = time.time() - inst = await client.create_env_instance(name=ENV_NAME, ttl="10m") + ttl="10m", + enable_data_plane=False, + ) as env: + info = await env.get_env_info() print( f"[1] created sandbox in {time.time()-t0:.2f}s " - f"(id={inst.id}, status={inst.status})" + f"(id={info['instance_id']}, status={info['status']})" + ) + + url = await env.presign_url( + port=SERVICE_PORT, + expiration_time_in_minutes=PRESIGN_TTL_MIN, + ) + print(f"[2] presigned URL ({SERVICE_PORT}): {url}") + + target = url.rstrip("/") + ( + SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH ) + print(f"[3] waiting for {SERVICE_PATH} to return 2xx...") + resp = await wait_ready(target) + print(f" ready in {time.time()-t0:.1f}s; status={resp.status_code}") + print(f" body: {resp.text[:200]!r}") - try: - # -- 2. presign a URL pointing at sandbox:$PORT ------------------ - url = await client.presign_url( - inst.id, - port=SERVICE_PORT, - expiration_time_in_minutes=PRESIGN_TTL_MIN, - ) - print(f"[2] presigned URL ({SERVICE_PORT}): {url}") - - # -- 3. poll the in-sandbox service until it's up ---------------- - target = url.rstrip("/") + ( - SERVICE_PATH if SERVICE_PATH.startswith("/") else "/" + SERVICE_PATH - ) - print(f"[3] waiting for {SERVICE_PATH} to return 2xx...") - resp = await wait_ready(target) - print(f" ready in {time.time()-t0:.1f}s; status={resp.status_code}") - print(f" body: {resp.text[:200]!r}") - - # -- 4. (your real traffic goes here) ---------------------------- - # The same ``url`` is a valid base for HTTP / WebSocket / anything - # the sandbox process listens for on that port. It remains valid - # until the presign TTL expires. - print("[4] (business traffic would run here)") - finally: - # -- 5. always release ------------------------------------------- - await client.delete_env_instance(inst.id) - print(f"[5] released sandbox (id={inst.id})") + # The same ``url`` is a valid base for HTTP / WebSocket / anything + # the sandbox process listens for on that port. It remains valid + # until the presign TTL expires. + print("[4] (business traffic would run here)") + print(f"[5] released sandbox in {time.time()-t0:.1f}s total") return 0 diff --git a/aenv/src/aenv/core/environment.py b/aenv/src/aenv/core/environment.py index 7b06d272..1256f6bd 100644 --- a/aenv/src/aenv/core/environment.py +++ b/aenv/src/aenv/core/environment.py @@ -121,6 +121,7 @@ def __init__( max_retries: int = 10, api_key: Optional[str] = None, skip_for_healthy: bool = False, + enable_data_plane: bool = True, owner: Optional[str] = None, labels: Optional[Dict[str, str]] = None, mount_points: Optional[List[Dict[str, Any]]] = None, @@ -139,7 +140,18 @@ def __init__( ttl: Time to live in seconds defaults to 10 minutes max_retries: Maximum retry attempts for failed requests api_key: Optional API key for authentication - skip_for_healthy: Skip health check if True (defaults to False) + skip_for_healthy: Skip the data-plane ``/health`` readiness probe + only. Has no effect when ``enable_data_plane=False`` (the + probe is skipped unconditionally in that mode). + enable_data_plane: When True (default), the SDK opens an MCP + session against the sandbox on port 8081 and exposes + ``call_tool`` / ``list_tools`` / ``call_function`` / + ``call_reward`` / ``check_health``. When False, the data + plane is not touched at all — no MCP session, no ``/health`` + probe, and the above methods raise ``EnvironmentError``. + ``presign_url`` still works because it is a control-plane + API. Required for arca-engine sandboxes whose images do not + embed the aenv MCP server. mount_points: Optional list of mount-point dicts forwarded to the backend sandbox engine. Each entry: ``{"id": "OSS_xxx", "remote_dir": "/data", "local_dir": "/workspace"}``. @@ -151,6 +163,7 @@ def __init__( self.arguments = arguments or [] self.dummy_instance_ip = os.getenv("DUMMY_INSTANCE_IP") self.skip_for_healthy = skip_for_healthy + self.enable_data_plane = enable_data_plane self.owner = owner self.labels = labels # Supported engines: arca (ignored on k8s/standard/faas). @@ -190,6 +203,14 @@ def _log_prefix(self) -> str: ) return f"[ENV:{instance_id}][sdk:v{__version__}]" + def _require_data_plane(self, op: str) -> None: + if not self.enable_data_plane: + raise EnvironmentError( + f"{op} is unavailable: this Environment was created with " + f"enable_data_plane=False (no MCP session, no /health). Use " + f"presign_url() to expose an in-sandbox port instead." + ) + async def _backoff(self, attempt: int, base: float = 2.0) -> None: """Exponential backoff with jitter.""" wait = base**attempt + random.uniform(0, 1) @@ -211,6 +232,8 @@ async def _rebuild_mcp_client(self) -> None: async def __aenter__(self): """Async context manager entry.""" await self.initialize() + if not self.enable_data_plane: + return self max_attempts = 3 for attempt in range(max_attempts): try: @@ -230,7 +253,8 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" - await self._close_mcp_session() + if self.enable_data_plane: + await self._close_mcp_session() await self.release() async def initialize(self) -> bool: @@ -383,6 +407,7 @@ async def list_tools(self) -> List[Dict[str, Any]]: Returns: List of tool descriptors in MCP format """ + self._require_data_plane("list_tools") await self._ensure_initialized() try: @@ -471,6 +496,7 @@ async def list_functions(self) -> Dict[str, Any]: Returns: Dictionary containing categorized function lists (functions, reward, health) """ + self._require_data_plane("list_functions") await self._ensure_initialized() try: @@ -539,6 +565,7 @@ async def call_reward( Raises: EnvironmentError: If reward execution fails """ + self._require_data_plane("call_reward") await self._ensure_initialized() return await self._call_function( self.aenv_reward_url, arguments=arguments, timeout=timeout @@ -673,6 +700,7 @@ async def check_health( Raises: EnvironmentError: If health check execution fails """ + self._require_data_plane("check_health") await self._ensure_initialized() logger.info( @@ -732,6 +760,7 @@ async def call_tool( ToolError: If tool execution fails after invocation EnvironmentError: If session cannot be established """ + self._require_data_plane("call_tool") await self._ensure_initialized() # Circuit breaker: fail fast if too many consecutive tool errors @@ -856,9 +885,12 @@ async def _ensure_initialized(self): async def _wait_for_healthy(self, timeout: float = 300.0) -> None: """Wait for environment instance to be healthy.""" - if self.skip_for_healthy: + if not self.enable_data_plane or self.skip_for_healthy: logger.info( - f"{self._log_prefix()} Skipping health check for environment {self.env_name}" + f"{self._log_prefix()} Skipping /health probe for environment " + f"{self.env_name} " + f"(enable_data_plane={self.enable_data_plane}, " + f"skip_for_healthy={self.skip_for_healthy})" ) return @@ -1030,6 +1062,7 @@ async def call_function( Raises: EnvironmentError: If function execution fails """ + self._require_data_plane("call_function") await self._ensure_initialized() function_url = f"{self.aenv_functions_base_url}/{function_name}" return await self._call_function( From ef3d7b9f7a3729e64612cd36f403049d65b86dd2 Mon Sep 17 00:00:00 2001 From: meijun Date: Wed, 6 May 2026 16:01:35 +0800 Subject: [PATCH 09/11] test(aenv): add arca contract tests for enable_data_plane=False Two rounds asserting the SDK<->api-service contract in arca mode: - Round 2: every data-plane Environment method raises EnvironmentError mentioning enable_data_plane=False instead of hitting api-service:8081 - Round 3: api-service:8081 returns HTTP 501 with an actionable message for /health, /mcp, and arbitrary paths Verified against tydd-staging api-service-arca:v7 + arca-real@1.0.0. --- aenv/e2e_arca_negative_test.py | 133 +++++++++++++++++++++++++++++++++ aenv/e2e_arca_presign_test.py | 2 - 2 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 aenv/e2e_arca_negative_test.py diff --git a/aenv/e2e_arca_negative_test.py b/aenv/e2e_arca_negative_test.py new file mode 100644 index 00000000..0b55a7f7 --- /dev/null +++ b/aenv/e2e_arca_negative_test.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +""" +Contract tests for arca + ``enable_data_plane=False``. + +Round 2: every data-plane method on ``Environment`` raises +``EnvironmentError`` mentioning ``enable_data_plane=False`` instead of +silently hitting api-service:8081. + +Round 3: api-service:8081 (the MCP gateway port) returns HTTP 501 with an +actionable message for any path when running in arca schedule mode. This +is the server-side guarantee that pairs with Round 2. + +Both rounds talk to a real api-service-arca + arca sandbox, so the script +must run against tydd-staging (or another arca-mode deployment). + +Env vars: + AENV_SYSTEM_URL api-service base URL on :8080 (required) + ARCA_TEST_ENV_NAME envhub env name (e.g. arca-real@1.0.0) (required) + AENV_API_KEY optional bearer token +""" + +from __future__ import annotations + +import asyncio +import json +import os +import sys +from urllib.parse import urlparse, urlunparse + +import httpx + +from aenv import Environment +from aenv.core.exceptions import EnvironmentError + +CONTROL_URL = os.environ.get( + "AENV_SYSTEM_URL", os.environ.get("ARCA_API_SERVICE_URL", "") +) +ENV_NAME = os.environ.get("ARCA_TEST_ENV_NAME", "") +AENV_API_KEY = os.environ.get("AENV_API_KEY", "") + +DATA_PLANE_METHODS: list[tuple[str, tuple]] = [ + ("call_tool", ("t", {})), + ("list_tools", ()), + ("list_functions", ()), + ("call_reward", ({},)), + ("check_health", ({},)), + ("call_function", ("f", {})), +] + + +def _data_plane_url(control_url: str) -> str: + """Swap the :8080 port in control_url for :8081 (the MCP gateway).""" + parsed = urlparse(control_url if "://" in control_url else f"http://{control_url}") + host = parsed.hostname or "127.0.0.1" + return urlunparse(parsed._replace(netloc=f"{host}:8081", path="", query="")) + + +async def round2_guards() -> bool: + print("=== Round 2: Environment guards under enable_data_plane=False ===") + async with Environment( + env_name=ENV_NAME, + aenv_url=CONTROL_URL, + api_key=AENV_API_KEY or None, + enable_data_plane=False, + ttl="5m", + ) as env: + for name, args in DATA_PLANE_METHODS: + try: + await getattr(env, name)(*args) + except EnvironmentError as e: + if "enable_data_plane=False" not in str(e): + print(f" {name}: raised wrong message: {e}") + return False + print(f" {name}: blocked OK") + continue + print(f" {name}: did NOT raise -- BUG") + return False + print("Round 2 PASS") + return True + + +async def round3_501() -> bool: + print("=== Round 3: api-service:8081 returns 501 for arca data plane ===") + data_plane_base = _data_plane_url(CONTROL_URL) + async with Environment( + env_name=ENV_NAME, + aenv_url=CONTROL_URL, + api_key=AENV_API_KEY or None, + enable_data_plane=False, + ttl="5m", + ) as env: + info = await env.get_env_info() + sandbox_id = info["instance_id"] + async with httpx.AsyncClient(timeout=10.0) as client: + for path in ["/health", "/mcp", "/some/random"]: + resp = await client.get( + f"{data_plane_base}{path}", + headers={"AEnvCore-EnvInstance-ID": sandbox_id}, + ) + try: + body = resp.json() + except Exception: + body = {"_raw": resp.text[:200]} + + ok = ( + resp.status_code == 501 + and isinstance(body, dict) + and "data plane" in str(body.get("message", "")) + ) + tag = "OK" if ok else "FAIL" + print( + f" {path}: status={resp.status_code} body={json.dumps(body)} [{tag}]" + ) + if not ok: + return False + print("Round 3 PASS") + return True + + +async def main() -> int: + if not CONTROL_URL or not ENV_NAME: + print( + "missing required env vars AENV_SYSTEM_URL and/or ARCA_TEST_ENV_NAME", + file=sys.stderr, + ) + return 2 + r2 = await round2_guards() + r3 = await round3_501() + return 0 if (r2 and r3) else 1 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/aenv/e2e_arca_presign_test.py b/aenv/e2e_arca_presign_test.py index 3098b247..0dd68390 100644 --- a/aenv/e2e_arca_presign_test.py +++ b/aenv/e2e_arca_presign_test.py @@ -91,8 +91,6 @@ async def main() -> int: t0 = time.time() async with Environment( env_name=ENV_NAME, - aenv_url=AENV_URL or None, - api_key=AENV_API_KEY or None, ttl="10m", enable_data_plane=False, ) as env: From af604d860f28db53c04be0228e49cbfa01f4fbbe Mon Sep 17 00:00:00 2001 From: meijun Date: Mon, 11 May 2026 20:03:04 +0800 Subject: [PATCH 10/11] fix(envhub): replace WATCH with in-process key mutex for backend portability The Redis env storage previously relied on WATCH/MULTI/EXEC for optimistic locking on Update, and TxPipeline for atomic Delete. Some Redis-compatible backends (notably Ant Group TBase via Cache Mesh) reject these transactional primitives. Use a per-key sync.Mutex inside RedisEnvStorage to serialize the read-then-write sequence, and replace TxPipeline with sequential DEL+SREM. Trade-off: the mutex is process-local, so multi-replica deployments lose wire-level CAS semantics. envhub is currently deployed with replicaCount=1 and Update calls are infrequent, so the trade-off is acceptable. A future multi-replica deployment should layer a coordination service (etcd / lease) on top. Verified against MOSN+TBase locally: POST /env/ -> 200 GET /env/{n}/{v} -> 200 PUT /env/{n}/{v} -> 200 (was 500: 'unsupported command WATCH') POST /release -> 200 (was 500) DELETE-equivalent path -> 200 (was 500: 'unsupported command MULTI') --- envhub/service/env_redis.go | 91 +++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/envhub/service/env_redis.go b/envhub/service/env_redis.go index 878f0f9b..1c6ec975 100644 --- a/envhub/service/env_redis.go +++ b/envhub/service/env_redis.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "sort" + "sync" "time" redis "github.com/go-redis/redis/v8" @@ -46,6 +47,13 @@ type RedisEnvStorage struct { client *redis.Client keyPrefix string indexKey string + + keyLocks sync.Map +} + +func (s *RedisEnvStorage) keyLock(key string) *sync.Mutex { + v, _ := s.keyLocks.LoadOrStore(key, &sync.Mutex{}) + return v.(*sync.Mutex) } var _ EnvStorage = (*RedisEnvStorage)(nil) @@ -128,57 +136,60 @@ func (s *RedisEnvStorage) Create(ctx context.Context, key string, env *models.En return nil } -// Update updates Env object +// Update applies an optimistic check-and-set update on the env record. +// Returns ErrEnvNotFound if the key is absent and a version mismatch error +// if the on-disk record was concurrently modified. func (s *RedisEnvStorage) Update(ctx context.Context, key string, env *models.Env, resourceVersion int64, labels map[string]string) error { - redisKey := s.dataKey(key) - return s.client.Watch(ctx, func(tx *redis.Tx) error { - payload, err := tx.Get(ctx, redisKey).Bytes() - if errors.Is(err, redis.Nil) { - return fmt.Errorf("%w: %s", ErrEnvNotFound, key) - } - if err != nil { - return fmt.Errorf("failed to read env %s: %w", key, err) - } + mu := s.keyLock(key) + mu.Lock() + defer mu.Unlock() - var record redisEnvRecord - if err := json.Unmarshal(payload, &record); err != nil { - return fmt.Errorf("failed to unmarshal env %s: %w", key, err) - } + redisKey := s.dataKey(key) - if record.ResourceVersion != resourceVersion { - return fmt.Errorf("resource version mismatch for %s: expect %d got %d", key, record.ResourceVersion, resourceVersion) - } + current, err := s.loadRecord(ctx, key) + if err != nil { + return err + } + if current.ResourceVersion != resourceVersion { + return fmt.Errorf("resource version mismatch for %s: expect %d got %d", key, current.ResourceVersion, resourceVersion) + } - record.Env = env - if labels != nil { - record.Labels = copyLabels(labels) - } - record.ResourceVersion++ - record.LastUpdatedEpoch = time.Now().Unix() + updated := redisEnvRecord{ + Env: env, + Labels: current.Labels, + ResourceVersion: current.ResourceVersion + 1, + LastUpdatedEpoch: time.Now().Unix(), + } + if labels != nil { + updated.Labels = copyLabels(labels) + } - updatedPayload, err := json.Marshal(record) - if err != nil { - return fmt.Errorf("failed to marshal updated env %s: %w", key, err) - } + payload, err := json.Marshal(updated) + if err != nil { + return fmt.Errorf("failed to marshal updated env %s: %w", key, err) + } - _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error { - p.Set(ctx, redisKey, updatedPayload, 0) - p.SAdd(ctx, s.indexKey, key) - return nil - }) - return err - }, redisKey) + if err := s.client.Set(ctx, redisKey, payload, 0).Err(); err != nil { + return fmt.Errorf("failed to write env %s: %w", key, err) + } + if err := s.client.SAdd(ctx, s.indexKey, key).Err(); err != nil { + return fmt.Errorf("failed to update index for env %s: %w", key, err) + } + return nil } -// Delete deletes Env object +// Delete removes the env record and its index entry. func (s *RedisEnvStorage) Delete(ctx context.Context, key string) error { - redisKey := s.dataKey(key) - pipe := s.client.TxPipeline() - pipe.Del(ctx, redisKey) - pipe.SRem(ctx, s.indexKey, key) - if _, err := pipe.Exec(ctx); err != nil { + mu := s.keyLock(key) + mu.Lock() + defer mu.Unlock() + + if err := s.client.Del(ctx, s.dataKey(key)).Err(); err != nil { return fmt.Errorf("failed to delete env %s: %w", key, err) } + if err := s.client.SRem(ctx, s.indexKey, key).Err(); err != nil { + return fmt.Errorf("failed to remove env %s from index: %w", key, err) + } return nil } From 43f64b99023c535a5c43855525d32e4b98380c2f Mon Sep 17 00:00:00 2001 From: meijun Date: Fri, 15 May 2026 13:30:20 +0800 Subject: [PATCH 11/11] fix(aenv): make openai-agents an optional [agents] extra Top-level imports of agents.tool eagerly pulled openai>=2.x, breaking clients pinned to openai<2 (e.g. langchain-openai 0.3.x). list_openai_tools is the only API that needs openai-agents; move its imports under TYPE_CHECKING + lazy import inside the method, and drop openai-agents from the default dependency set. Users who need list_openai_tools install with: pip install aenvironment[agents] --- aenv/pyproject.toml | 5 ++++- aenv/src/aenv/core/environment.py | 24 +++++++++++++++++------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/aenv/pyproject.toml b/aenv/pyproject.toml index a868733b..f919aeec 100644 --- a/aenv/pyproject.toml +++ b/aenv/pyproject.toml @@ -37,13 +37,16 @@ dependencies = [ "typer>=0.9.0", "tabulate>=0.9.0", "colorlog>=6.10.1", - "openai-agents>=0.6.3", "starlette>=0.27.0", "urllib3>=1.26.0", "docker>=6.0.0", ] [project.optional-dependencies] +agents = [ + "openai-agents>=0.6.3", +] + dev = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", diff --git a/aenv/src/aenv/core/environment.py b/aenv/src/aenv/core/environment.py index 1256f6bd..b9f09159 100644 --- a/aenv/src/aenv/core/environment.py +++ b/aenv/src/aenv/core/environment.py @@ -23,16 +23,16 @@ import random import traceback from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from urllib.parse import urlparse, urlunparse import httpx -from agents.tool import FunctionTool -from agents.tool import Tool as OpenAITool -from agents.tool_context import ToolContext from fastmcp import Client from fastmcp.client.transports import StreamableHttpTransport +if TYPE_CHECKING: + from agents.tool import Tool as OpenAITool + from aenv.client.scheduler_client import AEnvSchedulerClient from aenv.core.exceptions import ( EnvironmentError, @@ -439,10 +439,20 @@ async def list_tools(self) -> List[Dict[str, Any]]: f"Failed to list tools for environment '{self.env_name}': {str(e)}" ) - async def list_openai_tools(self) -> List[OpenAITool]: + async def list_openai_tools(self) -> "List[OpenAITool]": + try: + from agents.tool import FunctionTool + from agents.tool import Tool as OpenAITool # noqa: F401 + from agents.tool_context import ToolContext # noqa: F401 + except ImportError as e: + raise ImportError( + "list_openai_tools() requires the 'agents' extra. " + "Install it with: pip install 'aenvironment[agents]'" + ) from e + tools = await self.list_tools() - openai_tools: List[OpenAITool] = [] + openai_tools: List[Any] = [] for tool in tools: name = str(tool.get("name", "")) description = str(tool.get("description", "")) @@ -451,7 +461,7 @@ async def list_openai_tools(self) -> List[OpenAITool]: input_schema = {"type": "object", "properties": {}} async def _on_invoke_tool( - ctx: ToolContext[Any], input: str, *, _name: str = name + ctx: Any, input: str, *, _name: str = name ) -> Any: try: args: Dict[str, Any] = json.loads(input) if input else {}