Add Arca sandbox engine support #75
Conversation
- ArcaClient implements Create/Get/Delete against Arca OpenAPI
/arca/openapi/v1/sandbox/*, with DeployConfig keys arcaTemplateId and
mountPoints. List/Warmup return clear not-supported errors.
- PresignURL method on EnvInstanceService interface; ArcaClient calls
/arca/api/v1/sandbox/{id}/presign/token; other engines return a clear
"not supported on this engine" error.
- POST /env-instance/:id/presign-url endpoint forwards to the service
layer so the SDK stays engine-unaware.
- MCPGateway gets engine type from --schedule-type at startup. Under
arca it resolves the target from the configured gateway base URL plus
the AEnvCore-EnvInstance-ID header (SDK-provided MCPProxy-URL is
ignored), and injects x-agent-sandbox-api-key / x-agent-sandbox-id on
each forwarded request.
- Controller CreateEnvInstanceRequest adds mount_points passthrough for
arca; no init_command on envhub/api-service surface.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Environment.presign_url(port, expiration_time_in_minutes=5) mirrors
arca-sandbox SDK signature. Engine differences resolve inside
api-service; SDK never branches on instance.labels or engine type.
- Drop _is_arca_engine / _guard_data_plane helpers and the arca branch
in wait_for_ready. Data-plane methods no longer raise on any engine.
- scheduler_client.presign_url posts to /env-instance/{id}/presign-url.
- Accept mount_points kwarg for forwarding to engines that support it.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- e2e_arca_test.py: aenv SDK lifecycle smoke against api-service-arca (no engine-awareness assertions, SDK is engine-unaware now). - e2e_arca_presign_test.py: end-to-end arca path - initialize sandbox, env.presign_url(port), HTTP GET the presigned URL, release. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- /health under arca mode returns aenv envelope (success/code/data) so SDK _wait_for_healthy can parse it; gateway short-circuits since arca liveness is already governed by control-plane RUNNING status. - arcaEnvelope.Code switched to json.RawMessage: arca's presign endpoint emits an empty string while OpenAPI emits int. - e2e: 502 from arca gateway counts as "routing OK" since it proves presign + token rewrite work even when the sandbox template has no listener on the probed port. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Default ARCA_SERVICE_PORT=18080 and ARCA_SERVICE_PATH=/healthz (the port+path persistent-bash-session/sweagent sandbox templates expose). - Add ARCA_READINESS_TIMEOUT_S (default 45s): poll the presigned URL until 2xx. Arca RUNNING only means the pod is up; the in-sandbox process has a ~3s cold start, during which connect is refused. - Restore strict 2xx pass criterion so we actually prove the link. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Revert .gitignore arca-local entries; the local-only files (deploy yaml, design docs) are still untracked but no longer protected from "git add ." accidents. - Rewrite aenv/e2e_arca_presign_test.py as an example of the data-plane-only arca usage pattern: AEnvSchedulerClient.create -> no ready-wait -> presign_url -> business polls -> delete. Skips Environment.initialize / wait_for_healthy entirely. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Arca sandboxes do not embed the aenv MCP server, so any traffic to api-service:8081 (/mcp, /health, /sse) cannot be honored. Replace the prior /health short-circuit (which faked a healthy envelope so the SDK would proceed) with a uniform 501 + actionable message pointing the caller at presign_url(). The SDK is expected to opt out via Environment(enable_data_plane=False). Drop the now-unused arca reverse-proxy handlers (arcaTargetURL, handleArcaHTTP, handleArcaSSE) and their helper constants. They can be reintroduced from git history if a future arca image embeds the aenv MCP server.
When enable_data_plane=False the SDK does not touch port 8081 at all: no MCP session, no /health probe, and call_tool / list_tools / list_functions / call_reward / check_health / call_function raise. presign_url still works because it is a control-plane API. Required for arca-engine sandboxes whose images do not embed the aenv MCP server. skip_for_healthy keeps its narrow legacy semantics (only short-circuits the /health probe). Default behavior unchanged. The arca presign example is rewritten on top of the public Environment API instead of the internal AEnvSchedulerClient.
Two rounds asserting the SDK<->api-service contract in arca mode: - Round 2: every data-plane Environment method raises EnvironmentError mentioning enable_data_plane=False instead of hitting api-service:8081 - Round 3: api-service:8081 returns HTTP 501 with an actionable message for /health, /mcp, and arbitrary paths Verified against tydd-staging api-service-arca:v7 + arca-real@1.0.0.
…ability
The Redis env storage previously relied on WATCH/MULTI/EXEC for optimistic
locking on Update, and TxPipeline for atomic Delete. Some Redis-compatible
backends (notably Ant Group TBase via Cache Mesh) reject these transactional
primitives.
Use a per-key sync.Mutex inside RedisEnvStorage to serialize the
read-then-write sequence, and replace TxPipeline with sequential DEL+SREM.
Trade-off: the mutex is process-local, so multi-replica deployments lose
wire-level CAS semantics. envhub is currently deployed with replicaCount=1
and Update calls are infrequent, so the trade-off is acceptable. A future
multi-replica deployment should layer a coordination service (etcd / lease)
on top.
Verified against MOSN+TBase locally:
POST /env/ -> 200
GET /env/{n}/{v} -> 200
PUT /env/{n}/{v} -> 200 (was 500: 'unsupported command WATCH')
POST /release -> 200 (was 500)
DELETE-equivalent path -> 200 (was 500: 'unsupported command MULTI')
Top-level imports of agents.tool eagerly pulled openai>=2.x, breaking clients pinned to openai<2 (e.g. langchain-openai 0.3.x). list_openai_tools is the only API that needs openai-agents; move its imports under TYPE_CHECKING + lazy import inside the method, and drop openai-agents from the default dependency set. Users who need list_openai_tools install with: pip install aenvironment[agents]
There was a problem hiding this comment.
Code Review
This pull request introduces support for the 'arca' sandbox engine across the SDK and API service, adding features such as direct sandbox access via presigned URLs, support for mount points, and an 'enable_data_plane' toggle to bypass MCP/health checks. It also refactors the 'openai-agents' dependency into an optional extra. Critical feedback identifies a regression in the Redis storage implementation where distributed optimistic locking was replaced with local mutexes, which breaks safety in multi-replica deployments and compromises atomicity. Additionally, the review points out a memory leak in the new locking map, misleading timestamps in the Arca client, and a breaking change in environment listing that could disrupt periodic maintenance tasks.
| func (s *RedisEnvStorage) Update(ctx context.Context, key string, env *models.Env, resourceVersion int64, labels map[string]string) error { | ||
| redisKey := s.dataKey(key) | ||
| return s.client.Watch(ctx, func(tx *redis.Tx) error { | ||
| payload, err := tx.Get(ctx, redisKey).Bytes() | ||
| if errors.Is(err, redis.Nil) { | ||
| return fmt.Errorf("%w: %s", ErrEnvNotFound, key) | ||
| } | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read env %s: %w", key, err) | ||
| } | ||
| mu := s.keyLock(key) | ||
| mu.Lock() | ||
| defer mu.Unlock() | ||
|
|
||
| var record redisEnvRecord | ||
| if err := json.Unmarshal(payload, &record); err != nil { | ||
| return fmt.Errorf("failed to unmarshal env %s: %w", key, err) | ||
| } | ||
| redisKey := s.dataKey(key) | ||
|
|
||
| if record.ResourceVersion != resourceVersion { | ||
| return fmt.Errorf("resource version mismatch for %s: expect %d got %d", key, record.ResourceVersion, resourceVersion) | ||
| } | ||
| current, err := s.loadRecord(ctx, key) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if current.ResourceVersion != resourceVersion { | ||
| return fmt.Errorf("resource version mismatch for %s: expect %d got %d", key, current.ResourceVersion, resourceVersion) | ||
| } | ||
|
|
||
| record.Env = env | ||
| if labels != nil { | ||
| record.Labels = copyLabels(labels) | ||
| } | ||
| record.ResourceVersion++ | ||
| record.LastUpdatedEpoch = time.Now().Unix() | ||
| updated := redisEnvRecord{ | ||
| Env: env, | ||
| Labels: current.Labels, | ||
| ResourceVersion: current.ResourceVersion + 1, | ||
| LastUpdatedEpoch: time.Now().Unix(), | ||
| } | ||
| if labels != nil { | ||
| updated.Labels = copyLabels(labels) | ||
| } | ||
|
|
||
| updatedPayload, err := json.Marshal(record) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal updated env %s: %w", key, err) | ||
| } | ||
| payload, err := json.Marshal(updated) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal updated env %s: %w", key, err) | ||
| } | ||
|
|
||
| _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error { | ||
| p.Set(ctx, redisKey, updatedPayload, 0) | ||
| p.SAdd(ctx, s.indexKey, key) | ||
| return nil | ||
| }) | ||
| return err | ||
| }, redisKey) | ||
| if err := s.client.Set(ctx, redisKey, payload, 0).Err(); err != nil { | ||
| return fmt.Errorf("failed to write env %s: %w", key, err) | ||
| } | ||
| if err := s.client.SAdd(ctx, s.indexKey, key).Err(); err != nil { | ||
| return fmt.Errorf("failed to update index for env %s: %w", key, err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Delete deletes Env object | ||
| // Delete removes the env record and its index entry. | ||
| func (s *RedisEnvStorage) Delete(ctx context.Context, key string) error { | ||
| redisKey := s.dataKey(key) | ||
| pipe := s.client.TxPipeline() | ||
| pipe.Del(ctx, redisKey) | ||
| pipe.SRem(ctx, s.indexKey, key) | ||
| if _, err := pipe.Exec(ctx); err != nil { | ||
| mu := s.keyLock(key) | ||
| mu.Lock() | ||
| defer mu.Unlock() | ||
|
|
||
| if err := s.client.Del(ctx, s.dataKey(key)).Err(); err != nil { | ||
| return fmt.Errorf("failed to delete env %s: %w", key, err) | ||
| } | ||
| if err := s.client.SRem(ctx, s.indexKey, key).Err(); err != nil { | ||
| return fmt.Errorf("failed to remove env %s from index: %w", key, err) | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
The replacement of Redis Watch (optimistic locking) with a local sync.Mutex is a regression for distributed environments. In a multi-replica deployment, local mutexes cannot prevent race conditions between different service instances, which can lead to data corruption or lost updates when multiple requests attempt to modify the same environment record concurrently. Furthermore, the removal of Redis transactions/pipelines in Update and Delete (lines 172-177 and 187-192) breaks atomicity; if a partial failure occurs (e.g., Set succeeds but SAdd fails), the index will become inconsistent with the data.
| keyLocks sync.Map | ||
| } | ||
|
|
||
| func (s *RedisEnvStorage) keyLock(key string) *sync.Mutex { | ||
| v, _ := s.keyLocks.LoadOrStore(key, &sync.Mutex{}) | ||
| return v.(*sync.Mutex) | ||
| } |
There was a problem hiding this comment.
The keyLocks sync.Map implementation used to store mutexes per key is a memory leak. Entries are added for every unique environment key ever accessed but are never removed, causing the map to grow indefinitely over the lifetime of the process. In a system with many unique environment IDs, this will lead to unbounded memory consumption.
| now := time.Now().Format("2006-01-02 15:04:05") | ||
| return &models.EnvInstance{ | ||
| ID: info.SandboxID, | ||
| Status: mapArcaStatus(info.Status), | ||
| CreatedAt: now, | ||
| UpdatedAt: now, |
There was a problem hiding this comment.
Using time.Now() for CreatedAt and UpdatedAt in GetEnvInstance is misleading. Since this method is called to poll for status, it will incorrectly report that the instance was just created/updated at the time of the request. If the Arca API does not provide these timestamps, it is better to leave them empty or retrieve the original creation time from a persistent store where it was recorded during the initial creation.
| // | ||
| // Supported engines: arca (always returns error). | ||
| func (c *ArcaClient) ListEnvInstances(envName string) ([]*models.EnvInstance, error) { | ||
| return nil, fmt.Errorf("arca: ListEnvInstances not supported") |
There was a problem hiding this comment.
Returning an error from ListEnvInstances breaks the Unified periodic task in main.go. When running with the Arca engine, this error causes the cleanup and metrics collection loop to abort every cycle, preventing any maintenance tasks from executing and flooding logs with warnings. Since Arca doesn't support listing, this method should return an empty list and nil error to allow the periodic task to continue gracefully.
| return nil, fmt.Errorf("arca: ListEnvInstances not supported") | |
| return []*models.EnvInstance{}, nil |
| f"/env-instance/{instance_id}/presign-url", json=payload | ||
| ) | ||
| try: | ||
| api_response = APIResponse(**response.json()) |
There was a problem hiding this comment.
The instantiation APIResponse(**response.json()) is fragile. If the response is not a dictionary (e.g., a list or primitive) or if it fails Pydantic validation, it will raise a TypeError or ValidationError, neither of which is caught by the ValueError handler (in Pydantic v2). It is safer to verify the response type and catch broader exceptions to prevent client crashes on unexpected API responses.
No description provided.