Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
272b0b3
feat: Add Logfire error tracking to fleet env
Feb 26, 2026
216d16a
fix(telemetry): consistent schema with env_key, env_version, task_key…
Feb 27, 2026
34dd0d9
feat(telemetry): add fleet_mcp_tool_error for MCP server errors
Feb 27, 2026
03fbb92
fix(telemetry): set context BEFORE Fleet.make() so init failures have…
Feb 27, 2026
9d12c3e
fix: address bugbot issues in PR #6
Feb 27, 2026
0bc9cfd
fix: add hard timeout to MCP operations to prevent hanging
Feb 27, 2026
b061b4e
fix: async Fleet.make() to prevent event loop blocking
Feb 27, 2026
d8c5ddc
fix: enrich fleet_mcp_tool_error with env:version and step info
Feb 27, 2026
e627cd0
fix: suppress noisy logfire console output and tracebacks
Feb 28, 2026
73b81b5
Fix telemetry dashboard: count init failures as rollouts, add total_s…
Feb 28, 2026
1e9bfce
Add fleet_provisioning_completed telemetry event with provisioning_ti…
Feb 28, 2026
f4ed59b
Fix MCP endpoint routing, telemetry gap, and retry config
Mar 3, 2026
327c782
fix: call_tool retry used non-existent retry_base_delay attr
Mar 3, 2026
887fd1f
feat: auto-select instance TTL based on modality
Mar 3, 2026
84de403
Emit fleet_rollout_completed on close() for orphaned rollouts
Mar 4, 2026
77b9d6a
Simplify orphaned rollout stop reasons to max_steps / abandoned
Mar 4, 2026
540530a
Add Fleet telemetry section to README
Mar 4, 2026
199f67f
Increase tool_use TTL from 600s to 900s to reduce 502s from instance …
Mar 4, 2026
33d53c9
fix: use asyncio.to_thread(Fleet.make()) instead of AsyncFleet.make()
Mar 4, 2026
0d37811
fix: wrap sync blocking calls in asyncio.to_thread() to unblock event…
Mar 4, 2026
f86fa49
feat: add trace upload utilities for eval rollouts
Mar 7, 2026
290600e
fix: Convert OpenAI image_url blocks to Fleet ingest format for prope…
Mar 8, 2026
c99c1e5
fix: Pass reward as score to ingest API so sessions complete
Mar 8, 2026
fc0508f
Add hint-based reward for solver RL training (Options B, C, D)
Mar 8, 2026
31fa602
Revert "Add hint-based reward for solver RL training (Options B, C, D)"
Mar 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,25 @@ Supporters include: Meta-PyTorch, Hugging Face, [Patronus AI](https://patronus.a

And we'd also like to acknowledge the team at Farama Foundation as the OpenEnv API was heavily inspired by the work you all have done on Gymnasium. Cheers!

## Fleet Telemetry

`FleetTaskEnv` emits Logfire events to track rollout lifecycle. Every `fleet_rollout_started` gets a matching `fleet_rollout_completed` with a `failure_reason`:

```
started = completed + init_err + tools_err + no_computer + max_steps + abandoned
```

| `failure_reason` | When |
|---|---|
| *(null)* | Rollout completed normally (verifier ran) |
| `init_error` | Fleet provisioning failed |
| `tools_error` | `list_tools()` MCP call failed |
| `computer_tool_missing` | CUA modality but no `computer` tool |
| `max_steps` | Caller hit turn limit without running verifier |
| `abandoned` | Caller stopped early (context overflow, job cancelled, crash) |

Set `LOGFIRE_TOKEN` to enable. Events include `step_count`, `reward`, `verifier_success`, and task context (env_key, version, modality).

## License

BSD 3-Clause License (see [LICENSE](./LICENSE) file)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
fleet = [
"mcp>=1.0.0",
"fleet-python>=0.2.79",
"logfire>=3.0.0",
]

[project.scripts]
Expand Down
221 changes: 191 additions & 30 deletions src/envs/fleet_env/README.md

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions src/envs/fleet_env/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from .mcp_tools import FleetMCPTools
from .models import CallToolAction, ListToolsAction
from .task_env import FleetTaskEnv, make_fleet_task_env
from .telemetry import configure_fleet_telemetry, set_task_context, clear_task_context
from .trace import create_trace_job, upload_trace

__all__ = [
"FleetEnvClient",
Expand All @@ -22,4 +24,9 @@
"ContextManager",
"CONTEXT_TOOLS",
"CONTEXT_TOOL_NAMES",
"configure_fleet_telemetry",
"set_task_context",
"clear_task_context",
"create_trace_job",
"upload_trace",
]
174 changes: 167 additions & 7 deletions src/envs/fleet_env/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from .mcp_tools import FleetMCPTools
from .models import CallToolAction, ListToolsAction
from .telemetry import fleet_error, fleet_warning, fleet_info
Comment thread
cursor[bot] marked this conversation as resolved.


class FleetEnvClient(HTTPEnvClient[Action, Observation]):
Expand Down Expand Up @@ -50,12 +51,12 @@ def from_fleet(
cls: Type["FleetEnvClient"],
api_key: str,
env_key: str,
data_key: str,
data_version: str,
image_type: str,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking API change leaves existing callers without required args

High Severity

from_fleet changed data_key, data_version, and image_type from Optional[str] = None to required positional arguments, but existing callers in test_fleet_env.py (lines 83, 92, 105) and examples/fleet_env_example.py still call it with only api_key and env_key. These raise TypeError at runtime, breaking previously passing tests and the documented example.

Additional Locations (1)

Fix in Cursor Fix in Web

region: Optional[str] = None,
ttl_seconds: Optional[int] = 3600,
env_variables: Optional[Dict[str, Any]] = None,
image_type: Optional[str] = None,
data_key: Optional[str] = None,
data_version: Optional[str] = None,
**kwargs: Any,
) -> Tuple["FleetEnvClient", FleetMCPTools]:
try:
Expand All @@ -80,6 +81,7 @@ def from_fleet(

import time
import logging

_logger = logging.getLogger(__name__)

_logger.info(f"Creating Fleet instance: env_key={env_key}, ttl={ttl_seconds}s")
Expand All @@ -92,12 +94,14 @@ def from_fleet(

for attempt in range(max_retries):
try:
# Fleet SDK expects image_type=None for standard images
sdk_image_type = image_type if image_type == "mcp" else None
env = fleet.make(
env_key=env_key,
region=region,
ttl_seconds=ttl_seconds,
env_variables=env_variables,
image_type=image_type,
image_type=sdk_image_type,
data_key=data_key_spec,
)
break # Success
Expand All @@ -114,18 +118,166 @@ def from_fleet(
f"[env={env_key}] Fleet.make() failed (attempt {attempt + 1}/{max_retries}): {e}. "
f"Retrying in {delay:.1f}s..."
)
fleet_warning(
"fleet_make_retry",
attempt=attempt + 1,
max_retries=max_retries,
error_type=type(e).__name__,
error_message=str(e),
retry_delay_s=delay,
)
time.sleep(delay)
else:
_logger.error(
f"[env={env_key}] Fleet.make() failed after {attempt + 1} attempt(s): {e}"
)
fleet_error(
"fleet_make_failed",
attempt=attempt + 1,
max_retries=max_retries,
error_type=type(e).__name__,
error_message=str(e),
)
raise

_logger.info(f"Fleet instance ready in {time.time() - start:.1f}s: {env.instance_id}")
elapsed = time.time() - start
instance_id = getattr(env, "instance_id", "unknown")
_logger.info(f"Fleet instance ready in {elapsed:.1f}s: {instance_id}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync from_fleet missing fleet_provisioning_completed telemetry event

Low Severity

The async from_fleet_async() emits a fleet_provisioning_completed telemetry event with provisioning_time_s and instance_id, but the sync from_fleet() does not. This means provisioning latency data is silently missing for any code path using the sync method (including the example at examples/fleet_env_example.py), breaking the Logfire SQL queries documented in the README.

Additional Locations (1)

Fix in Cursor Fix in Web


root = env.urls.root
# Fleet currently exposes multiple MCP endpoints. Prefer /api/v1/mcp first.
mcp_urls = (f"{root}api/v1/mcp", f"{root}mcp")
# Pick MCP endpoint based on modality:
# - computer_use: aggregator on port 8081 (has computer tool + API tools)
# - tool_use: per-env MCP server on port 3003 (API tools only)
if image_type == "mcp":
mcp_urls = (f"{root}api/v1/mcp",)
else:
mcp_urls = (f"{root}mcp",)

orch = cls(
base_url=env.urls.manager.api,
fleet_env_handle=env,
api_key=api_key,
mcp_urls=mcp_urls,
**kwargs,
)
tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
return orch, tools

@classmethod
async def from_fleet_async(
cls: Type["FleetEnvClient"],
api_key: str,
env_key: str,
data_key: str,
data_version: str,
image_type: str,
region: Optional[str] = None,
ttl_seconds: Optional[int] = 3600,
env_variables: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Tuple["FleetEnvClient", FleetMCPTools]:
"""Async version of from_fleet() — does not block the event loop.

Uses AsyncFleet.make() for provisioning and asyncio.sleep() for retries,
allowing other async trajectories to progress while waiting.
"""
try:
from fleet._async import AsyncFleet
except ImportError as e:
raise ImportError(
"Fleet support requires the optional dependency set. "
"Install with `pip install openenv[fleet]`."
) from e

async_fleet = AsyncFleet(api_key=api_key)

# Fleet SDK expects data_key in "key:version" format
data_key_spec = None
if data_key:
if data_version:
data_key_spec = f"{data_key}:{data_version}"
else:
data_key_spec = data_key

import time
import logging

_logger = logging.getLogger(__name__)

_logger.info(f"Creating Fleet instance (async): env_key={env_key}, ttl={ttl_seconds}s")
start = time.time()

# Retry logic with async sleep (non-blocking)
max_retries = 3
retry_base_delay = 2.0 # seconds
env = None

# Fleet SDK expects image_type=None for standard images
sdk_image_type = image_type if image_type == "mcp" else None

for attempt in range(max_retries):
try:
env = await async_fleet.make(
env_key=env_key,
region=region,
ttl_seconds=ttl_seconds,
env_variables=env_variables,
image_type=sdk_image_type,
data_key=data_key_spec,
)
break # Success
except Exception as e:
error_msg = str(e)
# Retry on transient errors (health check failures, timeouts, etc.)
is_transient = any(
x in error_msg.lower()
for x in ["health check", "timeout", "connection", "temporarily"]
)
if attempt < max_retries - 1 and is_transient:
delay = retry_base_delay * (2**attempt)
_logger.warning(
f"[env={env_key}] AsyncFleet.make() failed (attempt {attempt + 1}/{max_retries}): {e}. "
f"Retrying in {delay:.1f}s..."
)
fleet_warning(
"fleet_make_retry",
attempt=attempt + 1,
max_retries=max_retries,
error_type=type(e).__name__,
error_message=str(e),
retry_delay_s=delay,
)
await asyncio.sleep(delay)
else:
_logger.error(
f"[env={env_key}] AsyncFleet.make() failed after {attempt + 1} attempt(s): {e}"
)
fleet_error(
"fleet_make_failed",
attempt=attempt + 1,
max_retries=max_retries,
error_type=type(e).__name__,
error_message=str(e),
)
raise

elapsed = time.time() - start
instance_id = getattr(env, "instance_id", "unknown")
_logger.info(f"Fleet instance ready (async) in {elapsed:.1f}s: {instance_id}")
fleet_info(
"fleet_provisioning_completed",
provisioning_time_s=round(elapsed, 1),
instance_id=instance_id,
)

root = env.urls.root
# Pick MCP endpoint based on modality:
# - computer_use (image_type="mcp"): aggregator on port 8081 (has computer tool + API tools)
# - tool_use: per-env MCP server on port 3003 (API tools only)
if image_type == "mcp":
mcp_urls = (f"{root}api/v1/mcp",)
else:
mcp_urls = (f"{root}mcp",)

orch = cls(
base_url=env.urls.manager.api,
Expand Down Expand Up @@ -185,4 +337,12 @@ def close(self) -> None:
self._fleet_env.close()
super().close()

async def close_async(self) -> None:
"""Async close — runs sync Fleet close in a thread to avoid blocking the event loop."""
if self._fleet_env:
await asyncio.to_thread(self._fleet_env.close)
super().close()

async def reset_async(self) -> "StepResult":
"""Async reset — runs sync HTTP reset in a thread to avoid blocking the event loop."""
return await asyncio.to_thread(self.reset)
43 changes: 37 additions & 6 deletions src/envs/fleet_env/fleet_mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,47 @@


class FleetMCPClient:
# Hard timeout for entire MCP operation (connection + request)
OPERATION_TIMEOUT_S = 60

def __init__(self, url: str, api_key: str):
self.url = url
self.api_key = api_key

async def list_tools(self) -> List[Tool]:
async def _list_tools_impl(self) -> List[Tool]:
"""Internal implementation without timeout wrapper."""
async with streamablehttp_client(
url=self.url,
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=timedelta(seconds=120),
sse_read_timeout=timedelta(seconds=300),
timeout=timedelta(seconds=30),
sse_read_timeout=timedelta(seconds=60),
) as streams:
async with ClientSession(
read_stream=streams[0], write_stream=streams[1]
) as session:
await session.initialize()
return (await session.list_tools()).tools

async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
async def list_tools(self) -> List[Tool]:
"""List tools with hard timeout to prevent hanging."""
import asyncio

try:
return await asyncio.wait_for(
self._list_tools_impl(), timeout=self.OPERATION_TIMEOUT_S
)
except asyncio.TimeoutError:
raise TimeoutError(
f"list_tools timed out after {self.OPERATION_TIMEOUT_S}s for {self.url}"
)

async def _call_tool_impl(self, name: str, arguments: Dict[str, Any]) -> Any:
"""Internal implementation without timeout wrapper."""
async with streamablehttp_client(
url=self.url,
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=timedelta(seconds=120),
sse_read_timeout=timedelta(seconds=300),
timeout=timedelta(seconds=30),
sse_read_timeout=timedelta(seconds=60),
) as streams:
async with ClientSession(
read_stream=streams[0], write_stream=streams[1]
Expand All @@ -60,6 +78,19 @@ async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
result = await session.call_tool(name, arguments)
return self._extract_tool_result(result)

async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
"""Call tool with hard timeout to prevent hanging."""
import asyncio

try:
return await asyncio.wait_for(
self._call_tool_impl(name, arguments), timeout=self.OPERATION_TIMEOUT_S
)
except asyncio.TimeoutError:
raise TimeoutError(
f"call_tool({name}) timed out after {self.OPERATION_TIMEOUT_S}s for {self.url}"
)

def _extract_tool_result(self, result: Any) -> Any:
"""Extract readable content from CallToolResult.

Expand Down
Loading